From a43f9d8507e45172dfee3a1ed3f969ff2c00432b Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Thu, 22 Jan 2026 13:33:22 +0800 Subject: [PATCH 1/4] feat: warm/cold selector --- api/defined/v1/storage/indexdb.go | 1 + api/defined/v1/storage/storage.go | 2 + conf/conf.go | 63 +++++++++++++++++--------- server/middleware/caching/processor.go | 3 +- storage/bucket/disk/disk.go | 13 ++++++ storage/bucket/empty/empty.go | 4 ++ storage/bucket/memory/memory.go | 13 ++++++ storage/marked/bucket.go | 4 ++ storage/storage.go | 34 ++++++++++++-- 9 files changed, 110 insertions(+), 27 deletions(-) diff --git a/api/defined/v1/storage/indexdb.go b/api/defined/v1/storage/indexdb.go index 4f00827..24b6721 100644 --- a/api/defined/v1/storage/indexdb.go +++ b/api/defined/v1/storage/indexdb.go @@ -14,6 +14,7 @@ var ErrKeyNotFound = errors.New("key not found") const ( TypeInMemory = "inmemory" TypeNormal = "normal" + TypeCold = "cold" TypeHot = "hot" ) diff --git a/api/defined/v1/storage/storage.go b/api/defined/v1/storage/storage.go index 51b7ce2..9df159e 100644 --- a/api/defined/v1/storage/storage.go +++ b/api/defined/v1/storage/storage.go @@ -20,6 +20,8 @@ type Selector interface { type Operation interface { // Lookup retrieves the metadata for the specified object ID. Lookup(ctx context.Context, id *object.ID) (*object.Metadata, error) + // Touch updates the last access time for the specified object ID. + Touch(ctx context.Context, id *object.ID) error // Store store the metadata for the specified object ID. Store(ctx context.Context, meta *object.Metadata) error // Exist checks if the object exists. 
diff --git a/conf/conf.go b/conf/conf.go index bb9b8bf..ed8432b 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -68,28 +68,49 @@ type Upstream struct { Features map[string]any `json:"features" yaml:"features"` } -type Storage struct { - Driver string `json:"driver" yaml:"driver"` - DBType string `json:"db_type" yaml:"db_type"` - DBPath string `json:"db_path" yaml:"db_path"` // default db path - AsyncLoad bool `json:"async_load" yaml:"async_load"` - EvictionPolicy string `json:"eviction_policy" yaml:"eviction_policy"` - SelectionPolicy string `json:"selection_policy" yaml:"selection_policy"` - SliceSize uint64 `json:"slice_size" yaml:"slice_size"` - Buckets []*Bucket `json:"buckets" yaml:"buckets"` -} +type ( + Storage struct { + Driver string `json:"driver" yaml:"driver"` + DBType string `json:"db_type" yaml:"db_type"` + DBPath string `json:"db_path" yaml:"db_path"` // default db path + AsyncLoad bool `json:"async_load" yaml:"async_load"` + EvictionPolicy string `json:"eviction_policy" yaml:"eviction_policy"` + SelectionPolicy string `json:"selection_policy" yaml:"selection_policy"` + SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part + Tiering *Tiering `json:"tiering" yaml:"tiering"` // tiering config + Buckets []*Bucket `json:"buckets" yaml:"buckets"` // bucket list + } -type Bucket struct { - Path string `json:"path" yaml:"path"` // local path or ? 
- Driver string `json:"driver" yaml:"driver"` // native, custom-driver - Type string `json:"type" yaml:"type"` // normal, cold, hot, fastmemory - DBType string `json:"db_type" yaml:"db_type"` // boltdb, badgerdb, pebble - DBPath string `json:"db_path" yaml:"db_path"` // db path, defult: /.indexdb - AsyncLoad bool `json:"async_load" yaml:"async_load"` // load metadata async - SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part - MaxObjectLimit int `json:"max_object_limit" yaml:"max_object_limit"` // max object limit, upper Bound discard - DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config -} + Bucket struct { + Path string `json:"path" yaml:"path"` // local path or ? + Driver string `json:"driver" yaml:"driver"` // native, custom-driver + Type string `json:"type" yaml:"type"` // normal, cold, hot, fastmemory + DBType string `json:"db_type" yaml:"db_type"` // boltdb, badgerdb, pebble + DBPath string `json:"db_path" yaml:"db_path"` // db path, defult: /.indexdb + AsyncLoad bool `json:"async_load" yaml:"async_load"` // load metadata async + SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part + MaxObjectLimit int `json:"max_object_limit" yaml:"max_object_limit"` // max object limit, upper Bound discard + DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config + } +) + +type ( + Promote struct { + MinHits int `json:"min_hits" yaml:"min_hits"` // 时间窗口内命中 >= N + Window time.Duration `json:"window" yaml:"window"` // 时间窗口 10m + MaxObjectSize uint64 `json:"max_object_size" yaml:"max_object_size"` // 最大对象大小 + } + Demote struct { + MinHits int `json:"min_hits" yaml:"min_hits"` // 时间窗口内命中 <= N + Window time.Duration `json:"window" yaml:"window"` // 时间窗口 10m + Occupancy float64 `json:"occupancy" yaml:"occupancy"` // 热盘存储占用率 >= N% + } + Tiering struct { + Enabled bool `json:"enabled" yaml:"enabled"` + Promote Promote `json:"promote" yaml:"promote"` // 升温 + Demote 
Demote `json:"demote" yaml:"demote"` // 降温 + } +) type Plugin struct { Name string `json:"name" yaml:"name"` diff --git a/server/middleware/caching/processor.go b/server/middleware/caching/processor.go index fefab56..78a40ce 100644 --- a/server/middleware/caching/processor.go +++ b/server/middleware/caching/processor.go @@ -151,7 +151,8 @@ func (pc *ProcessorChain) postCacheProcessor(caching *Caching, req *http.Request _, _ = io.Copy(io.Discard, resp.Body) } - // TODO: incr index ref count. + // incr index ref count. + caching.bucket.Touch(req.Context(), caching.id) return resp, nil } diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index c4cd42b..aed4361 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -268,6 +268,19 @@ func (d *diskBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metadat return md, err } +// Touch implements [storage.Bucket]. +func (d *diskBucket) Touch(ctx context.Context, id *object.ID) error { + mark := d.cache.Get(id.Hash()) + if mark.LastAccess() <= 0 { + return nil + } + + mark.SetLastAccess(time.Now().Unix()) + mark.SetRefs(mark.Refs() + 1) + d.cache.Set(id.Hash(), mark) + return nil +} + // Remove implements storage.Bucket. func (d *diskBucket) Remove(ctx context.Context, id *object.ID) error { return d.indexdb.Delete(ctx, id.Bytes()) diff --git a/storage/bucket/empty/empty.go b/storage/bucket/empty/empty.go index 1f8030b..3f2293a 100644 --- a/storage/bucket/empty/empty.go +++ b/storage/bucket/empty/empty.go @@ -80,6 +80,10 @@ func (e *emptyBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metada return nil, storage.ErrKeyNotFound } +func (e *emptyBucket) Touch(ctx context.Context, id *object.ID) error { + return nil +} + // Remove implements storage.Bucket. 
func (e *emptyBucket) Remove(ctx context.Context, id *object.ID) error { return nil diff --git a/storage/bucket/memory/memory.go b/storage/bucket/memory/memory.go index ba9ed43..200af21 100644 --- a/storage/bucket/memory/memory.go +++ b/storage/bucket/memory/memory.go @@ -202,6 +202,19 @@ func (m *memoryBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metad return md, err } +// Touch implements [storage.Bucket]. +func (m *memoryBucket) Touch(ctx context.Context, id *object.ID) error { + mark := m.cache.Get(id.Hash()) + if mark.LastAccess() <= 0 { + return nil + } + + mark.SetLastAccess(time.Now().Unix()) + mark.SetRefs(mark.Refs() + 1) + m.cache.Set(id.Hash(), mark) + return nil +} + // Path implements [storage.Bucket]. func (m *memoryBucket) Path() string { return m.path diff --git a/storage/marked/bucket.go b/storage/marked/bucket.go index 1c87dd7..20d1d78 100644 --- a/storage/marked/bucket.go +++ b/storage/marked/bucket.go @@ -43,6 +43,10 @@ func (b *wrappedBucket) Lookup(ctx context.Context, id *object.ID) (*object.Meta return md, nil } +func (b *wrappedBucket) Touch(ctx context.Context, id *object.ID) error { + return b.base.Touch(ctx, id) +} + func (b *wrappedBucket) Store(ctx context.Context, meta *object.Metadata) error { return b.base.Store(ctx, meta) } diff --git a/storage/storage.go b/storage/storage.go index 10751ec..b4bfeaf 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -24,7 +24,9 @@ type nativeStorage struct { mu sync.Mutex log *log.Helper - selector storage.Selector + warmSelector storage.Selector // warm selector + hotSelector storage.Selector // hot selector + coldSelector storage.Selector // cold selector sharedkv storage.SharedKV nopBucket storage.Bucket memoryBucket storage.Bucket @@ -39,7 +41,9 @@ func New(config *conf.Storage, logger log.Logger) (storage.Storage, error) { mu: sync.Mutex{}, log: log.NewHelper(logger), - selector: selector.New([]storage.Bucket{}, config.SelectionPolicy), + warmSelector: nil, + 
hotSelector: nil, + coldSelector: nil, sharedkv: sharedkv.NewMemSharedKV(), nopBucket: nopBucket, memoryBucket: nil, @@ -102,16 +106,36 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { n.normalBucket = append(n.normalBucket, n.memoryBucket) } } + n.warmSelector = selector.New(n.normalBucket, config.SelectionPolicy) - n.selector = selector.New(n.normalBucket, config.SelectionPolicy) + // 热盘 + if len(n.hotBucket) > 0 { + n.hotSelector = selector.New(n.hotBucket, config.SelectionPolicy) + } else { + n.log.Infof("no hot bucket configured") + } return nil } // Select implements storage.Selector. func (n *nativeStorage) Select(ctx context.Context, id *object.ID) storage.Bucket { - bucket := n.selector.Select(ctx, id) - return bucket + return n.chainSelector(ctx, id, + n.hotSelector, + n.warmSelector, + n.coldSelector, + ) +} + +func (n *nativeStorage) chainSelector(ctx context.Context, id *object.ID, selectors ...storage.Selector) storage.Bucket { + for _, sel := range selectors { + if sel != nil { + if bucket := sel.Select(ctx, id); bucket != nil { + return bucket + } + } + } + return nil } // Rebuild implements storage.Selector. 
From 487baea3b13a20bff6322946f5227e2f2d16e919 Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Thu, 22 Jan 2026 15:43:32 +0800 Subject: [PATCH 2/4] feat(tiering): implement warm/cold storage tiering and update related configurations --- api/defined/v1/storage/indexdb.go | 3 ++- conf/conf.go | 3 ++- config.example.yaml | 10 +++++++++ storage/bucket/disk/disk.go | 34 +++++++++++++++++++++++++++---- storage/builder.go | 2 ++ storage/storage.go | 20 ++++++++++++++++-- 6 files changed, 64 insertions(+), 8 deletions(-) diff --git a/api/defined/v1/storage/indexdb.go b/api/defined/v1/storage/indexdb.go index 24b6721..85f3047 100644 --- a/api/defined/v1/storage/indexdb.go +++ b/api/defined/v1/storage/indexdb.go @@ -13,7 +13,8 @@ var ErrKeyNotFound = errors.New("key not found") const ( TypeInMemory = "inmemory" - TypeNormal = "normal" + TypeNormal = "normal" // normal, warm 同一个 + TypeWarm = "normal" // normal, warm 同一个 TypeCold = "cold" TypeHot = "hot" ) diff --git a/conf/conf.go b/conf/conf.go index ed8432b..17796d2 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -84,13 +84,14 @@ type ( Bucket struct { Path string `json:"path" yaml:"path"` // local path or ? 
Driver string `json:"driver" yaml:"driver"` // native, custom-driver - Type string `json:"type" yaml:"type"` // normal, cold, hot, fastmemory + Type string `json:"type" yaml:"type"` // warm(normal), cold, hot, inmemory DBType string `json:"db_type" yaml:"db_type"` // boltdb, badgerdb, pebble DBPath string `json:"db_path" yaml:"db_path"` // db path, defult: /.indexdb AsyncLoad bool `json:"async_load" yaml:"async_load"` // load metadata async SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part MaxObjectLimit int `json:"max_object_limit" yaml:"max_object_limit"` // max object limit, upper Bound discard DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config + Tiering *Tiering `json:"-" yaml:"-"` // extend global tiering config } ) diff --git a/config.example.yaml b/config.example.yaml index 6abeaf7..396314a 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -93,6 +93,16 @@ storage: eviction_policy: fifo # fifo, lru, lfu selection_policy: hashring # hashring, roundrobin slice_size: 1048576 # 1MB + tiering: + enabled: false # enable tiering bucket + promote: + min_hits: 10 # window hits to promote + window: 1m # 1 minute window + max_object_size: 20971520 # 20MB + demote: + min_hits: 2 # window hits to demote + window: 5m # 5 minutes window + occupancy: 75 # percent usage buckets: - path: /cache1 type: normal diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index aed4361..30cbb9a 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -34,6 +34,7 @@ type diskBucket struct { driver string storeType string asyncLoad bool + tiering *conf.Tiering weight int sharedkv storage.SharedKV indexdb storage.IndexDB @@ -50,6 +51,7 @@ func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) driver: config.Driver, storeType: config.Type, asyncLoad: config.AsyncLoad, + tiering: config.Tiering, weight: 100, // default weight sharedkv: sharedkv, cache: 
lru.New[object.IDHash, storage.Mark](config.MaxObjectLimit), @@ -91,16 +93,40 @@ func (d *diskBucket) evict() { clog.Debugf("start evict goroutine for %s", d.ID()) + // Demote func + demoteTarget := storage.TypeNormal + if d.storeType == storage.TypeNormal { + demoteTarget = storage.TypeCold + } + demote := func(evicted lru.Eviction[object.IDHash, storage.Mark]) error { + // TODO: demote to target storage bucket + log.Debugf("demote %s to %s", demoteTarget, evicted.Key) + + return nil + } + + discard := func(evicted lru.Eviction[object.IDHash, storage.Mark]) { + fd := evicted.Key.WPath(d.path) + clog.Debugf("evict file %s, last-access %d", fd, evicted.Value.LastAccess()) + d.DiscardWithHash(context.Background(), evicted.Key) + } + go func() { for { select { case <-d.stop: return case evicted := <-ch: - fd := evicted.Key.WPath(d.path) - clog.Debugf("evict file %s, last-access %d", fd, evicted.Value.LastAccess()) - // TODO: discard expired cachefile or Move to cold storage - d.DiscardWithHash(context.Background(), evicted.Key) + // expired cachefile Demote to cold bucket + if d.tiering != nil && d.tiering.Enabled { + if err := demote(evicted); err != nil { + // fallback to discard + discard(evicted) + } + continue + } + + discard(evicted) } } }() diff --git a/storage/builder.go b/storage/builder.go index e8214ce..a248708 100644 --- a/storage/builder.go +++ b/storage/builder.go @@ -20,6 +20,7 @@ type globalBucketOption struct { Driver string DBType string DBPath string + Tiering conf.Tiering } // implements storage.Bucket map. 
@@ -47,6 +48,7 @@ func mergeConfig(global *globalBucketOption, bucket *conf.Bucket) *conf.Bucket { DBPath: bucket.DBPath, MaxObjectLimit: bucket.MaxObjectLimit, DBConfig: bucket.DBConfig, // custom db config + Tiering: &global.Tiering, } if copied.Driver == "" { diff --git a/storage/storage.go b/storage/storage.go index b4bfeaf..6b3e916 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -32,6 +32,7 @@ type nativeStorage struct { memoryBucket storage.Bucket hotBucket []storage.Bucket normalBucket []storage.Bucket + coldBucket []storage.Bucket } func New(config *conf.Storage, logger log.Logger) (storage.Storage, error) { @@ -73,6 +74,7 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { Driver: config.Driver, DBType: config.DBType, DBPath: config.DBPath, + Tiering: *config.Tiering, } for _, c := range config.Buckets { @@ -100,7 +102,9 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { // load lru // load purge queue + // warm / normal if len(n.normalBucket) <= 0 { + n.log.Infof("no warm bucket configured") // no normal bucket, use nop bucket if n.memoryBucket != nil { n.normalBucket = append(n.normalBucket, n.memoryBucket) @@ -108,7 +112,14 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { } n.warmSelector = selector.New(n.normalBucket, config.SelectionPolicy) - // 热盘 + // cold + if len(n.coldBucket) > 0 { + n.coldSelector = selector.New(n.coldBucket, config.SelectionPolicy) + } else { + n.log.Infof("no cold bucket configured") + } + + // hot if len(n.hotBucket) > 0 { n.hotSelector = selector.New(n.hotBucket, config.SelectionPolicy) } else { @@ -120,6 +131,7 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { // Select implements storage.Selector. 
func (n *nativeStorage) Select(ctx context.Context, id *object.ID) storage.Bucket { + // find bucket: Hot → Warm → Cold return n.chainSelector(ctx, id, n.hotSelector, n.warmSelector, @@ -145,7 +157,11 @@ func (n *nativeStorage) Rebuild(ctx context.Context, buckets []storage.Bucket) e // Buckets implements storage.Storage. func (n *nativeStorage) Buckets() []storage.Bucket { - return append(n.normalBucket, n.hotBucket...) + buckets := make([]storage.Bucket, 0, len(n.normalBucket)+len(n.hotBucket)+len(n.coldBucket)) + buckets = append(buckets, n.normalBucket...) + buckets = append(buckets, n.hotBucket...) + buckets = append(buckets, n.coldBucket...) + return buckets } // PURGE implements storage.Storage. From 02ef245b901624c3b08e6ba639f5998bad1d7adb Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Mon, 26 Jan 2026 20:00:57 +0800 Subject: [PATCH 3/4] feat(storage): implement warm/cold tiering with promotion and demotion logic --- api/defined/v1/storage/indexdb.go | 2 +- api/defined/v1/storage/storage.go | 19 +++ .../caching/caching_chunkpart_test.go | 2 +- server/middleware/caching/caching_vary.go | 17 +-- server/middleware/caching/internal.go | 14 ++- server/middleware/caching/processor.go | 13 +- storage/bucket/disk/disk.go | 117 ++++++++++++++++-- storage/bucket/empty/empty.go | 8 ++ storage/bucket/memory/memory.go | 12 +- storage/builder.go | 4 +- storage/marked/bucket.go | 12 ++ storage/marked/wrap_storage.go | 15 ++- storage/storage.go | 98 +++++++++++++-- 13 files changed, 278 insertions(+), 55 deletions(-) diff --git a/api/defined/v1/storage/indexdb.go b/api/defined/v1/storage/indexdb.go index 85f3047..8416521 100644 --- a/api/defined/v1/storage/indexdb.go +++ b/api/defined/v1/storage/indexdb.go @@ -14,7 +14,7 @@ var ErrKeyNotFound = errors.New("key not found") const ( TypeInMemory = "inmemory" TypeNormal = "normal" // normal, warm 同一个 - TypeWarm = "normal" // normal, warm 同一个 + TypeWarm = "warm" // normal, warm 同一个 TypeCold = "cold" TypeHot = "hot" 
) diff --git a/api/defined/v1/storage/storage.go b/api/defined/v1/storage/storage.go index 9df159e..07efc0b 100644 --- a/api/defined/v1/storage/storage.go +++ b/api/defined/v1/storage/storage.go @@ -44,6 +44,8 @@ type Operation interface { WriteChunkFile(ctx context.Context, id *object.ID, index uint32) (io.WriteCloser, string, error) // ReadChunkFile open chunk file and returns io.ReadCloser ReadChunkFile(ctx context.Context, id *object.ID, index uint32) (File, string, error) + // MoveTo moves the object to the target bucket. + MoveTo(ctx context.Context, id *object.ID, target Bucket) error } type Storage interface { @@ -55,6 +57,11 @@ type Storage interface { SharedKV() SharedKV PURGE(storeUrl string, typ PurgeControl) error + // Promote moves object up the tiers (e.g., warm -> hot) + Promote(ctx context.Context, id *object.ID, src Bucket) error + + // SelectWithType selects a Bucket by the tier type. + SelectWithType(ctx context.Context, id *object.ID, tierType string) Bucket } type Bucket interface { @@ -79,6 +86,18 @@ type Bucket interface { StoreType() string // Path returns the Bucket path. Path() string + // SetDemoter sets the demoter for the Bucket. + SetDemoter(demoter Demoter) + // SetPromoter sets the promoter for the Bucket. 
+ SetPromoter(promoter Promoter) +} + +type Demoter interface { + Demote(ctx context.Context, id *object.ID, src Bucket) error +} + +type Promoter interface { + Promote(ctx context.Context, id *object.ID, src Bucket) error } type PurgeControl struct { diff --git a/server/middleware/caching/caching_chunkpart_test.go b/server/middleware/caching/caching_chunkpart_test.go index 8eb5973..c59e027 100644 --- a/server/middleware/caching/caching_chunkpart_test.go +++ b/server/middleware/caching/caching_chunkpart_test.go @@ -31,7 +31,7 @@ func Test_getContents(t *testing.T) { memoryBucket, _ := memory.New(&conf.Bucket{}, sharedkv.NewEmpty()) req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, "http://www.example.com/path/to/1.apk", nil) - objectID, _ := newObjectIDFromRequest(req, "", true) + objectID := newObjectIDFromRequest(req, "", true) c := &Caching{ log: log.NewHelper(log.GetLogger()), processor: mockProcessorChain(), diff --git a/server/middleware/caching/caching_vary.go b/server/middleware/caching/caching_vary.go index 0ffb0fa..2b83bf4 100644 --- a/server/middleware/caching/caching_vary.go +++ b/server/middleware/caching/caching_vary.go @@ -3,7 +3,6 @@ package caching import ( "context" "errors" - "fmt" "net/http" "os" "slices" @@ -126,10 +125,7 @@ func (v *VaryProcessor) lookup(caching *Caching, req *http.Request) *object.Meta varyKey := varycontrol.Clean(caching.md.Headers.Values("Vary")...) // Generate object ID based on Vary data from request headers. 
- vid, err := newObjectIDFromRequest(req, varyKey.VaryData(req.Header), caching.opt.IncludeQueryInCacheKey) - if err != nil { - return nil - } + vid := newObjectIDFromRequest(req, varyKey.VaryData(req.Header), caching.opt.IncludeQueryInCacheKey) vmd, err := caching.bucket.Lookup(req.Context(), vid) if err != nil { @@ -193,10 +189,7 @@ func (v *VaryProcessor) handleNoResponseVary(caching *Caching, resp *http.Respon caching.log.Debugf("vary data already exist: %s", varyData) } - l2MetaID, err := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) - if err != nil { - return nil, fmt.Errorf("failed to create hash-key: %w", err) - } + l2MetaID := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) cl, _ := strconv.Atoi(resp.Header.Get("Content-Length")) return &object.Metadata{ @@ -222,7 +215,7 @@ func (v *VaryProcessor) handleResponseVary(caching *Caching, resp *http.Response if metaVary.Compare(respVary) { // Vary keys match, try to find existing Vary cache. varyData = metaVary.VaryData(caching.req.Header) - varyKey, _ := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) + varyKey := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) varyMeta, err := caching.bucket.Lookup(caching.req.Context(), varyKey) if err != nil { caching.log.Warnf("Vary key lookup failed: %v", err) @@ -251,7 +244,7 @@ func (v *VaryProcessor) handleResponseVary(caching *Caching, resp *http.Response } // Build new Vary cache object. 
- varyObjectID, _ := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) + varyObjectID := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) return v.upgrade(caching, resp, varyObjectID, varyData) } @@ -264,7 +257,7 @@ func (v *VaryProcessor) handleResponseVary(caching *Caching, resp *http.Response caching.md.VirtualKey = nil varyData = respVary.VaryData(caching.req.Header) - varyObjectID, _ := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) + varyObjectID := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) return v.upgrade(caching, resp, varyObjectID, varyData) } diff --git a/server/middleware/caching/internal.go b/server/middleware/caching/internal.go index 72eb64e..71bc105 100644 --- a/server/middleware/caching/internal.go +++ b/server/middleware/caching/internal.go @@ -310,17 +310,19 @@ func cloneRequest(req *http.Request) *http.Request { return proxyReq } -func newObjectIDFromRequest(req *http.Request, vd string, includeQuery bool) (*object.ID, error) { - // option: cache-key include querystring - // - // TODO: get cache-key from frontend protocol rule. +func newObjectIDFromRequest(req *http.Request, vd string, includeQuery bool) *object.ID { + // get cache-key from frontend protocol rule. + if cacheKey := req.Header.Get(constants.InternalStoreUrl); cacheKey != "" { + return object.NewVirtualID(cacheKey, vd) + } + // option: cache-key include querystring // or later default rule. 
if includeQuery { - return object.NewVirtualID(req.URL.String(), vd), nil + return object.NewVirtualID(req.URL.String(), vd) } - return object.NewVirtualID(fmt.Sprintf("%s://%s%s", req.URL.Scheme, req.Host, req.URL.Path), vd), nil + return object.NewVirtualID(fmt.Sprintf("%s://%s%s", req.URL.Scheme, req.Host, req.URL.Path), vd) } func closeBody(resp *http.Response) { diff --git a/server/middleware/caching/processor.go b/server/middleware/caching/processor.go index 78a40ce..58aacf1 100644 --- a/server/middleware/caching/processor.go +++ b/server/middleware/caching/processor.go @@ -1,7 +1,6 @@ package caching import ( - "fmt" "io" "net/http" "reflect" @@ -82,11 +81,7 @@ func (pc *ProcessorChain) PostRequest(caching *Caching, req *http.Request, resp } func (pc *ProcessorChain) preCacheProcessor(proxyClient proxy.Proxy, store storage.Storage, opt *cachingOption, req *http.Request) (*Caching, error) { - objectID, err := newObjectIDFromRequest(req, "", opt.IncludeQueryInCacheKey) - if err != nil { - return nil, fmt.Errorf("failed new object-objectID from request err: %w", err) - } - + objectID := newObjectIDFromRequest(req, "", opt.IncludeQueryInCacheKey) // Select storage bucket by object ID // hashring or diskhash bucket := store.Select(req.Context(), objectID) @@ -152,7 +147,11 @@ func (pc *ProcessorChain) postCacheProcessor(caching *Caching, req *http.Request } // incr index ref count. 
- caching.bucket.Touch(req.Context(), caching.id) + // trigger touch for promotion on cache hit + // do not block response path + if caching.hit && caching.bucket != nil && caching.id != nil { + go func() { _ = caching.bucket.Touch(req.Context(), caching.id) }() + } return resp, nil } diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index 30cbb9a..4fffef4 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -41,6 +41,8 @@ type diskBucket struct { cache *lru.Cache[object.IDHash, storage.Mark] fileFlag int fileMode fs.FileMode + demoter storage.Demoter + promoter storage.Promoter stop chan struct{} } @@ -93,15 +95,20 @@ func (d *diskBucket) evict() { clog.Debugf("start evict goroutine for %s", d.ID()) - // Demote func - demoteTarget := storage.TypeNormal - if d.storeType == storage.TypeNormal { - demoteTarget = storage.TypeCold - } - demote := func(evicted lru.Eviction[object.IDHash, storage.Mark]) error { - // TODO: demote to target storage bucket - log.Debugf("demote %s to %s", demoteTarget, evicted.Key) + migration := d.tiering != nil && d.tiering.Enabled + demote := func(evicted lru.Eviction[object.IDHash, storage.Mark]) error { + if d.demoter != nil { + md, err := d.indexdb.Get(context.Background(), evicted.Key[:]) + if err != nil { + return err + } + if md == nil || md.ID == nil { + return fmt.Errorf("metadata not found for demotion") + } + log.Debugf("demote %s to %s", d.storeType, md.ID.Key()) + return d.demoter.Demote(context.Background(), md.ID, d) + } return nil } @@ -118,7 +125,7 @@ func (d *diskBucket) evict() { return case evicted := <-ch: // expired cachefile Demote to cold bucket - if d.tiering != nil && d.tiering.Enabled { + if migration { if err := demote(evicted); err != nil { // fallback to discard discard(evicted) @@ -272,7 +279,7 @@ func (d *diskBucket) discard(ctx context.Context, md *object.Metadata) error { // Exist implements storage.Bucket. 
func (d *diskBucket) Exist(ctx context.Context, id []byte) bool { - return d.indexdb.Exist(ctx, id) + return d.cache.Has(object.IDHash(id)) } // Expired implements storage.Bucket. @@ -294,7 +301,7 @@ func (d *diskBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metadat return md, err } -// Touch implements [storage.Bucket]. +// Touch implements [storage.Bucket]. Update LRU mark and maybe trigger promotion. func (d *diskBucket) Touch(ctx context.Context, id *object.ID) error { mark := d.cache.Get(id.Hash()) if mark.LastAccess() <= 0 { @@ -304,6 +311,36 @@ func (d *diskBucket) Touch(ctx context.Context, id *object.ID) error { mark.SetLastAccess(time.Now().Unix()) mark.SetRefs(mark.Refs() + 1) d.cache.Set(id.Hash(), mark) + + // Promotion logic (hit counting within window) + if d.tiering == nil || !d.tiering.Enabled || d.promoter == nil { + return nil + } + if d.storeType == storage.TypeHot || d.storeType == storage.TypeInMemory { + return nil + } + + minHits := d.tiering.Promote.MinHits + window := d.tiering.Promote.Window + maxSize := d.tiering.Promote.MaxObjectSize + if minHits <= 0 || window <= 0 { + return nil + } + + hits := mark.Refs() + if int(hits) < minHits { + return nil + } + if maxSize > 0 { + if md, err := d.indexdb.Get(ctx, id.Bytes()); err == nil && md != nil { + if uint64(md.Size) > maxSize { + return nil + } + } + } + go func() { + _ = d.promoter.Promote(context.Background(), id, d) + }() return nil } @@ -422,6 +459,64 @@ func (d *diskBucket) Close() error { return d.indexdb.Close() } +func (d *diskBucket) MoveTo(ctx context.Context, id *object.ID, target storage.Bucket) error { + md, err := d.indexdb.Get(ctx, id.Bytes()) + if err != nil { + return err + } + + clog := log.Context(ctx) + + // 1. 
Move all chunks + var moveErr error + md.Chunks.Range(func(x uint32) { + if moveErr != nil { + return + } + + rf, _, err1 := d.ReadChunkFile(ctx, id, x) + if err1 != nil { + moveErr = err1 + return + } + defer rf.Close() + + wf, _, err2 := target.WriteChunkFile(ctx, id, x) + if err2 != nil { + moveErr = err2 + return + } + defer wf.Close() + + if _, err2 = io.Copy(wf, rf); err2 != nil { + moveErr = err2 + } + }) + + if moveErr != nil { + clog.Errorf("failed to move object %s chunks: %v", id.Key(), moveErr) + return moveErr + } + + // 2. Store metadata in target + if err := target.Store(ctx, md); err != nil { + clog.Errorf("failed to store metadata in target bucket for %s: %v", id.Key(), err) + return err + } + + // 3. Discard locally + return d.discard(ctx, md) +} + +func (d *diskBucket) SetDemoter(demoter storage.Demoter) { + d.demoter = demoter +} + +// SetPromoter implements Bucket.SetPromoter +func (d *diskBucket) SetPromoter(promoter storage.Promoter) { + d.promoter = promoter +} + func (d *diskBucket) initWorkdir() { defer func() { if rec := recover(); rec != nil { diff --git a/storage/bucket/empty/empty.go b/storage/bucket/empty/empty.go index 3f2293a..146130f 100644 --- a/storage/bucket/empty/empty.go +++ b/storage/bucket/empty/empty.go @@ -126,6 +126,14 @@ func (e *emptyBucket) ReadChunkFile(ctx context.Context, id *object.ID, index ui return nil, "discard", nil } +func (e *emptyBucket) MoveTo(ctx context.Context, id *object.ID, target storage.Bucket) error { + return nil +} + +func (e *emptyBucket) SetDemoter(demoter storage.Demoter) {} + +func (e *emptyBucket) SetPromoter(promoter storage.Promoter) {} + func New(c *conf.Bucket, _ storage.SharedKV) (storage.Bucket, error) { path := c.Path if path == "" { diff --git a/storage/bucket/memory/memory.go b/storage/bucket/memory/memory.go index 200af21..b7fbacd 100644 --- a/storage/bucket/memory/memory.go +++ b/storage/bucket/memory/memory.go @@ -40,6 +40,7 @@ type memoryBucket struct { maxSize uint64 closed 
bool stop chan struct{} + promoter storage.Promoter } func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) { @@ -48,7 +49,7 @@ func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) path: "/", dbPath: storage.TypeInMemory, driver: config.Driver, - storeType: storage.TypeInMemory, + storeType: config.Type, weight: 100, // default weight sharedkv: sharedkv, cache: lru.New[object.IDHash, storage.Mark](config.MaxObjectLimit), // in-memory object size @@ -286,6 +287,15 @@ func (m *memoryBucket) ReadChunkFile(_ context.Context, id *object.ID, index uin return storage.WrapVFSFile(f), wpath, err } +func (m *memoryBucket) MoveTo(ctx context.Context, id *object.ID, target storage.Bucket) error { + return nil +} + +func (m *memoryBucket) SetDemoter(demoter storage.Demoter) {} + +// SetPromoter implements Bucket.SetPromoter +func (m *memoryBucket) SetPromoter(promoter storage.Promoter) { m.promoter = promoter } + // StoreType implements [storage.Bucket]. func (m *memoryBucket) StoreType() string { return m.storeType diff --git a/storage/builder.go b/storage/builder.go index a248708..2b10192 100644 --- a/storage/builder.go +++ b/storage/builder.go @@ -20,7 +20,7 @@ type globalBucketOption struct { Driver string DBType string DBPath string - Tiering conf.Tiering + Tiering *conf.Tiering } // implements storage.Bucket map. 
@@ -48,7 +48,7 @@ func mergeConfig(global *globalBucketOption, bucket *conf.Bucket) *conf.Bucket { DBPath: bucket.DBPath, MaxObjectLimit: bucket.MaxObjectLimit, DBConfig: bucket.DBConfig, // custom db config - Tiering: &global.Tiering, + Tiering: global.Tiering, } if copied.Driver == "" { diff --git a/storage/marked/bucket.go b/storage/marked/bucket.go index 20d1d78..1d31c2a 100644 --- a/storage/marked/bucket.go +++ b/storage/marked/bucket.go @@ -130,3 +130,15 @@ func (b *wrappedBucket) Path() string { func (b *wrappedBucket) Close() error { return b.base.Close() } + +func (b *wrappedBucket) MoveTo(ctx context.Context, id *object.ID, target storagev1.Bucket) error { + return b.base.MoveTo(ctx, id, target) +} + +func (b *wrappedBucket) SetDemoter(demoter storagev1.Demoter) { + b.base.SetDemoter(demoter) +} + +func (b *wrappedBucket) SetPromoter(promoter storagev1.Promoter) { + b.base.SetPromoter(promoter) +} diff --git a/storage/marked/wrap_storage.go b/storage/marked/wrap_storage.go index f040b1c..c6145e0 100644 --- a/storage/marked/wrap_storage.go +++ b/storage/marked/wrap_storage.go @@ -40,12 +40,7 @@ func (w *wrappedStorage) Rebuild(ctx context.Context, buckets []storagev1.Bucket } func (w *wrappedStorage) Buckets() []storagev1.Bucket { - baseBuckets := w.base.Buckets() - wrapped := make([]storagev1.Bucket, 0, len(baseBuckets)) - for _, b := range baseBuckets { - wrapped = append(wrapped, wrapBucket(b, w.checker)) - } - return wrapped + return w.base.Buckets() } func (w *wrappedStorage) SharedKV() storagev1.SharedKV { @@ -66,6 +61,14 @@ func (w *wrappedStorage) PURGE(storeUrl string, typ storagev1.PurgeControl) erro return w.base.PURGE(storeUrl, typ) } +func (w *wrappedStorage) SelectWithType(ctx context.Context, id *object.ID, tier string) storagev1.Bucket { + return wrapBucket(w.base.SelectWithType(ctx, id, tier), w.checker) +} + +func (w *wrappedStorage) Promote(ctx context.Context, id *object.ID, src storagev1.Bucket) error { + return w.base.Promote(ctx, id, 
src) +} + func (w *wrappedStorage) Close() error { return w.base.Close() } diff --git a/storage/storage.go b/storage/storage.go index 6b3e916..4b94832 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -74,7 +74,7 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { Driver: config.Driver, DBType: config.DBType, DBPath: config.DBPath, - Tiering: *config.Tiering, + Tiering: config.Tiering, } for _, c := range config.Buckets { @@ -84,15 +84,16 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { } switch bucket.StoreType() { - case storage.TypeNormal: + case storage.TypeNormal, storage.TypeWarm: n.normalBucket = append(n.normalBucket, bucket) case storage.TypeHot: n.hotBucket = append(n.hotBucket, bucket) + case storage.TypeCold: + n.coldBucket = append(n.coldBucket, bucket) case storage.TypeInMemory: if n.memoryBucket != nil { return fmt.Errorf("only one inmemory bucket is allowed") } - n.memoryBucket = bucket } } @@ -126,6 +127,17 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { n.log.Infof("no hot bucket configured") } + // register demoter + for _, b := range n.Buckets() { + b.SetDemoter(n) + // register promoter as well + b.SetPromoter(n) + } + if n.memoryBucket != nil { + n.memoryBucket.SetDemoter(n) + n.memoryBucket.SetPromoter(n) + } + return nil } @@ -139,15 +151,81 @@ func (n *nativeStorage) Select(ctx context.Context, id *object.ID) storage.Bucke ) } +// SelectByTier implements storage.Storage. 
+func (n *nativeStorage) SelectWithType(ctx context.Context, id *object.ID, tier string) storage.Bucket { + switch tier { + case storage.TypeHot: + if n.hotSelector != nil { + return n.hotSelector.Select(ctx, id) + } + case storage.TypeNormal, storage.TypeWarm: // TypeWarm is same as TypeNormal + if n.warmSelector != nil { + return n.warmSelector.Select(ctx, id) + } + case storage.TypeCold: + if n.coldSelector != nil { + return n.coldSelector.Select(ctx, id) + } + case storage.TypeInMemory: + return n.memoryBucket + } + return nil +} + +// Demote implements storage.Demoter. +func (n *nativeStorage) Demote(ctx context.Context, id *object.ID, src storage.Bucket) error { + // Hot -> Warm -> Cold + var targetTier string + switch src.StoreType() { + case storage.TypeHot: + targetTier = storage.TypeNormal + case storage.TypeNormal: // TypeWarm is same as TypeNormal + targetTier = storage.TypeCold + default: + return nil // no demotion for other types + } + + target := n.SelectWithType(ctx, id, targetTier) + if target == nil { + return fmt.Errorf("no target bucket found for demotion from %s to %s", src.StoreType(), targetTier) + } + + return src.MoveTo(ctx, id, target) +} + +// Promote implements storage.Promoter. 
+func (n *nativeStorage) Promote(ctx context.Context, id *object.ID, src storage.Bucket) error { + // Cold -> Warm -> Hot + var targetTier string + switch src.StoreType() { + case storage.TypeCold: + targetTier = storage.TypeNormal + case storage.TypeNormal: // TypeWarm is same as TypeNormal + targetTier = storage.TypeHot + default: + return nil // no promotion for other types + } + + target := n.SelectWithType(ctx, id, targetTier) + if target == nil { + return fmt.Errorf("no target bucket found for promotion from %s to %s", src.StoreType(), targetTier) + } + + return src.MoveTo(ctx, id, target) +} + func (n *nativeStorage) chainSelector(ctx context.Context, id *object.ID, selectors ...storage.Selector) storage.Bucket { for _, sel := range selectors { - if sel != nil { - if bucket := sel.Select(ctx, id); bucket != nil { - return bucket - } + if sel == nil { + continue + } + if bucket := sel.Select(ctx, id); bucket != nil && bucket.Exist(ctx, id.Bytes()) { + return bucket } } - return nil + + // fallback to warm selector + return n.warmSelector.Select(ctx, id) } // Rebuild implements storage.Selector. 
@@ -272,6 +350,10 @@ func (n *nativeStorage) Close() error { errs = append(errs, bucket.Close()) } + for _, bucket := range n.coldBucket { + errs = append(errs, bucket.Close()) + } + if n.memoryBucket != nil { if err := n.memoryBucket.Close(); err != nil { errs = append(errs, err) From 82607e863b269da0a71577e4f11d956033c93de1 Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Tue, 27 Jan 2026 21:37:43 +0800 Subject: [PATCH 4/4] feat: promote, demote in hot, warm, cold bucket --- api/defined/v1/storage/storage.go | 12 ++--- server/middleware/caching/caching.go | 2 +- server/middleware/caching/internal.go | 3 +- server/middleware/caching/locker.go | 7 +++ server/middleware/caching/processor.go | 64 ++++++++++++++++++++++---- storage/bucket/disk/disk.go | 26 ++++------- storage/bucket/empty/empty.go | 4 +- storage/bucket/memory/memory.go | 9 ++-- storage/marked/bucket.go | 8 +--- storage/storage.go | 14 ++---- 10 files changed, 89 insertions(+), 60 deletions(-) diff --git a/api/defined/v1/storage/storage.go b/api/defined/v1/storage/storage.go index 07efc0b..bffb357 100644 --- a/api/defined/v1/storage/storage.go +++ b/api/defined/v1/storage/storage.go @@ -57,6 +57,7 @@ type Storage interface { SharedKV() SharedKV PURGE(storeUrl string, typ PurgeControl) error + // Promote moves object up the tiers (e.g., warm -> hot) Promote(ctx context.Context, id *object.ID, src Bucket) error @@ -86,17 +87,12 @@ type Bucket interface { StoreType() string // Path returns the Bucket path. Path() string - // SetDemoter sets the demoter for the Bucket. - SetDemoter(demoter Demoter) - // SetPromoter sets the promoter for the Bucket. - SetPromoter(promoter Promoter) + // SetMigration sets the migration for the Bucket. 
+ SetMigration(migration Migration) } -type Demoter interface { +type Migration interface { Demote(ctx context.Context, id *object.ID, src Bucket) error -} - -type Promoter interface { Promote(ctx context.Context, id *object.ID, src Bucket) error } diff --git a/server/middleware/caching/caching.go b/server/middleware/caching/caching.go index 09d88c7..c8ce510 100644 --- a/server/middleware/caching/caching.go +++ b/server/middleware/caching/caching.go @@ -266,7 +266,7 @@ func (c *Caching) getUpstreamReader(fromByte, toByte uint64, async bool) (io.Rea now := time.Now() c.log.Debugf("getUpstreamReader doProxy[chunk]: begin: %s, rawRange: %s, newRange: %s", now, rawRange, newRange) resp, err := c.doProxy(req, true) - c.log.Infof("getUpstreamReader doProxy[chunk]: timeCost: %s, rawRange: %s, newRange: %s", time.Since(now), rawRange, newRange) + c.log.Debugf("getUpstreamReader doProxy[chunk]: timeCost: %s, rawRange: %s, newRange: %s", time.Since(now), rawRange, newRange) if err != nil { closeBody(resp) return nil, err diff --git a/server/middleware/caching/internal.go b/server/middleware/caching/internal.go index 71bc105..630927b 100644 --- a/server/middleware/caching/internal.go +++ b/server/middleware/caching/internal.go @@ -91,7 +91,7 @@ func (c *Caching) setXCache(resp *http.Response) { return } - resp.Header.Set(constants.ProtocolCacheStatusKey, strings.Join([]string{c.cacheStatus.String(), "from", c.opt.Hostname, "(tavern/4.0)"}, " ")) + resp.Header.Set(constants.ProtocolCacheStatusKey, strings.Join([]string{c.cacheStatus.String(), "from", c.opt.Hostname, c.bucket.StoreType(), "(tavern/4.0)"}, " ")) metric := metrics.FromContext(c.req.Context()) metric.CacheStatus = c.cacheStatus.String() @@ -188,6 +188,7 @@ func getContents(c *Caching, reqChunks []uint32, from uint32) (io.ReadCloser, in c.log.Debugf("find available chunk index %d, availableChunks: %v", index, availableChunks) fromByte := uint64(reqChunks[from] * uint32(c.md.BlockSize)) if index < 
len(availableChunks) { + chunkFile, _ := getSliceChunkFile(c, availableChunks[index]) if chunkFile != nil { if err := checkChunkSize(c, chunkFile, idx); err != nil { diff --git a/server/middleware/caching/locker.go b/server/middleware/caching/locker.go index d775eda..93f3379 100644 --- a/server/middleware/caching/locker.go +++ b/server/middleware/caching/locker.go @@ -66,3 +66,10 @@ func (c *Caching) RUnlock() { } globalLocker.getLock(c.Key()).RUnlock() } + +func (c *Caching) TryLock() bool { + if c.id == nil { + return false + } + return globalLocker.getLock(c.Key()).TryLock() +} diff --git a/server/middleware/caching/processor.go b/server/middleware/caching/processor.go index 58aacf1..a6406d6 100644 --- a/server/middleware/caching/processor.go +++ b/server/middleware/caching/processor.go @@ -1,6 +1,7 @@ package caching import ( + "context" "io" "net/http" "reflect" @@ -10,6 +11,7 @@ import ( "time" "github.com/omalloc/tavern/api/defined/v1/storage" + "github.com/omalloc/tavern/api/defined/v1/storage/object" "github.com/omalloc/tavern/contrib/log" "github.com/omalloc/tavern/proxy" ) @@ -24,13 +26,22 @@ type Processor interface { PostRequest(caching *Caching, req *http.Request, resp *http.Response) (*http.Response, error) } +type touchArgs struct { + bucket storage.Bucket + id *object.ID + unlock func() +} + // ProcessorChain represents a chain of caching processors. -type ProcessorChain []Processor +type ProcessorChain struct { + processors []Processor + touchChan chan *touchArgs +} // Lookup iterates through the processor chain to check for a cache hit. 
func (pc *ProcessorChain) Lookup(caching *Caching, req *http.Request) (bool, error) { var err error - for _, processor := range *pc { + for _, processor := range pc.processors { caching.hit, err = processor.Lookup(caching, req) if err != nil { return false, err @@ -51,7 +62,7 @@ func (pc *ProcessorChain) Lookup(caching *Caching, req *http.Request) (bool, err // PreRequest processes the request through the processor chain before sending it to the origin server. func (pc *ProcessorChain) PreRequest(caching *Caching, req *http.Request) (*http.Request, error) { var err error - for _, processor := range *pc { + for _, processor := range pc.processors { req, err = processor.PreRequest(caching, req) if err != nil { if caching.log.Enabled(log.LevelDebug) { @@ -67,7 +78,7 @@ func (pc *ProcessorChain) PreRequest(caching *Caching, req *http.Request) (*http // PostRequest processes the response through the processor chain after receiving it from the origin server. func (pc *ProcessorChain) PostRequest(caching *Caching, req *http.Request, resp *http.Response) (*http.Response, error) { var err error - for _, processor := range *pc { + for _, processor := range pc.processors { resp, err = processor.PostRequest(caching, req, resp) if err != nil { if caching.log.Enabled(log.LevelDebug) { @@ -150,7 +161,22 @@ func (pc *ProcessorChain) postCacheProcessor(caching *Caching, req *http.Request // trigger touch for promotion on cache hit // do not block response path if caching.hit && caching.bucket != nil && caching.id != nil { - go func() { _ = caching.bucket.Touch(req.Context(), caching.id) }() + if !caching.TryLock() { + // already locked by other goroutine + caching.log.Infof("tryLock %s: already locked", caching.id.String()) + return resp, nil + } + + select { + case pc.touchChan <- &touchArgs{ + bucket: caching.bucket, + id: caching.id, + unlock: caching.Unlock, + }: // + default: + caching.Unlock() + caching.log.Warnf("failed to touch object %s: queue full", caching.id.String()) + 
} } return resp, nil @@ -159,7 +185,7 @@ func (pc *ProcessorChain) postCacheProcessor(caching *Caching, req *http.Request // String returns a string representation of the processor chain. func (pc *ProcessorChain) String() string { sb := strings.Builder{} - for i, processor := range *pc { + for i, processor := range pc.processors { if i > 0 { sb.WriteString(" -> ") } @@ -171,13 +197,33 @@ func (pc *ProcessorChain) String() string { // NewProcessorChain creates a new ProcessorChain with the given processors. func NewProcessorChain(processors ...Processor) *ProcessorChain { - pc := ProcessorChain(processors) - return &pc + pc := &ProcessorChain{ + processors: processors, + touchChan: make(chan *touchArgs, 10_000), // 1w + } + + go pc.startWorker() + + return pc +} + +// startWorker start a worker to touch objects. +func (pc *ProcessorChain) startWorker() { + go func() { + for args := range pc.touchChan { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + if err := args.bucket.Touch(ctx, args.id); err != nil { + log.Warnf("failed to touch object %s: %v", args.id.String(), err) + } + args.unlock() + cancel() + } + }() } // fill removes any nil processors from the chain. 
func (pc *ProcessorChain) fill() *ProcessorChain { - *pc = slices.DeleteFunc(*pc, func(p Processor) bool { + pc.processors = slices.DeleteFunc(pc.processors, func(p Processor) bool { return p == nil }) return pc diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index 4fffef4..ad9dd3b 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -41,8 +41,7 @@ type diskBucket struct { cache *lru.Cache[object.IDHash, storage.Mark] fileFlag int fileMode fs.FileMode - demoter storage.Demoter - promoter storage.Promoter + migration storage.Migration stop chan struct{} } @@ -98,7 +97,7 @@ func (d *diskBucket) evict() { migration := d.tiering != nil && d.tiering.Enabled demote := func(evicted lru.Eviction[object.IDHash, storage.Mark]) error { - if d.demoter != nil { + if d.migration != nil { md, err := d.indexdb.Get(context.Background(), evicted.Key[:]) if err != nil { return err @@ -107,7 +106,7 @@ func (d *diskBucket) evict() { return fmt.Errorf("metadata not found for demotion") } log.Debugf("demote %s to %s", d.storeType, md.ID.Key()) - return d.demoter.Demote(context.Background(), md.ID, d) + return d.migration.Demote(context.Background(), md.ID, d) } return nil } @@ -310,10 +309,11 @@ func (d *diskBucket) Touch(ctx context.Context, id *object.ID) error { mark.SetLastAccess(time.Now().Unix()) mark.SetRefs(mark.Refs() + 1) + d.cache.Set(id.Hash(), mark) // Promotion logic (hit counting within window) - if d.tiering == nil || !d.tiering.Enabled || d.promoter == nil { + if d.tiering == nil || !d.tiering.Enabled || d.migration == nil { return nil } if d.storeType == storage.TypeHot || d.storeType == storage.TypeInMemory { @@ -338,10 +338,9 @@ func (d *diskBucket) Touch(ctx context.Context, id *object.ID) error { } } } - go func() { - _ = d.promoter.Promote(context.Background(), id, d) - }() - return nil + + log.Infof("promote %s", id.WPath(d.path)) + return d.migration.Promote(context.Background(), id, d) } // Remove implements 
storage.Bucket. @@ -508,13 +507,8 @@ func (d *diskBucket) MoveTo(ctx context.Context, id *object.ID, target storage.B return d.discard(ctx, md) } -func (d *diskBucket) SetDemoter(demoter storage.Demoter) { - d.demoter = demoter -} - -// SetPromoter implements Bucket.SetPromoter -func (d *diskBucket) SetPromoter(promoter storage.Promoter) { - d.promoter = promoter +func (d *diskBucket) SetMigration(migration storage.Migration) { + d.migration = migration } func (d *diskBucket) initWorkdir() { diff --git a/storage/bucket/empty/empty.go b/storage/bucket/empty/empty.go index 146130f..121c0f1 100644 --- a/storage/bucket/empty/empty.go +++ b/storage/bucket/empty/empty.go @@ -130,9 +130,7 @@ func (e *emptyBucket) MoveTo(ctx context.Context, id *object.ID, target storage. return nil } -func (e *emptyBucket) SetDemoter(demoter storage.Demoter) {} - -func (e *emptyBucket) SetPromoter(promoter storage.Promoter) {} +func (e *emptyBucket) SetMigration(migration storage.Migration) {} func New(c *conf.Bucket, _ storage.SharedKV) (storage.Bucket, error) { path := c.Path diff --git a/storage/bucket/memory/memory.go b/storage/bucket/memory/memory.go index b7fbacd..57441a0 100644 --- a/storage/bucket/memory/memory.go +++ b/storage/bucket/memory/memory.go @@ -40,7 +40,7 @@ type memoryBucket struct { maxSize uint64 closed bool stop chan struct{} - promoter storage.Promoter + migration storage.Migration } func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) { @@ -267,7 +267,7 @@ func (m *memoryBucket) WriteChunkFile(ctx context.Context, id *object.ID, index _ = m.fs.MkdirAll(filepath.Dir(wpath), m.fileMode) if log.Enabled(log.LevelDebug) { - log.Context(ctx).Infof("write inmemory chunk file %s", wpath) + log.Context(ctx).Debugf("write inmemory chunk file %s", wpath) } f, err := m.fs.OpenReadWrite(wpath, vfs.WriteCategoryUnspecified) @@ -291,10 +291,7 @@ func (m *memoryBucket) MoveTo(ctx context.Context, id *object.ID, target storage return nil } -func (m 
*memoryBucket) SetDemoter(demoter storage.Demoter) {} - -// SetPromoter implements Bucket.SetPromoter -func (m *memoryBucket) SetPromoter(promoter storage.Promoter) { m.promoter = promoter } +func (m *memoryBucket) SetMigration(migration storage.Migration) {} // StoreType implements [storage.Bucket]. func (m *memoryBucket) StoreType() string { diff --git a/storage/marked/bucket.go b/storage/marked/bucket.go index 1d31c2a..5c9ca44 100644 --- a/storage/marked/bucket.go +++ b/storage/marked/bucket.go @@ -135,10 +135,6 @@ func (b *wrappedBucket) MoveTo(ctx context.Context, id *object.ID, target storag return b.base.MoveTo(ctx, id, target) } -func (b *wrappedBucket) SetDemoter(demoter storagev1.Demoter) { - b.base.SetDemoter(demoter) -} - -func (b *wrappedBucket) SetPromoter(promoter storagev1.Promoter) { - b.base.SetPromoter(promoter) +func (b *wrappedBucket) SetMigration(migration storagev1.Migration) { + b.base.SetMigration(migration) } diff --git a/storage/storage.go b/storage/storage.go index 4b94832..370420d 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -127,15 +127,9 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { n.log.Infof("no hot bucket configured") } - // register demoter + // register demoter and promoter for _, b := range n.Buckets() { - b.SetDemoter(n) - // register promoter as well - b.SetPromoter(n) - } - if n.memoryBucket != nil { - n.memoryBucket.SetDemoter(n) - n.memoryBucket.SetPromoter(n) + b.SetMigration(n) } return nil @@ -199,8 +193,8 @@ func (n *nativeStorage) Promote(ctx context.Context, id *object.ID, src storage. var targetTier string switch src.StoreType() { case storage.TypeCold: - targetTier = storage.TypeNormal - case storage.TypeNormal: // TypeWarm is same as TypeNormal + targetTier = storage.TypeWarm + case storage.TypeWarm: targetTier = storage.TypeHot default: return nil // no promotion for other types