diff --git a/api/defined/v1/storage/config.go b/api/defined/v1/storage/config.go new file mode 100644 index 0000000..c808ba3 --- /dev/null +++ b/api/defined/v1/storage/config.go @@ -0,0 +1,33 @@ +package storage + +import "time" + +type ( + PromoteConfig struct { + MinHits int `json:"min_hits" yaml:"min_hits"` // 时间窗口内命中 >= N + Window time.Duration `json:"window" yaml:"window"` // 时间窗口 1m + } + DemoteConfig struct { + MinHits int `json:"min_hits" yaml:"min_hits"` // 时间窗口内命中 <= N + Window time.Duration `json:"window" yaml:"window"` // 时间窗口 1m + Occupancy float64 `json:"occupancy" yaml:"occupancy"` // 热盘存储占用率 >= N% + } + MigrationConfig struct { + Enabled bool `json:"enabled" yaml:"enabled"` + Promote PromoteConfig `json:"promote" yaml:"promote"` // 升温 + Demote DemoteConfig `json:"demote" yaml:"demote"` // 降温 + } + + BucketConfig struct { + Path string `json:"path" yaml:"path"` // local path or ? + Driver string `json:"driver" yaml:"driver"` // native, custom-driver + Type string `json:"type" yaml:"type"` // normal, cold, hot, fastmemory + DBType string `json:"db_type" yaml:"db_type"` // boltdb, badgerdb, pebble + DBPath string `json:"db_path" yaml:"db_path"` // db path, defult: /.indexdb + AsyncLoad bool `json:"async_load" yaml:"async_load"` // load metadata async + SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part + MaxObjectLimit int `json:"max_object_limit" yaml:"max_object_limit"` // max object limit, upper Bound discard + Migration *MigrationConfig `json:"migration" yaml:"migration"` // migration config + DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config + } +) diff --git a/api/defined/v1/storage/indexdb.go b/api/defined/v1/storage/indexdb.go index 4f00827..8416521 100644 --- a/api/defined/v1/storage/indexdb.go +++ b/api/defined/v1/storage/indexdb.go @@ -13,7 +13,9 @@ var ErrKeyNotFound = errors.New("key not found") const ( TypeInMemory = "inmemory" - TypeNormal = "normal" + TypeNormal = "normal" // 
normal, warm 同一个 + TypeWarm = "warm" // normal, warm 同一个 + TypeCold = "cold" TypeHot = "hot" ) diff --git a/api/defined/v1/storage/storage.go b/api/defined/v1/storage/storage.go index 51b7ce2..b407e0b 100644 --- a/api/defined/v1/storage/storage.go +++ b/api/defined/v1/storage/storage.go @@ -20,6 +20,8 @@ type Selector interface { type Operation interface { // Lookup retrieves the metadata for the specified object ID. Lookup(ctx context.Context, id *object.ID) (*object.Metadata, error) + // Touch updates the last access time and refs count of the object. + Touch(ctx context.Context, id *object.ID) // Store store the metadata for the specified object ID. Store(ctx context.Context, meta *object.Metadata) error // Exist checks if the object exists. @@ -42,6 +44,12 @@ type Operation interface { WriteChunkFile(ctx context.Context, id *object.ID, index uint32) (io.WriteCloser, string, error) // ReadChunkFile open chunk file and returns io.ReadCloser ReadChunkFile(ctx context.Context, id *object.ID, index uint32) (File, string, error) + // Migrate migrates the object from this bucket to the destination bucket. + // It should copy the object data and metadata to the destination, and then + // remove it from the source bucket upon success. + Migrate(ctx context.Context, id *object.ID, dest Bucket) error + // SetMigration sets the migration for the bucket. + SetMigration(m Migration) error } type Storage interface { @@ -55,6 +63,20 @@ type Storage interface { PURGE(storeUrl string, typ PurgeControl) error } +type Migration interface { + // Demote demotes the object from the source bucket to the destination bucket. + Demote(ctx context.Context, id *object.ID, src Bucket) error + // Promote promotes the object from the source bucket to the destination bucket. + Promote(ctx context.Context, id *object.ID, src Bucket) error +} + +type Migrator interface { + Storage + + // SelectLayer selects the Bucket by the object ID and layer. 
+ SelectLayer(ctx context.Context, id *object.ID, layer string) Bucket +} + type Bucket interface { io.Closer Operation @@ -131,7 +153,7 @@ const ( RefsMask = (1 << CounterBits) - 1 ) -func NewMark(clock int64, refs uint64) Mark { +func NewMark(clock, refs int64) Mark { return Mark(clock)<= N + Window time.Duration `json:"window" yaml:"window"` // 时间窗口 10m + } + Demote struct { + MinHits int `json:"min_hits" yaml:"min_hits"` // 时间窗口内命中 <= N + Window time.Duration `json:"window" yaml:"window"` // 时间窗口 10m + Occupancy float64 `json:"occupancy" yaml:"occupancy"` // 热盘存储占用率 >= N% + } + Migration struct { + Enabled bool `json:"enabled" yaml:"enabled"` + Promote Promote `json:"promote" yaml:"promote"` // 升温 + Demote Demote `json:"demote" yaml:"demote"` // 降温 + } +) type Plugin struct { Name string `json:"name" yaml:"name"` Options map[string]any `json:"options" yaml:"options"` diff --git a/config.example.yaml b/config.example.yaml index 2c84f82..80a1cf6 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -93,6 +93,15 @@ storage: eviction_policy: fifo # fifo, lru, lfu selection_policy: hashring # hashring, roundrobin slice_size: 1048576 # 1MB + migration: + enabled: false # enable tiering bucket + promote: + min_hits: 10 # window hits to promote + window: 1m # 1 minute window + demote: + min_hits: 2 # window hits to demote + window: 5m # 5 minutes window + occupancy: 75 # percent useage diraware: enabled: true # 默认 true store_path: /cache1/.diraware diff --git a/go.mod b/go.mod index 9de3d72..d0d9d19 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.2 require ( dario.cat/mergo v1.0.2 github.com/andybalholm/brotli v1.2.0 + github.com/cespare/xxhash/v2 v2.3.0 github.com/cloudflare/tableflip v1.2.3 github.com/cockroachdb/pebble/v2 v2.1.2 github.com/fsnotify/fsnotify v1.9.0 @@ -34,7 +35,6 @@ require ( github.com/antlabs/timer v0.1.4 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect - 
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cockroachdb/crlib v0.0.0-20251122031428-fe658a2dbda1 // indirect github.com/cockroachdb/errors v1.12.0 // indirect github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 // indirect @@ -60,6 +60,7 @@ require ( github.com/prometheus/common v0.67.4 // indirect github.com/prometheus/procfs v0.19.2 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/tidwall/btree v1.8.1 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect diff --git a/go.sum b/go.sum index 4e4a50e..faba2fa 100644 --- a/go.sum +++ b/go.sum @@ -115,6 +115,8 @@ github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05Zp github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tidwall/btree v1.8.1 h1:27ehoXvm5AG/g+1VxLS1SD3vRhp/H7LuEfwNvddEdmA= diff --git a/pkg/algorithm/heavykeeper/heavykeeper.go b/pkg/algorithm/heavykeeper/heavykeeper.go new file mode 100644 index 0000000..57ce88e --- /dev/null +++ b/pkg/algorithm/heavykeeper/heavykeeper.go @@ -0,0 +1,139 @@ +package heavykeeper + +import ( + "hash/fnv" + "math" + "math/rand" + "sync" + "time" +) + +// HeavyKeeper is a probabilistic data structure for top-k items. 
+type HeavyKeeper struct { + buckets [][]bucket + depth int + width int + decay float64 + r *rand.Rand + mu sync.RWMutex +} + +type bucket struct { + fingerprint uint64 + count uint32 +} + +// New creates a new HeavyKeeper. +// depth: number of arrays (hash functions) +// width: number of buckets per array +// decay: probability of decay (0.9 means 90% chance to decay) +func New(depth, width int, decay float64) *HeavyKeeper { + hk := &HeavyKeeper{ + buckets: make([][]bucket, depth), + depth: depth, + width: width, + decay: decay, + r: rand.New(rand.NewSource(time.Now().UnixNano())), + } + + for i := range hk.buckets { + hk.buckets[i] = make([]bucket, width) + } + + return hk +} + +// Add adds a key to the HeavyKeeper. +func (hk *HeavyKeeper) Add(key []byte) { + hk.mu.Lock() + defer hk.mu.Unlock() + + fingerprint := hk.hash(key) + + // Use double hashing for multiple hash functions + // h1 = fingerprint + // h2 = fnv(key) + // idx = (h1 + i*h2) % width + h2 := hk.hash2(key) + + for i := 0; i < hk.depth; i++ { + idx := (fingerprint + uint64(i)*h2) % uint64(hk.width) + b := &hk.buckets[i][idx] + + if b.count == 0 { + b.fingerprint = fingerprint + b.count = 1 + continue + } + + if b.fingerprint == fingerprint { + b.count++ + continue + } + + // Decay + if hk.r.Float64() < math.Pow(hk.decay, float64(b.count)) { + b.count-- + if b.count == 0 { + b.fingerprint = fingerprint + b.count = 1 + } + } + } +} + +// Query returns the estimated count for the key. +func (hk *HeavyKeeper) Query(key []byte) uint32 { + hk.mu.RLock() + defer hk.mu.RUnlock() + + fingerprint := hk.hash(key) + h2 := hk.hash2(key) + var maxCount uint32 + + for i := 0; i < hk.depth; i++ { + idx := (fingerprint + uint64(i)*h2) % uint64(hk.width) + b := &hk.buckets[i][idx] + + if b.fingerprint == fingerprint { + if b.count > maxCount { + maxCount = b.count + } + } + } + + return maxCount +} + +// Clear resets the HeavyKeeper. 
+func (hk *HeavyKeeper) Clear() { + hk.mu.Lock() + defer hk.mu.Unlock() + + for i := range hk.buckets { + for j := range hk.buckets[i] { + hk.buckets[i][j].count = 0 + hk.buckets[i][j].fingerprint = 0 + } + } +} + +func (hk *HeavyKeeper) hash(key []byte) uint64 { + h := fnv.New64a() + h.Write(key) + return h.Sum64() +} + +func (hk *HeavyKeeper) hash2(key []byte) uint64 { + // Simple secondary hash: fnv with salt or just different algo + // Using FNV-1 (not 1a) or just rotate? + // Let's use a simple mix. + h := uint64(2166136261) + for _, c := range key { + h *= 16777619 + h ^= uint64(c) + } + // salt + h ^= 0x9e3779b97f4a7c15 + return h +} diff --git a/pkg/algorithm/heavykeeper/heavykeeper_test.go b/pkg/algorithm/heavykeeper/heavykeeper_test.go new file mode 100644 index 0000000..1b95405 --- /dev/null +++ b/pkg/algorithm/heavykeeper/heavykeeper_test.go @@ -0,0 +1,86 @@ +package heavykeeper + +import ( + "fmt" + "testing" +) + +func TestHeavyKeeper_AddAndQuery(t *testing.T) { + hk := New(3, 1024, 0.9) + + key := []byte("test-key") + + for i := 0; i < 100; i++ { + hk.Add(key) + } + + count := hk.Query(key) + if count < 80 { // Allow some error due to decay/collision + t.Errorf("Expected count around 100, got %d", count) + } + + // Test another key + key2 := []byte("another-key") + count2 := hk.Query(key2) + if count2 != 0 { + t.Errorf("Expected count 0 for new key, got %d", count2) + } +} + +func TestHeavyKeeper_Decay(t *testing.T) { + hk := New(3, 10, 0.9) // Small width to force collisions + + // Fill with "noise" + for i := 0; i < 1000; i++ { + key := []byte(fmt.Sprintf("noise-%d", i)) + hk.Add(key) + } + + // Add "heavy" item + heavyKey := []byte("heavy") + for i := 0; i < 100; i++ { + hk.Add(heavyKey) + } + + count := hk.Query(heavyKey) + t.Logf("Heavy key count: %d", count) + if count < 50 { + t.Errorf("Heavy key count too low: %d", count) + } +} + +func TestHeavyKeeper_Clear(t *testing.T) { + hk := New(3, 1024, 0.9) + key := []byte("test") + hk.Add(key) + if 
hk.Query(key) == 0 { + t.Fatal("Should verify") + } + + hk.Clear() + if hk.Query(key) != 0 { + t.Fatal("Should be 0 after Clear") + } +} + +func BenchmarkHeavyKeeper_Add(b *testing.B) { + hk := New(3, 4096, 0.9) + key := []byte("bench-key") + b.ResetTimer() + for i := 0; i < b.N; i++ { + hk.Add(key) + } +} + +func BenchmarkHeavyKeeper_AddRandom(b *testing.B) { + hk := New(3, 4096, 0.9) + keys := make([][]byte, 1000) + for i := range keys { + keys[i] = []byte(fmt.Sprintf("key-%d", i)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + hk.Add(keys[i%1000]) + } +} diff --git a/server/middleware/caching/caching_chunkpart_test.go b/server/middleware/caching/caching_chunkpart_test.go index 8eb5973..d8be49a 100644 --- a/server/middleware/caching/caching_chunkpart_test.go +++ b/server/middleware/caching/caching_chunkpart_test.go @@ -8,8 +8,8 @@ import ( "testing" "github.com/kelindar/bitmap" + "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" - "github.com/omalloc/tavern/conf" "github.com/omalloc/tavern/contrib/log" "github.com/omalloc/tavern/proxy" "github.com/omalloc/tavern/storage/bucket/memory" @@ -28,7 +28,7 @@ func makebuf(size int) []byte { } func Test_getContents(t *testing.T) { - memoryBucket, _ := memory.New(&conf.Bucket{}, sharedkv.NewEmpty()) + memoryBucket, _ := memory.New(&storage.BucketConfig{}, sharedkv.NewEmpty()) req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, "http://www.example.com/path/to/1.apk", nil) objectID, _ := newObjectIDFromRequest(req, "", true) diff --git a/server/middleware/caching/internal.go b/server/middleware/caching/internal.go index 72eb64e..604dd9c 100644 --- a/server/middleware/caching/internal.go +++ b/server/middleware/caching/internal.go @@ -39,6 +39,7 @@ type Caching struct { processor *ProcessorChain opt *cachingOption req *http.Request + ctx context.Context id *object.ID md *object.Metadata rootmd *object.Metadata diff --git 
a/server/middleware/caching/processor.go b/server/middleware/caching/processor.go index fefab56..942db2c 100644 --- a/server/middleware/caching/processor.go +++ b/server/middleware/caching/processor.go @@ -107,6 +107,7 @@ func (pc *ProcessorChain) preCacheProcessor(proxyClient proxy.Proxy, store stora caching := &Caching{ log: log.Context(req.Context()), + ctx: req.Context(), proxyClient: proxyClient, opt: opt, id: objectID, @@ -151,7 +152,8 @@ func (pc *ProcessorChain) postCacheProcessor(caching *Caching, req *http.Request _, _ = io.Copy(io.Discard, resp.Body) } - // TODO: incr index ref count. + // incr index ref count. + caching.bucket.Touch(caching.ctx, caching.id) return resp, nil } diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index f962fa7..bbae62f 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -13,6 +13,7 @@ import ( "path/filepath" "runtime" "strings" + "sync" "time" "github.com/omalloc/tavern/pkg/iobuf" @@ -20,8 +21,8 @@ import ( "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" - "github.com/omalloc/tavern/conf" "github.com/omalloc/tavern/contrib/log" + "github.com/omalloc/tavern/pkg/algorithm/heavykeeper" "github.com/omalloc/tavern/pkg/algorithm/lru" "github.com/omalloc/tavern/storage/indexdb" ) @@ -29,33 +30,51 @@ import ( var _ storage.Bucket = (*diskBucket)(nil) type diskBucket struct { - path string - dbPath string - driver string - storeType string - asyncLoad bool - weight int - sharedkv storage.SharedKV - indexdb storage.IndexDB - cache *lru.Cache[object.IDHash, storage.Mark] - fileFlag int - fileMode fs.FileMode - stop chan struct{} -} - -func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) { + opt *storage.BucketConfig + path string + dbPath string + driver string + storeType string + asyncLoad bool + weight int + sharedkv storage.SharedKV + indexdb storage.IndexDB + hasMigration bool + migration 
storage.Migration + hkPromote *heavykeeper.HeavyKeeper + lastPromoteReset time.Time + promMu sync.Mutex + cache *lru.Cache[object.IDHash, storage.Mark] + fileFlag int + fileMode fs.FileMode + stop chan struct{} +} + +func New(opt *storage.BucketConfig, sharedkv storage.SharedKV) (storage.Bucket, error) { bucket := &diskBucket{ - path: config.Path, - dbPath: config.DBPath, - driver: config.Driver, - storeType: config.Type, - asyncLoad: config.AsyncLoad, - weight: 100, // default weight - sharedkv: sharedkv, - cache: lru.New[object.IDHash, storage.Mark](config.MaxObjectLimit), - fileFlag: os.O_RDONLY, - fileMode: fs.FileMode(0o755), - stop: make(chan struct{}, 1), + opt: opt, + path: opt.Path, + dbPath: opt.DBPath, + driver: opt.Driver, + storeType: opt.Type, + asyncLoad: opt.AsyncLoad, + hasMigration: opt.Migration != nil && opt.Migration.Enabled, + weight: 100, // default weight + sharedkv: sharedkv, + cache: lru.New[object.IDHash, storage.Mark](opt.MaxObjectLimit), + fileFlag: os.O_RDONLY, + fileMode: fs.FileMode(0o755), + stop: make(chan struct{}, 1), + } + + if opt.Migration != nil && opt.Migration.Enabled { + // Default width 4096 if not set or small + width := opt.MaxObjectLimit + if width < 4096 { + width = 4096 + } + bucket.hkPromote = heavykeeper.New(3, width, 0.9) + bucket.lastPromoteReset = time.Now() } // hard code of check os. 
@@ -66,10 +85,13 @@ func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) bucket.initWorkdir() // create indexdb - db, err := indexdb.Create(config.DBType, - indexdb.NewOption(config.DBPath, indexdb.WithType("pebble"), indexdb.WithDBConfig(config.DBConfig))) + db, err := indexdb.Create(opt.DBType, indexdb.NewOption( + opt.DBPath, + indexdb.WithType("pebble"), + indexdb.WithDBConfig(opt.DBConfig), + )) if err != nil { - log.Errorf("failed to create %s(%s) indexdb %v", config.DBType, config.DBPath, err) + log.Errorf("failed to create %s(%s) indexdb %v", opt.DBType, opt.DBPath, err) return nil, err } bucket.indexdb = db @@ -91,16 +113,46 @@ func (d *diskBucket) evict() { clog.Debugf("start evict goroutine for %s", d.ID()) + demote := func(evicted lru.Eviction[object.IDHash, storage.Mark]) error { + if d.migration != nil { + md, err := d.indexdb.Get(context.Background(), evicted.Key[:]) + if err != nil { + return err + } + if md == nil || md.ID == nil { + return fmt.Errorf("metadata not found for demotion") + } + log.Debugf("demote %s to %s", d.storeType, md.ID.Key()) + return d.migration.Demote(context.Background(), md.ID, d) + } + return nil + } + + discard := func(evicted lru.Eviction[object.IDHash, storage.Mark]) { + fd := evicted.Key.WPath(d.path) + clog.Debugf("evict file %s, last-access %d", fd, evicted.Value.LastAccess()) + _ = d.DiscardWithHash(context.Background(), evicted.Key) + } + go func() { for { select { case <-d.stop: return case evicted := <-ch: - fd := evicted.Key.WPath(d.path) - clog.Debugf("evict file %s, last-access %d", fd, evicted.Value.LastAccess()) - // TODO: discard expired cachefile or Move to cold storage - d.DiscardWithHash(context.Background(), evicted.Key) + // expired cachefile Demote to other bucket + if d.migration != nil { + + if err := demote(evicted); err != nil { + log.Warnf("demote failed: %v", err) + // fallback to discard + discard(evicted) + continue + } + continue + } + + discard(evicted) } } }() @@ 
-135,7 +187,7 @@ func (d *diskBucket) loadLRU() { if meta != nil { mdCount++ chunkCount += meta.Chunks.Count() - d.cache.Set(meta.ID.Hash(), storage.NewMark(meta.LastRefUnix, uint64(meta.Refs))) + d.cache.Set(meta.ID.Hash(), storage.NewMark(meta.LastRefUnix, meta.Refs)) // store service domains // TODO: add Debounce incr @@ -265,9 +317,17 @@ func (d *diskBucket) Iterate(ctx context.Context, fn func(*object.Metadata) erro // Lookup implements storage.Bucket. func (d *diskBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metadata, error) { md, err := d.indexdb.Get(ctx, id.Bytes()) + if err == nil && md != nil { + d.touch(ctx, id) + } return md, err } +// Touch implements [storage.Bucket]. +func (d *diskBucket) Touch(ctx context.Context, id *object.ID) { + d.touch(ctx, id) +} + // Remove implements storage.Bucket. func (d *diskBucket) Remove(ctx context.Context, id *object.ID) error { return d.indexdb.Delete(ctx, id.Bytes()) @@ -291,7 +351,7 @@ func (d *diskBucket) Store(ctx context.Context, meta *object.Metadata) error { meta.Headers.Del("X-Protocol-Request-Id") if !d.cache.Has(meta.ID.Hash()) { - d.cache.Set(meta.ID.Hash(), storage.NewMark(meta.LastRefUnix, uint64(meta.Refs))) + d.cache.Set(meta.ID.Hash(), storage.NewMark(meta.LastRefUnix, meta.Refs)) } if err := d.indexdb.Set(ctx, meta.ID.Bytes(), meta); err != nil { @@ -312,6 +372,44 @@ func (d *diskBucket) Store(ctx context.Context, meta *object.Metadata) error { return nil } +func (d *diskBucket) touch(ctx context.Context, id *object.ID) { + mark := d.cache.Get(id.Hash()) + if mark == nil { + return + } + if mark.LastAccess() <= 0 { + return + } + + mark.SetLastAccess(time.Now().Unix()) + mark.SetRefs(mark.Refs() + 1) + + d.cache.Set(id.Hash(), *mark) + + // 如果迁移开启的,则进行计算窗口期是否满足迁移配置 + if d.hasMigration { + // promote check + d.promMu.Lock() + if d.opt.Migration.Promote.Window > 0 && time.Since(d.lastPromoteReset) > d.opt.Migration.Promote.Window { + d.hkPromote.Clear() + d.lastPromoteReset = time.Now() 
+ } + d.promMu.Unlock() + + d.hkPromote.Add(id.Bytes()) + if d.hkPromote.Query(id.Bytes()) >= uint32(d.opt.Migration.Promote.MinHits) { + go func() { + // check migration interface + if d.migration != nil { + if err := d.migration.Promote(context.Background(), id, d); err != nil { + log.Warnf("promote %s failed: %v", id.Key(), err) + } + } + }() + } + } +} + // HasBad implements storage.Bucket. func (d *diskBucket) HasBad() bool { return false @@ -378,6 +476,65 @@ func (d *diskBucket) ReadChunkFile(ctx context.Context, id *object.ID, index uin return f, wpath, err } +// Migrate implements [storage.Bucket]. +func (d *diskBucket) Migrate(ctx context.Context, id *object.ID, dest storage.Bucket) error { + md, err := d.indexdb.Get(ctx, id.Bytes()) + if err != nil { + return err + } + clog := log.Context(ctx) + + var moveErr error + md.Chunks.Range(func(x uint32) { + if moveErr != nil { + return + } + + rf, _, err1 := d.ReadChunkFile(ctx, id, x) + if err1 != nil { + moveErr = err1 + return + } + defer rf.Close() + + wf, _, err2 := dest.WriteChunkFile(ctx, id, x) + if err2 != nil { + moveErr = err2 + return + } + defer wf.Close() + + if _, err2 = io.Copy(wf, rf); err2 != nil { + moveErr = err2 + } + }) + + if moveErr != nil { + clog.Errorf("failed to move object %s chunks: %v", id.Key(), moveErr) + return moveErr + } + + // 2. Store metadata in target + if err := dest.Store(ctx, md); err != nil { + clog.Errorf("failed to store metadata in target bucket for %s: %v", id.Key(), err) + return err + } + + // 3. Discard locally + if err := d.discard(ctx, md); err != nil { + clog.Errorf("failed to discard object %s from source bucket: %v", id.Key(), err) + return err + } + + return nil +} + +// SetMigration implements storage.Bucket. +func (d *diskBucket) SetMigration(migration storage.Migration) error { + d.migration = migration + return nil +} + // Close implements storage.Bucket. 
func (d *diskBucket) Close() error { return d.indexdb.Close() diff --git a/storage/bucket/disk/disk_migration_test.go b/storage/bucket/disk/disk_migration_test.go new file mode 100644 index 0000000..a551bb8 --- /dev/null +++ b/storage/bucket/disk/disk_migration_test.go @@ -0,0 +1,135 @@ +package disk_test + +import ( + "context" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/omalloc/tavern/api/defined/v1/storage" + "github.com/omalloc/tavern/api/defined/v1/storage/object" + "github.com/omalloc/tavern/storage/bucket/disk" + "github.com/omalloc/tavern/storage/sharedkv" +) + +// MockMigration +type MockMigration struct { + mock.Mock +} + +func (m *MockMigration) Promote(ctx context.Context, id *object.ID, src storage.Bucket) error { + args := m.Called(ctx, id, src) + return args.Error(0) +} + +func (m *MockMigration) Demote(ctx context.Context, id *object.ID, src storage.Bucket) error { + args := m.Called(ctx, id, src) + return args.Error(0) +} + +func TestMigration_Promote(t *testing.T) { + basepath := t.TempDir() + shared := sharedkv.NewMemSharedKV() + + cfg := &storage.BucketConfig{ + Path: basepath, + DBPath: filepath.Join(basepath, ".indexdb"), + DBType: "pebble", + Driver: "native", + Type: "normal", + Migration: &storage.MigrationConfig{ + Enabled: true, + Promote: storage.PromoteConfig{ + MinHits: 2, // Changed to 2 for easier testing + Window: time.Minute, + }, + }, + MaxObjectLimit: 100, + } + + b, err := disk.New(cfg, shared) + assert.NoError(t, err) + defer b.Close() + + mockMig := new(MockMigration) + _ = b.SetMigration(mockMig) + + id := object.NewID("http://example.com/obj1") + + // Store object + err = b.Store(context.Background(), &object.Metadata{ + ID: id, + Code: 200, + Size: 100, + LastRefUnix: time.Now().Unix(), + Refs: 1, + }) + assert.NoError(t, err) + + mockMig.On("Promote", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + // Hit 1 -> 
Total Hits = 1 (Store doesn't count as hit in HeavyKeeper usually, but let's check implementation) + // In disk.go: Store call cache.Set. touch call cache.Set. + // touch logic: hkPromote.Add + // Lookup calls touch. + + // Lookup 1 + b.Touch(context.Background(), id) + // Lookup 2 + b.Touch(context.Background(), id) + + // Wait for async promote + time.Sleep(time.Second * 1) + + mockMig.AssertCalled(t, "Promote", mock.Anything, mock.Anything, mock.Anything) +} + +func TestMigration_Demote(t *testing.T) { + basepath := t.TempDir() + shared := sharedkv.NewMemSharedKV() + + cfg := &storage.BucketConfig{ + Path: basepath, + DBPath: filepath.Join(basepath, ".indexdb"), + DBType: "pebble", + Driver: "native", + Type: "normal", + Migration: &storage.MigrationConfig{ + Enabled: true, + Demote: storage.DemoteConfig{ + MinHits: 10, + Window: time.Minute, + Occupancy: 40, // Trigger at 40% + }, + }, + MaxObjectLimit: 10, + } + + b, err := disk.New(cfg, shared) + assert.NoError(t, err) + defer b.Close() + + mockMig := new(MockMigration) + _ = b.SetMigration(mockMig) + + mockMig.On("Demote", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + // (11 items) 1 item demote + for i := 0; i < 11; i++ { + id := object.NewID(fmt.Sprintf("http://example.com/obj%d", i)) + _ = b.Store(context.Background(), &object.Metadata{ + ID: id, + Code: 200, + }) + } + + // Wait for demote ticker + time.Sleep(time.Second) + + // Verify Demote was called + mockMig.AssertCalled(t, "Demote", mock.Anything, mock.Anything, mock.Anything) +} diff --git a/storage/bucket/disk/disk_test.go b/storage/bucket/disk/disk_test.go index f8af68b..6d7e166 100644 --- a/storage/bucket/disk/disk_test.go +++ b/storage/bucket/disk/disk_test.go @@ -11,13 +11,13 @@ import ( storagev1 "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" - "github.com/omalloc/tavern/conf" - "github.com/omalloc/tavern/storage" + "github.com/omalloc/tavern/storage/bucket/disk" + 
_ "github.com/omalloc/tavern/storage/indexdb/pebble" "github.com/omalloc/tavern/storage/sharedkv" ) func newTestBucket(t *testing.T, basepath string) storagev1.Bucket { - bucket, err := storage.NewBucket(&conf.Bucket{ + bucket, err := disk.New(&storagev1.BucketConfig{ Path: basepath, Driver: "native", Type: "normal", @@ -30,7 +30,7 @@ func newTestBucket(t *testing.T, basepath string) storagev1.Bucket { return bucket } -func TesMissKey(t *testing.T) { +func TestMissKey(t *testing.T) { basepath := t.TempDir() bucket := newTestBucket(t, basepath) diff --git a/storage/bucket/empty/empty.go b/storage/bucket/empty/empty.go index 1f8030b..8e73c27 100644 --- a/storage/bucket/empty/empty.go +++ b/storage/bucket/empty/empty.go @@ -7,7 +7,6 @@ import ( "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" - "github.com/omalloc/tavern/conf" ) var _ storage.Bucket = (*emptyBucket)(nil) @@ -80,6 +79,10 @@ func (e *emptyBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metada return nil, storage.ErrKeyNotFound } +// Touch implements [storage.Bucket]. +func (e *emptyBucket) Touch(ctx context.Context, id *object.ID) { +} + // Remove implements storage.Bucket. func (e *emptyBucket) Remove(ctx context.Context, id *object.ID) error { return nil @@ -122,7 +125,17 @@ func (e *emptyBucket) ReadChunkFile(ctx context.Context, id *object.ID, index ui return nil, "discard", nil } -func New(c *conf.Bucket, _ storage.SharedKV) (storage.Bucket, error) { +// Migrate implements [storage.Bucket]. +func (e *emptyBucket) Migrate(ctx context.Context, id *object.ID, dest storage.Bucket) error { + return nil +} + +// SetMigration implements [storage.Bucket]. 
+func (e *emptyBucket) SetMigration(migration storage.Migration) error { + return nil +} + +func New(c *storage.BucketConfig, _ storage.SharedKV) (storage.Bucket, error) { path := c.Path if path == "" { path = "/dev/null" diff --git a/storage/bucket/memory/memory.go b/storage/bucket/memory/memory.go index ba9ed43..39039f1 100644 --- a/storage/bucket/memory/memory.go +++ b/storage/bucket/memory/memory.go @@ -16,7 +16,6 @@ import ( "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" - "github.com/omalloc/tavern/conf" "github.com/omalloc/tavern/contrib/log" "github.com/omalloc/tavern/pkg/algorithm/lru" "github.com/omalloc/tavern/pkg/iobuf" @@ -34,6 +33,7 @@ type memoryBucket struct { weight int sharedkv storage.SharedKV indexdb storage.IndexDB + migration storage.Migration cache *lru.Cache[object.IDHash, storage.Mark] fileFlag int fileMode fs.FileMode @@ -42,16 +42,16 @@ type memoryBucket struct { stop chan struct{} } -func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) { +func New(opt *storage.BucketConfig, sharedkv storage.SharedKV) (storage.Bucket, error) { mb := &memoryBucket{ fs: vfs.NewMem(), path: "/", + driver: opt.Driver, dbPath: storage.TypeInMemory, - driver: config.Driver, storeType: storage.TypeInMemory, weight: 100, // default weight sharedkv: sharedkv, - cache: lru.New[object.IDHash, storage.Mark](config.MaxObjectLimit), // in-memory object size + cache: lru.New[object.IDHash, storage.Mark](opt.MaxObjectLimit), // in-memory object size fileFlag: os.O_RDONLY, fileMode: fs.FileMode(0o755), maxSize: 1024 * 1024 * 100, // e.g. 
100 MB @@ -61,7 +61,7 @@ func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) // create indexdb only in-memory db, err := indexdb.Create("pebble", indexdb.NewOption(mb.dbPath, indexdb.WithType("pebble"))) if err != nil { - log.Errorf("failed to create %s indexdb %v", config.DBType, err) + log.Errorf("failed to create %s indexdb %v", opt.DBType, err) return nil, err } mb.indexdb = db @@ -202,6 +202,22 @@ func (m *memoryBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metad return md, err } +// Touch implements [storage.Bucket]. +func (m *memoryBucket) Touch(ctx context.Context, id *object.ID) { + mark := m.cache.Get(id.Hash()) + if mark == nil { + return + } + if mark.LastAccess() <= 0 { + return + } + + mark.SetLastAccess(time.Now().Unix()) + mark.SetRefs(mark.Refs() + 1) + + m.cache.Set(id.Hash(), *mark) +} + // Path implements [storage.Bucket]. func (m *memoryBucket) Path() string { return m.path @@ -231,7 +247,7 @@ func (m *memoryBucket) Store(ctx context.Context, meta *object.Metadata) error { } // update lru - m.cache.Set(meta.ID.Hash(), storage.NewMark(meta.LastRefUnix, uint64(meta.Refs))) + m.cache.Set(meta.ID.Hash(), storage.NewMark(meta.LastRefUnix, meta.Refs)) // save domains counter if u, err1 := url.Parse(meta.ID.Path()); err1 == nil { @@ -273,6 +289,65 @@ func (m *memoryBucket) ReadChunkFile(_ context.Context, id *object.ID, index uin return storage.WrapVFSFile(f), wpath, err } +// Migrate implements [storage.Bucket]. 
+func (m *memoryBucket) Migrate(ctx context.Context, id *object.ID, dest storage.Bucket) error { + md, err := m.indexdb.Get(ctx, id.Bytes()) + if err != nil { + return err + } + clog := log.Context(ctx) + + var moveErr error + md.Chunks.Range(func(x uint32) { + if moveErr != nil { + return + } + + rf, _, err1 := m.ReadChunkFile(ctx, id, x) + if err1 != nil { + moveErr = err1 + return + } + defer rf.Close() + + wf, _, err2 := dest.WriteChunkFile(ctx, id, x) + if err2 != nil { + moveErr = err2 + return + } + defer wf.Close() + + if _, err2 = io.Copy(wf, rf); err2 != nil { + moveErr = err2 + } + }) + + if moveErr != nil { + clog.Errorf("failed to move object %s chunks: %v", id.Key(), moveErr) + return moveErr + } + + // 2. Store metadata in target + if err := dest.Store(ctx, md); err != nil { + clog.Errorf("failed to store metadata in target bucket for %s: %v", id.Key(), err) + return err + } + + // 3. Discard locally + if err := m.discard(ctx, md); err != nil { + clog.Errorf("failed to discard object %s from source bucket: %v", id.Key(), err) + return err + } + + return nil +} + +// SetMigration implements [storage.Bucket]. +func (m *memoryBucket) SetMigration(migration storage.Migration) error { + m.migration = migration + return nil +} + // StoreType implements [storage.Bucket]. 
func (m *memoryBucket) StoreType() string { return m.storeType diff --git a/storage/bucket/memory/memory_test.go b/storage/bucket/memory/memory_test.go index eede979..6951b1c 100644 --- a/storage/bucket/memory/memory_test.go +++ b/storage/bucket/memory/memory_test.go @@ -7,7 +7,6 @@ import ( "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" - "github.com/omalloc/tavern/conf" "github.com/omalloc/tavern/storage/bucket/memory" "github.com/omalloc/tavern/storage/sharedkv" "github.com/stretchr/testify/assert" @@ -19,7 +18,7 @@ import ( func TestMemoryBucket(t *testing.T) { var m runtime.MemStats - bucket, err := memory.New(&conf.Bucket{ + bucket, err := memory.New(&storage.BucketConfig{ Path: "inmemory", DBType: storage.TypeInMemory, }, sharedkv.NewMemSharedKV()) diff --git a/storage/builder.go b/storage/builder.go index e8214ce..8c93ae5 100644 --- a/storage/builder.go +++ b/storage/builder.go @@ -20,16 +20,17 @@ type globalBucketOption struct { Driver string DBType string DBPath string + Migration *storage.MigrationConfig } // implements storage.Bucket map. -var bucketMap = map[string]func(opt *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error){ +var bucketMap = map[string]func(opt *storage.BucketConfig, sharedkv storage.SharedKV) (storage.Bucket, error){ "empty": empty.New, "native": disk.New, // disk is an alias of native "memory": memory.New, // in-memory disk. restart as lost. 
} -func NewBucket(opt *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) { +func NewBucket(opt *storage.BucketConfig, sharedkv storage.SharedKV) (storage.Bucket, error) { factory, exist := bucketMap[opt.Driver] if !exist { return nil, errors.New("bucket factory not found") @@ -37,23 +38,24 @@ func NewBucket(opt *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, err return factory(opt, sharedkv) } -func mergeConfig(global *globalBucketOption, bucket *conf.Bucket) *conf.Bucket { +func mergeConfig(global *globalBucketOption, bucket *conf.Bucket) *storage.BucketConfig { // copied from conf bucket. - copied := &conf.Bucket{ + copied := &storage.BucketConfig{ Path: bucket.Path, Driver: bucket.Driver, Type: bucket.Type, DBType: bucket.DBType, DBPath: bucket.DBPath, MaxObjectLimit: bucket.MaxObjectLimit, - DBConfig: bucket.DBConfig, // custom db config + Migration: global.Migration, // migration config + DBConfig: bucket.DBConfig, // custom db config } if copied.Driver == "" { copied.Driver = global.Driver } if copied.Type == "" { - copied.Type = "normal" + copied.Type = storage.TypeWarm } if copied.DBType == "" { copied.DBType = global.DBType diff --git a/storage/diraware/bucket.go b/storage/diraware/bucket.go index ca7ad54..e0f9cf5 100644 --- a/storage/diraware/bucket.go +++ b/storage/diraware/bucket.go @@ -9,6 +9,8 @@ import ( "github.com/omalloc/tavern/api/defined/v1/storage/object" ) +var _ storagev1.Bucket = (*wrappedBucket)(nil) + type wrappedBucket struct { base storagev1.Bucket checker Checker @@ -43,6 +45,11 @@ func (b *wrappedBucket) Lookup(ctx context.Context, id *object.ID) (*object.Meta return md, nil } +// Touch implements [storage.Bucket]. 
+func (b *wrappedBucket) Touch(ctx context.Context, id *object.ID) {
+	b.base.Touch(ctx, id)
+}
+
 func (b *wrappedBucket) Store(ctx context.Context, meta *object.Metadata) error {
 	return b.base.Store(ctx, meta)
 }
@@ -87,6 +94,15 @@ func (b *wrappedBucket) ReadChunkFile(ctx context.Context, id *object.ID, index
 	return b.base.ReadChunkFile(ctx, id, index)
 }
 
+// Migrate implements [storage.Bucket].
+func (b *wrappedBucket) Migrate(ctx context.Context, id *object.ID, dest storagev1.Bucket) error {
+	return b.base.Migrate(ctx, id, dest)
+}
+
+func (b *wrappedBucket) SetMigration(m storagev1.Migration) error {
+	return b.base.SetMigration(m)
+}
+
 func (b *wrappedBucket) ID() string {
 	return b.base.ID()
 }
diff --git a/storage/migrator.go b/storage/migrator.go
new file mode 100644
index 0000000..9968227
--- /dev/null
+++ b/storage/migrator.go
@@ -0,0 +1,397 @@
+package storage
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/omalloc/tavern/api/defined/v1/storage"
+	"github.com/omalloc/tavern/api/defined/v1/storage/object"
+	"github.com/omalloc/tavern/conf"
+	"github.com/omalloc/tavern/contrib/log"
+	"github.com/omalloc/tavern/storage/bucket/empty"
+	"github.com/omalloc/tavern/storage/selector"
+	"github.com/omalloc/tavern/storage/sharedkv"
+)
+
+var _ storage.Migrator = (*migratorStorage)(nil)
+
+type migratorStorage struct {
+	closed bool
+	mu     sync.Mutex
+	log    *log.Helper
+
+	warmSelector storage.Selector // warm selector
+	hotSelector  storage.Selector // hot selector
+	coldSelector storage.Selector // cold selector
+	sharedkv     storage.SharedKV
+	nopBucket    storage.Bucket
+	memoryBucket storage.Bucket
+	hotBucket    []storage.Bucket
+	warmBucket   []storage.Bucket
+	coldBucket   []storage.Bucket
+}
+
+func NewMigrator(config *conf.Storage, logger log.Logger) (storage.Migrator, error) {
+	nopBucket, _ := empty.New(&storage.BucketConfig{}, sharedkv.NewEmpty())
+
+	m := &migratorStorage{
+		closed: false,
+		mu:     sync.Mutex{},
+		log:    log.NewHelper(logger),
+
+		warmSelector: nil,
+		hotSelector:  nil,
+		coldSelector: nil,
+		sharedkv:     sharedkv.NewMemSharedKV(),
+		nopBucket:    nopBucket,
+		memoryBucket: nil,
+		hotBucket:    make([]storage.Bucket, 0, len(config.Buckets)),
+		warmBucket:   make([]storage.Bucket, 0, len(config.Buckets)),
+		coldBucket:   make([]storage.Bucket, 0, len(config.Buckets)),
+	}
+
+	if err := m.reinit(config); err != nil {
+		return nil, err
+	}
+
+	// diraware adapter
+	// Disabling this improves performance, but directory purge can then only
+	// use hard-delete mode; mark-as-expired is unavailable.
+	if config.DirAware != nil && config.DirAware.Enabled {
+		if config.DirAware.StorePath != "" {
+			_ = os.MkdirAll(config.DirAware.StorePath, 0755)
+			// replace memkv with storekv
+			m.sharedkv = sharedkv.NewStoreSharedKV(config.DirAware.StorePath)
+		}
+
+		// sharedkv used no-mem typ.
+		// return diraware.New(m, diraware.NewChecker(m.sharedkv,
+		// 	diraware.WithAutoClear(config.DirAware.AutoClear),
+		// )), nil
+	}
+
+	return m, nil
+}
+
+func (m *migratorStorage) reinit(config *conf.Storage) error {
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	if err := m.sharedkv.DropPrefix(ctx, []byte("if/domain/")); err != nil {
+		m.log.Warnf("failed to drop prefix key `if/domain/` counter: %s", err)
+	}
+
+	globalConfig := &globalBucketOption{
+		AsyncLoad:       config.AsyncLoad,
+		EvictionPolicy:  config.EvictionPolicy,
+		SelectionPolicy: config.SelectionPolicy,
+		Driver:          config.Driver,
+		DBType:          config.DBType,
+		DBPath:          config.DBPath,
+		Migration: &storage.MigrationConfig{
+			Enabled: config.Migration.Enabled,
+			Promote: storage.PromoteConfig{
+				MinHits: config.Migration.Promote.MinHits,
+				Window:  config.Migration.Promote.Window,
+			},
+			Demote: storage.DemoteConfig{
+				MinHits: config.Migration.Demote.MinHits,
+				Window:  config.Migration.Demote.Window,
+				// BUG FIX: Occupancy was silently dropped here, so the
+				// hot-disk occupancy threshold never reached the buckets.
+				Occupancy: config.Migration.Demote.Occupancy,
+			},
+		},
+	}
+
+	for _, c := range config.Buckets {
+		bucket, err := NewBucket(mergeConfig(globalConfig, c), m.sharedkv)
+		if err != nil {
+			return err
+		}
+
+		switch bucket.StoreType() {
+		case storage.TypeNormal, storage.TypeWarm:
+			m.warmBucket = append(m.warmBucket, bucket)
+		case storage.TypeHot:
+			m.hotBucket = append(m.hotBucket, bucket)
+		case storage.TypeCold:
+			m.coldBucket = append(m.coldBucket, bucket)
+		case storage.TypeInMemory:
+			if m.memoryBucket != nil {
+				return fmt.Errorf("only one inmemory bucket is allowed")
+			}
+			m.memoryBucket = bucket
+		}
+	}
+
+	// wait for all buckets to be initialized
+	// load indexdb
+	// load lru
+	// load purge queue
+
+	// storage layer init.
+	// hot
+	if len(m.hotBucket) > 0 {
+		m.hotSelector = selector.New(m.hotBucket, config.SelectionPolicy)
+	} else {
+		m.log.Infof("no hot bucket configured")
+	}
+
+	// warm
+	if len(m.warmBucket) <= 0 {
+		m.log.Infof("no warm bucket configured")
+		// no warm bucket, use memory bucket
+		if m.memoryBucket != nil {
+			m.warmBucket = append(m.warmBucket, m.memoryBucket)
+		}
+	}
+	m.warmSelector = selector.New(m.warmBucket, config.SelectionPolicy)
+
+	// cold
+	if len(m.coldBucket) > 0 {
+		m.coldSelector = selector.New(m.coldBucket, config.SelectionPolicy)
+	} else {
+		m.log.Infof("no cold bucket configured")
+	}
+
+	// has enabled migration
+	if config.Migration != nil && config.Migration.Enabled {
+		for _, bucket := range m.Buckets() {
+			_ = bucket.SetMigration(m)
+		}
+	}
+	return nil
+}
+
+// Select implements [storage.Migrator].
+func (m *migratorStorage) Select(ctx context.Context, id *object.ID) storage.Bucket {
+	// find bucket: Hot → Warm → Cold
+	return m.chainSelector(ctx, id,
+		m.hotSelector,
+		m.warmSelector,
+		m.coldSelector,
+	)
+}
+
+// Demote implements [storage.Migrator].
+func (m *migratorStorage) Demote(ctx context.Context, id *object.ID, src storage.Bucket) error {
+	// Hot -> Warm -> Cold
+	var layer string
+	switch src.StoreType() {
+	case storage.TypeHot:
+		layer = storage.TypeWarm
+	case storage.TypeWarm:
+		layer = storage.TypeCold
+	default:
+		return nil // no demotion for other types
+	}
+
+	target := m.SelectLayer(ctx, id, layer)
+	if target == nil {
+		return fmt.Errorf("no target bucket found for demotion from %s to %s", src.StoreType(), layer)
+	}
+
+	return src.Migrate(ctx, id, target)
+}
+
+// Promote implements [storage.Migrator].
+func (m *migratorStorage) Promote(ctx context.Context, id *object.ID, src storage.Bucket) error {
+	// Cold -> Warm -> Hot
+	var layer string
+	switch src.StoreType() {
+	case storage.TypeCold:
+		layer = storage.TypeWarm
+	case storage.TypeWarm:
+		layer = storage.TypeHot
+	default:
+		return nil // no promotion for other types
+	}
+
+	target := m.SelectLayer(ctx, id, layer)
+	if target == nil {
+		return fmt.Errorf("no target bucket found for promotion from %s to %s", src.StoreType(), layer)
+	}
+
+	return src.Migrate(ctx, id, target)
+}
+
+func (m *migratorStorage) SelectLayer(ctx context.Context, id *object.ID, layer string) storage.Bucket {
+	switch layer {
+	case storage.TypeHot:
+		if m.hotSelector != nil {
+			return m.hotSelector.Select(ctx, id)
+		}
+	case storage.TypeNormal, storage.TypeWarm: // TypeWarm is same as TypeNormal
+		if m.warmSelector != nil {
+			return m.warmSelector.Select(ctx, id)
+		}
+	case storage.TypeCold:
+		if m.coldSelector != nil {
+			return m.coldSelector.Select(ctx, id)
+		}
+	case storage.TypeInMemory:
+		return m.memoryBucket
+	}
+	return nil
+}
+
+func (m *migratorStorage) chainSelector(ctx context.Context, id *object.ID, selectors ...storage.Selector) storage.Bucket {
+	for _, sel := range selectors {
+		if sel == nil {
+			continue
+		}
+		if bucket := sel.Select(ctx, id); bucket != nil && bucket.Exist(ctx, id.Bytes()) {
+			return bucket
+		}
+	}
+
+	// fallback to warm selector
+	return m.warmSelector.Select(ctx, id)
+}
+
+// Buckets implements [storage.Migrator].
+func (m *migratorStorage) Buckets() []storage.Bucket {
+	buckets := make([]storage.Bucket, 0, len(m.warmBucket)+len(m.hotBucket)+len(m.coldBucket))
+	buckets = append(buckets, m.warmBucket...)
+	buckets = append(buckets, m.hotBucket...)
+	buckets = append(buckets, m.coldBucket...)
+	return buckets
+}
+
+// PURGE implements [storage.Migrator].
+func (m *migratorStorage) PURGE(storeUrl string, typ storage.PurgeControl) error {
+	// Directory prefix purge
+	if typ.Dir {
+		// For mark-expired on dir, skip sharedkv hits and fallback to full scan below.
+		if typ.MarkExpired {
+			return nil
+		}
+
+		// For directory purge, we prefer SharedKV inverted index when available:
+		// key schema: ix/<bucket-id>/<store-url>
+		// value: object.IDHash bytes
+		ctx := context.Background()
+		processed := 0
+
+		for _, b := range m.Buckets() {
+			prefix := fmt.Sprintf("ix/%s/%s", b.ID(), storeUrl)
+			_ = m.sharedkv.IteratePrefix(ctx, []byte(prefix), func(key, val []byte) error {
+				// parse hash
+				var h object.IDHash
+				if len(val) >= object.IdHashSize {
+					copy(h[:], val[:object.IdHashSize])
+				} else {
+					// skip invalid record
+					return nil
+				}
+
+				if typ.Hard || !typ.MarkExpired {
+					if err := b.DiscardWithHash(ctx, h); err == nil {
+						processed++
+					}
+				}
+
+				// remove index mapping
+				_ = m.sharedkv.Delete(ctx, key)
+				return nil
+			})
+		}
+
+		// fallback: scan indexdb if no sharedkv hits, or to ensure completeness
+		if processed == 0 {
+			for _, b := range m.Buckets() {
+				_ = b.Iterate(ctx, func(md *object.Metadata) error {
+					if md == nil {
+						return nil
+					}
+					if strings.HasPrefix(md.ID.Path(), storeUrl) {
+						if typ.Hard || !typ.MarkExpired {
+							_ = b.DiscardWithMetadata(ctx, md)
+						} else {
+							// BUG FIX: Add(-1) subtracts one nanosecond, which
+							// truncates to the *current* second via Unix() —
+							// i.e. not actually in the past. Use a full second.
+							md.ExpiresAt = time.Now().Add(-time.Second).Unix()
+							_ = b.Store(ctx, md)
+						}
+						processed++
+					}
+					return nil
+				})
+			}
+		}
+
+		if processed == 0 {
+			return storage.ErrKeyNotFound
+		}
+		return nil
+	}
+
+	// Single object purge
+	cacheKey := object.NewID(storeUrl)
+
+	bucket := m.Select(context.Background(), cacheKey)
+	if bucket == nil {
+		return fmt.Errorf("bucket not found")
+	}
+
+	// hard delete cache file mode.
+	if typ.Hard {
+		return bucket.Discard(context.Background(), cacheKey)
+	}
+
+	// MarkExpired to revalidate.
+	// soft delete cache file mode.
+	md, err := bucket.Lookup(context.Background(), cacheKey)
+	if err != nil {
+		return err
+	}
+
+	// set expire time to past time. and then store it back.
+	// BUG FIX: was Add(-1) (one nanosecond), which yields the current second
+	// after Unix() truncation; subtract a full second so the mark is in the past.
+	md.ExpiresAt = time.Now().Add(-time.Second).Unix()
+	// TODO: we should acquire a globalResourceLock before updating.
+	return bucket.Store(context.Background(), md)
+}
+
+// Rebuild implements [storage.Migrator].
+func (m *migratorStorage) Rebuild(ctx context.Context, buckets []storage.Bucket) error {
+	return nil
+}
+
+// SharedKV implements [storage.Migrator].
+func (m *migratorStorage) SharedKV() storage.SharedKV {
+	return m.sharedkv
+}
+
+// Close implements [storage.Migrator].
+func (m *migratorStorage) Close() error {
+	var errs []error
+	// close all buckets
+	for _, bucket := range m.warmBucket {
+		errs = append(errs, bucket.Close())
+	}
+
+	for _, bucket := range m.hotBucket {
+		errs = append(errs, bucket.Close())
+	}
+
+	for _, bucket := range m.coldBucket {
+		errs = append(errs, bucket.Close())
+	}
+
+	if m.memoryBucket != nil {
+		if err := m.memoryBucket.Close(); err != nil {
+			errs = append(errs, err)
+		}
+	}
+
+	// memdb close
+	if err := m.sharedkv.Close(); err != nil {
+		errs = append(errs, err)
+	}
+
+	if len(errs) > 0 {
+		return errors.Join(errs...)
+	}
+	return nil
+}
diff --git a/storage/storage.go b/storage/storage.go
index faed216..9d487d4 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -38,7 +38,12 @@ func New(config *conf.Storage, logger log.Logger) (storage.Storage, error) {
 	// 填充默认配置
 	config.FillDefault()
 
-	nopBucket, _ := empty.New(&conf.Bucket{}, sharedkv.NewEmpty())
+	// support migrator mode
+	if config.Migration != nil && config.Migration.Enabled {
+		return NewMigrator(config, logger)
+	}
+
+	nopBucket, _ := empty.New(&storage.BucketConfig{}, sharedkv.NewEmpty())
 	n := &nativeStorage{
 		closed: false,
 		mu:     sync.Mutex{},
diff --git a/tests/config.test.yaml b/tests/config.test.yaml
index 05acb4a..2490eb9 100644
--- a/tests/config.test.yaml
+++ b/tests/config.test.yaml
@@ -91,6 +91,19 @@ storage:
   eviction_policy: fifo # fifo, lru, lfu
   selection_policy: hashring # hashring, roundrobin
   slice_size: 524288 # 1MB
+  migration:
+    enabled: true # enable tiering bucket
+    promote:
+      min_hits: 10 # window hits to promote
+      window: 1m # 1 minute window
+    demote:
+      min_hits: 2 # window hits to demote
+      window: 5m # 5 minutes window
+      occupancy: 75 # percent usage
+  diraware:
+    enabled: false # default true
+    store_path: /tmp/.diraware
+    auto_clear: true
   buckets:
     - path: inmemory
       driver: memory