diff --git a/api/defined/v1/storage/indexdb.go b/api/defined/v1/storage/indexdb.go index 4f00827..8416521 100644 --- a/api/defined/v1/storage/indexdb.go +++ b/api/defined/v1/storage/indexdb.go @@ -13,7 +13,9 @@ var ErrKeyNotFound = errors.New("key not found") const ( TypeInMemory = "inmemory" - TypeNormal = "normal" + TypeNormal = "normal" // normal and warm are the same tier + TypeWarm = "warm" // normal and warm are the same tier + TypeCold = "cold" TypeHot = "hot" ) diff --git a/api/defined/v1/storage/storage.go b/api/defined/v1/storage/storage.go index 51b7ce2..bffb357 100644 --- a/api/defined/v1/storage/storage.go +++ b/api/defined/v1/storage/storage.go @@ -20,6 +20,8 @@ type Selector interface { type Operation interface { // Lookup retrieves the metadata for the specified object ID. Lookup(ctx context.Context, id *object.ID) (*object.Metadata, error) + // Touch updates the last access time for the specified object ID. + Touch(ctx context.Context, id *object.ID) error // Store store the metadata for the specified object ID. Store(ctx context.Context, meta *object.Metadata) error // Exist checks if the object exists. @@ -42,6 +44,8 @@ type Operation interface { WriteChunkFile(ctx context.Context, id *object.ID, index uint32) (io.WriteCloser, string, error) // ReadChunkFile open chunk file and returns io.ReadCloser ReadChunkFile(ctx context.Context, id *object.ID, index uint32) (File, string, error) + // MoveTo moves the object to the target bucket. + MoveTo(ctx context.Context, id *object.ID, target Bucket) error } type Storage interface { @@ -53,6 +57,12 @@ type Storage interface { SharedKV() SharedKV PURGE(storeUrl string, typ PurgeControl) error + + // Promote moves an object up the tiers (e.g., warm -> hot). + Promote(ctx context.Context, id *object.ID, src Bucket) error + + // SelectWithType selects a Bucket by the tier type. + SelectWithType(ctx context.Context, id *object.ID, tierType string) Bucket } type Bucket interface { @@ -77,6 +87,13 @@ type Bucket interface { StoreType() string // Path returns the Bucket path. Path() string + // SetMigration sets the migration handler for the Bucket. 
+ SetMigration(migration Migration) +} + +type Migration interface { + Demote(ctx context.Context, id *object.ID, src Bucket) error + Promote(ctx context.Context, id *object.ID, src Bucket) error } type PurgeControl struct { diff --git a/conf/conf.go b/conf/conf.go index bb9b8bf..17796d2 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -68,28 +68,50 @@ type Upstream struct { Features map[string]any `json:"features" yaml:"features"` } -type Storage struct { - Driver string `json:"driver" yaml:"driver"` - DBType string `json:"db_type" yaml:"db_type"` - DBPath string `json:"db_path" yaml:"db_path"` // default db path - AsyncLoad bool `json:"async_load" yaml:"async_load"` - EvictionPolicy string `json:"eviction_policy" yaml:"eviction_policy"` - SelectionPolicy string `json:"selection_policy" yaml:"selection_policy"` - SliceSize uint64 `json:"slice_size" yaml:"slice_size"` - Buckets []*Bucket `json:"buckets" yaml:"buckets"` -} +type ( + Storage struct { + Driver string `json:"driver" yaml:"driver"` + DBType string `json:"db_type" yaml:"db_type"` + DBPath string `json:"db_path" yaml:"db_path"` // default db path + AsyncLoad bool `json:"async_load" yaml:"async_load"` + EvictionPolicy string `json:"eviction_policy" yaml:"eviction_policy"` + SelectionPolicy string `json:"selection_policy" yaml:"selection_policy"` + SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part + Tiering *Tiering `json:"tiering" yaml:"tiering"` // tiering config + Buckets []*Bucket `json:"buckets" yaml:"buckets"` // bucket list + } -type Bucket struct { - Path string `json:"path" yaml:"path"` // local path or ? - Driver string `json:"driver" yaml:"driver"` // native, custom-driver - Type string `json:"type" yaml:"type"` // normal, cold, hot, fastmemory - DBType string `json:"db_type" yaml:"db_type"` // boltdb, badgerdb, pebble - DBPath string `json:"db_path" yaml:"db_path"` // db path, defult: /.indexdb - AsyncLoad bool `json:"async_load" yaml:"async_load"` // load metadata async - SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part - MaxObjectLimit int `json:"max_object_limit" yaml:"max_object_limit"` // max object limit, upper Bound discard - DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config -} + Bucket struct { + Path string `json:"path" yaml:"path"` // local path or ? 
+ Driver string `json:"driver" yaml:"driver"` // native, custom-driver + Type string `json:"type" yaml:"type"` // warm(normal), cold, hot, inmemory + DBType string `json:"db_type" yaml:"db_type"` // boltdb, badgerdb, pebble + DBPath string `json:"db_path" yaml:"db_path"` // db path, default: /.indexdb + AsyncLoad bool `json:"async_load" yaml:"async_load"` // load metadata async + SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part + MaxObjectLimit int `json:"max_object_limit" yaml:"max_object_limit"` // max object limit; entries beyond the upper bound are discarded + DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config + Tiering *Tiering `json:"-" yaml:"-"` // extend global tiering config + } +) + +type ( + Promote struct { + MinHits int `json:"min_hits" yaml:"min_hits"` // promote when hits within the window are >= N + Window time.Duration `json:"window" yaml:"window"` // time window, e.g. 10m + MaxObjectSize uint64 `json:"max_object_size" yaml:"max_object_size"` // maximum object size eligible for promotion + } + Demote struct { + MinHits int `json:"min_hits" yaml:"min_hits"` // demote when hits within the window are <= N + Window time.Duration `json:"window" yaml:"window"` // time window, e.g. 10m + Occupancy float64 `json:"occupancy" yaml:"occupancy"` // demote when hot-tier storage occupancy is >= N% + } + Tiering struct { + Enabled bool `json:"enabled" yaml:"enabled"` + Promote Promote `json:"promote" yaml:"promote"` // promotion (move to a hotter tier) + Demote Demote `json:"demote" yaml:"demote"` // demotion (move to a colder tier) + } +) type Plugin struct { Name string `json:"name" yaml:"name"` diff --git a/config.example.yaml b/config.example.yaml index 6abeaf7..396314a 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -93,6 +93,16 @@ storage: eviction_policy: fifo # fifo, lru, lfu selection_policy: hashring # hashring, roundrobin slice_size: 1048576 # 1MB + tiering: + enabled: false # enable bucket tiering + promote: + min_hits: 10 # window hits to promote + window: 1m # 1 minute window + max_object_size: 20971520 # 20MB + demote: + min_hits: 2 # window hits to demote + window: 5m # 5 minute window + occupancy: 75 # percent usage buckets: - path: /cache1 type: normal diff --git a/server/middleware/caching/caching.go b/server/middleware/caching/caching.go index 09d88c7..c8ce510 100644 --- a/server/middleware/caching/caching.go +++ b/server/middleware/caching/caching.go @@ -266,7 +266,7 @@ func (c *Caching) getUpstreamReader(fromByte, toByte uint64, async bool) (io.Rea now := time.Now() c.log.Debugf("getUpstreamReader doProxy[chunk]: begin: %s, rawRange: %s, newRange: %s", now, rawRange, newRange) resp, err := c.doProxy(req, true) - c.log.Infof("getUpstreamReader doProxy[chunk]: timeCost: %s, rawRange: %s, newRange: %s", time.Since(now), rawRange, newRange) + c.log.Debugf("getUpstreamReader doProxy[chunk]: timeCost: %s, rawRange: %s, newRange: %s", time.Since(now), rawRange, newRange) if err != nil { closeBody(resp) return nil, err diff --git a/server/middleware/caching/caching_chunkpart_test.go b/server/middleware/caching/caching_chunkpart_test.go index 8eb5973..c59e027 100644 --- a/server/middleware/caching/caching_chunkpart_test.go +++ b/server/middleware/caching/caching_chunkpart_test.go @@ -31,7 +31,7 @@ func Test_getContents(t *testing.T) { memoryBucket, _ := memory.New(&conf.Bucket{}, sharedkv.NewEmpty()) req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, "http://www.example.com/path/to/1.apk", nil) - objectID, _ := newObjectIDFromRequest(req, "", true) + objectID := newObjectIDFromRequest(req, "", true) c := &Caching{ log: log.NewHelper(log.GetLogger()), processor: mockProcessorChain(), diff --git 
a/server/middleware/caching/caching_vary.go b/server/middleware/caching/caching_vary.go index 0ffb0fa..2b83bf4 100644 --- a/server/middleware/caching/caching_vary.go +++ b/server/middleware/caching/caching_vary.go @@ -3,7 +3,6 @@ package caching import ( "context" "errors" - "fmt" "net/http" "os" "slices" @@ -126,10 +125,7 @@ func (v *VaryProcessor) lookup(caching *Caching, req *http.Request) *object.Meta varyKey := varycontrol.Clean(caching.md.Headers.Values("Vary")...) // Generate object ID based on Vary data from request headers. - vid, err := newObjectIDFromRequest(req, varyKey.VaryData(req.Header), caching.opt.IncludeQueryInCacheKey) - if err != nil { - return nil - } + vid := newObjectIDFromRequest(req, varyKey.VaryData(req.Header), caching.opt.IncludeQueryInCacheKey) vmd, err := caching.bucket.Lookup(req.Context(), vid) if err != nil { @@ -193,10 +189,7 @@ func (v *VaryProcessor) handleNoResponseVary(caching *Caching, resp *http.Respon caching.log.Debugf("vary data already exist: %s", varyData) } - l2MetaID, err := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) - if err != nil { - return nil, fmt.Errorf("failed to create hash-key: %w", err) - } + l2MetaID := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) cl, _ := strconv.Atoi(resp.Header.Get("Content-Length")) return &object.Metadata{ @@ -222,7 +215,7 @@ func (v *VaryProcessor) handleResponseVary(caching *Caching, resp *http.Response if metaVary.Compare(respVary) { // Vary keys match, try to find existing Vary cache. varyData = metaVary.VaryData(caching.req.Header) - varyKey, _ := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) + varyKey := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) varyMeta, err := caching.bucket.Lookup(caching.req.Context(), varyKey) if err != nil { caching.log.Warnf("Vary key lookup failed: %v", err) @@ -251,7 +244,7 @@ func (v *VaryProcessor) handleResponseVary(caching *Caching, resp *http.Response } // Build new Vary cache object. 
- varyObjectID, _ := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) + varyObjectID := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) return v.upgrade(caching, resp, varyObjectID, varyData) } @@ -264,7 +257,7 @@ func (v *VaryProcessor) handleResponseVary(caching *Caching, resp *http.Response caching.md.VirtualKey = nil varyData = respVary.VaryData(caching.req.Header) - varyObjectID, _ := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) + varyObjectID := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey) return v.upgrade(caching, resp, varyObjectID, varyData) } diff --git a/server/middleware/caching/internal.go b/server/middleware/caching/internal.go index 72eb64e..630927b 100644 --- a/server/middleware/caching/internal.go +++ b/server/middleware/caching/internal.go @@ -91,7 +91,7 @@ func (c *Caching) setXCache(resp *http.Response) { return } - resp.Header.Set(constants.ProtocolCacheStatusKey, strings.Join([]string{c.cacheStatus.String(), "from", c.opt.Hostname, "(tavern/4.0)"}, " ")) + resp.Header.Set(constants.ProtocolCacheStatusKey, strings.Join([]string{c.cacheStatus.String(), "from", c.opt.Hostname, c.bucket.StoreType(), "(tavern/4.0)"}, " ")) metric := metrics.FromContext(c.req.Context()) metric.CacheStatus = c.cacheStatus.String() @@ -188,6 +188,7 @@ func getContents(c *Caching, reqChunks []uint32, from uint32) (io.ReadCloser, in c.log.Debugf("find available chunk index %d, availableChunks: %v", index, availableChunks) fromByte := uint64(reqChunks[from] * uint32(c.md.BlockSize)) if index < len(availableChunks) { + chunkFile, _ := getSliceChunkFile(c, availableChunks[index]) if chunkFile != nil { if err := checkChunkSize(c, chunkFile, idx); err != nil { @@ -310,17 +311,19 @@ func cloneRequest(req *http.Request) *http.Request { return proxyReq } -func newObjectIDFromRequest(req *http.Request, vd string, includeQuery bool) (*object.ID, error) { - // option: cache-key include querystring - // - // TODO: get cache-key from frontend protocol rule. +func newObjectIDFromRequest(req *http.Request, vd string, includeQuery bool) *object.ID { + // get cache-key from frontend protocol rule. + if cacheKey := req.Header.Get(constants.InternalStoreUrl); cacheKey != "" { + return object.NewVirtualID(cacheKey, vd) + } + // option: cache-key include querystring // or later default rule. 
if includeQuery { - return object.NewVirtualID(req.URL.String(), vd), nil + return object.NewVirtualID(req.URL.String(), vd) } - return object.NewVirtualID(fmt.Sprintf("%s://%s%s", req.URL.Scheme, req.Host, req.URL.Path), vd), nil + return object.NewVirtualID(fmt.Sprintf("%s://%s%s", req.URL.Scheme, req.Host, req.URL.Path), vd) } func closeBody(resp *http.Response) { diff --git a/server/middleware/caching/locker.go b/server/middleware/caching/locker.go index d775eda..93f3379 100644 --- a/server/middleware/caching/locker.go +++ b/server/middleware/caching/locker.go @@ -66,3 +66,10 @@ func (c *Caching) RUnlock() { } globalLocker.getLock(c.Key()).RUnlock() } + +func (c *Caching) TryLock() bool { + if c.id == nil { + return false + } + return globalLocker.getLock(c.Key()).TryLock() +} diff --git a/server/middleware/caching/processor.go b/server/middleware/caching/processor.go index fefab56..a6406d6 100644 --- a/server/middleware/caching/processor.go +++ b/server/middleware/caching/processor.go @@ -1,7 +1,7 @@ package caching import ( - "fmt" + "context" "io" "net/http" "reflect" @@ -11,6 +11,7 @@ import ( "time" "github.com/omalloc/tavern/api/defined/v1/storage" + "github.com/omalloc/tavern/api/defined/v1/storage/object" "github.com/omalloc/tavern/contrib/log" "github.com/omalloc/tavern/proxy" ) @@ -25,13 +26,22 @@ type Processor interface { PostRequest(caching *Caching, req *http.Request, resp *http.Response) (*http.Response, error) } +type touchArgs struct { + bucket storage.Bucket + id *object.ID + unlock func() +} + // ProcessorChain represents a chain of caching processors. -type ProcessorChain []Processor +type ProcessorChain struct { + processors []Processor + touchChan chan *touchArgs +} // Lookup iterates through the processor chain to check for a cache hit. func (pc *ProcessorChain) Lookup(caching *Caching, req *http.Request) (bool, error) { var err error - for _, processor := range *pc { + for _, processor := range pc.processors { caching.hit, err = processor.Lookup(caching, req) if err != nil { return false, err @@ -52,7 +62,7 @@ func (pc *ProcessorChain) Lookup(caching *Caching, req *http.Request) (bool, err // PreRequest processes the request through the processor chain before sending it to the origin server. func (pc *ProcessorChain) PreRequest(caching *Caching, req *http.Request) (*http.Request, error) { var err error - for _, processor := range *pc { + for _, processor := range pc.processors { req, err = processor.PreRequest(caching, req) if err != nil { if caching.log.Enabled(log.LevelDebug) { @@ -68,7 +78,7 @@ func (pc *ProcessorChain) PreRequest(caching *Caching, req *http.Request) (*http // PostRequest processes the response through the processor chain after receiving it from the origin server. 
func (pc *ProcessorChain) PostRequest(caching *Caching, req *http.Request, resp *http.Response) (*http.Response, error) { var err error - for _, processor := range *pc { + for _, processor := range pc.processors { resp, err = processor.PostRequest(caching, req, resp) if err != nil { if caching.log.Enabled(log.LevelDebug) { @@ -82,11 +92,7 @@ func (pc *ProcessorChain) PostRequest(caching *Caching, req *http.Request, resp } func (pc *ProcessorChain) preCacheProcessor(proxyClient proxy.Proxy, store storage.Storage, opt *cachingOption, req *http.Request) (*Caching, error) { - objectID, err := newObjectIDFromRequest(req, "", opt.IncludeQueryInCacheKey) - if err != nil { - return nil, fmt.Errorf("failed new object-objectID from request err: %w", err) - } - + objectID := newObjectIDFromRequest(req, "", opt.IncludeQueryInCacheKey) // Select storage bucket by object ID // hashring or diskhash bucket := store.Select(req.Context(), objectID) @@ -151,7 +157,27 @@ func (pc *ProcessorChain) postCacheProcessor(caching *Caching, req *http.Request _, _ = io.Copy(io.Discard, resp.Body) } - // TODO: incr index ref count. + // incr index ref count. + // trigger touch for promotion on cache hit + // do not block response path + if caching.hit && caching.bucket != nil && caching.id != nil { + if !caching.TryLock() { + // already locked by another goroutine + caching.log.Infof("tryLock %s: already locked", caching.id.String()) + return resp, nil + } + + select { + case pc.touchChan <- &touchArgs{ + bucket: caching.bucket, + id: caching.id, + unlock: caching.Unlock, + }: // + default: + caching.Unlock() + caching.log.Warnf("failed to touch object %s: queue full", caching.id.String()) + } + } return resp, nil } @@ -159,7 +185,7 @@ func (pc *ProcessorChain) postCacheProcessor(caching *Caching, req *http.Request // String returns a string representation of the processor chain. func (pc *ProcessorChain) String() string { sb := strings.Builder{} - for i, processor := range *pc { + for i, processor := range pc.processors { if i > 0 { sb.WriteString(" -> ") } @@ -171,13 +197,33 @@ func (pc *ProcessorChain) String() string { // NewProcessorChain creates a new ProcessorChain with the given processors. func NewProcessorChain(processors ...Processor) *ProcessorChain { - pc := ProcessorChain(processors) - return &pc + pc := &ProcessorChain{ + processors: processors, + touchChan: make(chan *touchArgs, 10_000), // 10k entries + } + + go pc.startWorker() + + return pc +} + +// startWorker starts a worker to touch objects. +func (pc *ProcessorChain) startWorker() { + go func() { + for args := range pc.touchChan { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + if err := args.bucket.Touch(ctx, args.id); err != nil { + log.Warnf("failed to touch object %s: %v", args.id.String(), err) + } + args.unlock() + cancel() + } + }() } // fill removes any nil processors from the chain. 
func (pc *ProcessorChain) fill() *ProcessorChain { - *pc = slices.DeleteFunc(*pc, func(p Processor) bool { + pc.processors = slices.DeleteFunc(pc.processors, func(p Processor) bool { return p == nil }) return pc diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index c4cd42b..ad9dd3b 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -34,12 +34,14 @@ type diskBucket struct { driver string storeType string asyncLoad bool + tiering *conf.Tiering weight int sharedkv storage.SharedKV indexdb storage.IndexDB cache *lru.Cache[object.IDHash, storage.Mark] fileFlag int fileMode fs.FileMode + migration storage.Migration stop chan struct{} } @@ -50,6 +52,7 @@ func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) driver: config.Driver, storeType: config.Type, asyncLoad: config.AsyncLoad, + tiering: config.Tiering, weight: 100, // default weight sharedkv: sharedkv, cache: lru.New[object.IDHash, storage.Mark](config.MaxObjectLimit), @@ -91,16 +94,45 @@ func (d *diskBucket) evict() { clog.Debugf("start evict goroutine for %s", d.ID()) + migration := d.tiering != nil && d.tiering.Enabled + + demote := func(evicted lru.Eviction[object.IDHash, storage.Mark]) error { + if d.migration != nil { + md, err := d.indexdb.Get(context.Background(), evicted.Key[:]) + if err != nil { + return err + } + if md == nil || md.ID == nil { + return fmt.Errorf("metadata not found for demotion") + } + log.Debugf("demote %s to %s", d.storeType, md.ID.Key()) + return d.migration.Demote(context.Background(), md.ID, d) + } + return nil + } + + discard := func(evicted lru.Eviction[object.IDHash, storage.Mark]) { + fd := evicted.Key.WPath(d.path) + clog.Debugf("evict file %s, last-access %d", fd, evicted.Value.LastAccess()) + d.DiscardWithHash(context.Background(), evicted.Key) + } + go func() { for { select { case <-d.stop: return case evicted := <-ch: - fd := evicted.Key.WPath(d.path) - clog.Debugf("evict file %s, last-access %d", fd, evicted.Value.LastAccess()) - // TODO: discard expired cachefile or Move to cold storage - d.DiscardWithHash(context.Background(), evicted.Key) + // expired cachefile Demote to cold bucket + if migration { + if err := demote(evicted); err != nil { + // fallback to discard + discard(evicted) + } + continue + } + + discard(evicted) } } }() @@ -246,7 +278,7 @@ func (d *diskBucket) discard(ctx context.Context, md *object.Metadata) error { // Exist implements storage.Bucket. func (d *diskBucket) Exist(ctx context.Context, id []byte) bool { - return d.indexdb.Exist(ctx, id) + return d.cache.Has(object.IDHash(id)) } // Expired implements storage.Bucket. @@ -268,6 +300,49 @@ func (d *diskBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metadat return md, err } +// Touch implements [storage.Bucket]. Update LRU mark and maybe trigger promotion. 
+func (d *diskBucket) Touch(ctx context.Context, id *object.ID) error { + mark := d.cache.Get(id.Hash()) + if mark.LastAccess() <= 0 { + return nil + } + + mark.SetLastAccess(time.Now().Unix()) + mark.SetRefs(mark.Refs() + 1) + + d.cache.Set(id.Hash(), mark) + + // Promotion logic (hit counting within window) + if d.tiering == nil || !d.tiering.Enabled || d.migration == nil { + return nil + } + if d.storeType == storage.TypeHot || d.storeType == storage.TypeInMemory { + return nil + } + + minHits := d.tiering.Promote.MinHits + window := d.tiering.Promote.Window + maxSize := d.tiering.Promote.MaxObjectSize + if minHits <= 0 || window <= 0 { + return nil + } + + hits := mark.Refs() + if int(hits) < minHits { + return nil + } + if maxSize > 0 { + if md, err := d.indexdb.Get(ctx, id.Bytes()); err == nil && md != nil { + if uint64(md.Size) > maxSize { + return nil + } + } + } + + log.Infof("promote %s", id.WPath(d.path)) + return d.migration.Promote(context.Background(), id, d) +} + // Remove implements storage.Bucket. func (d *diskBucket) Remove(ctx context.Context, id *object.ID) error { return d.indexdb.Delete(ctx, id.Bytes()) @@ -383,6 +458,59 @@ func (d *diskBucket) Close() error { return d.indexdb.Close() } +func (d *diskBucket) MoveTo(ctx context.Context, id *object.ID, target storage.Bucket) error { + md, err := d.indexdb.Get(ctx, id.Bytes()) + if err != nil { + return err + } + + clog := log.Context(ctx) + + // 1. Move all chunks + var moveErr error + md.Chunks.Range(func(x uint32) { + if moveErr != nil { + return + } + + rf, _, err1 := d.ReadChunkFile(ctx, id, x) + if err1 != nil { + moveErr = err1 + return + } + defer rf.Close() + + wf, _, err2 := target.WriteChunkFile(ctx, id, x) + if err2 != nil { + moveErr = err2 + return + } + defer wf.Close() + + if _, err2 = io.Copy(wf, rf); err2 != nil { + moveErr = err2 + } + }) + + if moveErr != nil { + clog.Errorf("failed to move object %s chunks: %v", id.Key(), moveErr) + return moveErr + } + + // 2. Store metadata in target + if err := target.Store(ctx, md); err != nil { + clog.Errorf("failed to store metadata in target bucket for %s: %v", id.Key(), err) + return err + } + + // 3. Discard locally + return d.discard(ctx, md) +} + +func (d *diskBucket) SetMigration(migration storage.Migration) { + d.migration = migration +} + func (d *diskBucket) initWorkdir() { defer func() { if rec := recover(); rec != nil { diff --git a/storage/bucket/empty/empty.go b/storage/bucket/empty/empty.go index 1f8030b..121c0f1 100644 --- a/storage/bucket/empty/empty.go +++ b/storage/bucket/empty/empty.go @@ -80,6 +80,10 @@ func (e *emptyBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metada return nil, storage.ErrKeyNotFound } +func (e *emptyBucket) Touch(ctx context.Context, id *object.ID) error { + return nil +} + // Remove implements storage.Bucket. 
func (e *emptyBucket) Remove(ctx context.Context, id *object.ID) error { return nil @@ -122,6 +126,12 @@ func (e *emptyBucket) ReadChunkFile(ctx context.Context, id *object.ID, index ui return nil, "discard", nil } +func (e *emptyBucket) MoveTo(ctx context.Context, id *object.ID, target storage.Bucket) error { + return nil +} + +func (e *emptyBucket) SetMigration(migration storage.Migration) {} + func New(c *conf.Bucket, _ storage.SharedKV) (storage.Bucket, error) { path := c.Path if path == "" { diff --git a/storage/bucket/memory/memory.go b/storage/bucket/memory/memory.go index ba9ed43..57441a0 100644 --- a/storage/bucket/memory/memory.go +++ b/storage/bucket/memory/memory.go @@ -40,6 +40,7 @@ type memoryBucket struct { maxSize uint64 closed bool stop chan struct{} + migration storage.Migration } func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) { @@ -48,7 +49,7 @@ func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error) path: "/", dbPath: storage.TypeInMemory, driver: config.Driver, - storeType: storage.TypeInMemory, + storeType: config.Type, weight: 100, // default weight sharedkv: sharedkv, cache: lru.New[object.IDHash, storage.Mark](config.MaxObjectLimit), // in-memory object size @@ -202,6 +203,19 @@ func (m *memoryBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metad return md, err } +// Touch implements [storage.Bucket]. +func (m *memoryBucket) Touch(ctx context.Context, id *object.ID) error { + mark := m.cache.Get(id.Hash()) + if mark.LastAccess() <= 0 { + return nil + } + + mark.SetLastAccess(time.Now().Unix()) + mark.SetRefs(mark.Refs() + 1) + m.cache.Set(id.Hash(), mark) + return nil +} + // Path implements [storage.Bucket]. func (m *memoryBucket) Path() string { return m.path @@ -253,7 +267,7 @@ func (m *memoryBucket) WriteChunkFile(ctx context.Context, id *object.ID, index _ = m.fs.MkdirAll(filepath.Dir(wpath), m.fileMode) if log.Enabled(log.LevelDebug) { - log.Context(ctx).Infof("write inmemory chunk file %s", wpath) + log.Context(ctx).Debugf("write inmemory chunk file %s", wpath) } f, err := m.fs.OpenReadWrite(wpath, vfs.WriteCategoryUnspecified) @@ -273,6 +287,12 @@ func (m *memoryBucket) ReadChunkFile(_ context.Context, id *object.ID, index uin return storage.WrapVFSFile(f), wpath, err } +func (m *memoryBucket) MoveTo(ctx context.Context, id *object.ID, target storage.Bucket) error { + return nil +} + +func (m *memoryBucket) SetMigration(migration storage.Migration) {} + // StoreType implements [storage.Bucket]. func (m *memoryBucket) StoreType() string { return m.storeType diff --git a/storage/builder.go b/storage/builder.go index e8214ce..2b10192 100644 --- a/storage/builder.go +++ b/storage/builder.go @@ -20,6 +20,7 @@ type globalBucketOption struct { Driver string DBType string DBPath string + Tiering *conf.Tiering } // implements storage.Bucket map. 
@@ -47,6 +48,7 @@ func mergeConfig(global *globalBucketOption, bucket *conf.Bucket) *conf.Bucket { DBPath: bucket.DBPath, MaxObjectLimit: bucket.MaxObjectLimit, DBConfig: bucket.DBConfig, // custom db config + Tiering: global.Tiering, } if copied.Driver == "" { diff --git a/storage/marked/bucket.go b/storage/marked/bucket.go index 1c87dd7..5c9ca44 100644 --- a/storage/marked/bucket.go +++ b/storage/marked/bucket.go @@ -43,6 +43,10 @@ func (b *wrappedBucket) Lookup(ctx context.Context, id *object.ID) (*object.Meta return md, nil } +func (b *wrappedBucket) Touch(ctx context.Context, id *object.ID) error { + return b.base.Touch(ctx, id) +} + func (b *wrappedBucket) Store(ctx context.Context, meta *object.Metadata) error { return b.base.Store(ctx, meta) } @@ -126,3 +130,11 @@ func (b *wrappedBucket) Path() string { func (b *wrappedBucket) Close() error { return b.base.Close() } + +func (b *wrappedBucket) MoveTo(ctx context.Context, id *object.ID, target storagev1.Bucket) error { + return b.base.MoveTo(ctx, id, target) +} + +func (b *wrappedBucket) SetMigration(migration storagev1.Migration) { + b.base.SetMigration(migration) +} diff --git a/storage/marked/wrap_storage.go b/storage/marked/wrap_storage.go index f040b1c..c6145e0 100644 --- a/storage/marked/wrap_storage.go +++ b/storage/marked/wrap_storage.go @@ -40,12 +40,7 @@ func (w *wrappedStorage) Rebuild(ctx context.Context, buckets []storagev1.Bucket } func (w *wrappedStorage) Buckets() []storagev1.Bucket { - baseBuckets := w.base.Buckets() - wrapped := make([]storagev1.Bucket, 0, len(baseBuckets)) - for _, b := range baseBuckets { - wrapped = append(wrapped, wrapBucket(b, w.checker)) - } - return wrapped + return w.base.Buckets() } func (w *wrappedStorage) SharedKV() storagev1.SharedKV { @@ -66,6 +61,14 @@ func (w *wrappedStorage) PURGE(storeUrl string, typ storagev1.PurgeControl) erro return w.base.PURGE(storeUrl, typ) } +func (w *wrappedStorage) SelectWithType(ctx context.Context, id *object.ID, tier string) storagev1.Bucket { + return wrapBucket(w.base.SelectWithType(ctx, id, tier), w.checker) +} + +func (w *wrappedStorage) Promote(ctx context.Context, id *object.ID, src storagev1.Bucket) error { + return w.base.Promote(ctx, id, src) +} + func (w *wrappedStorage) Close() error { return w.base.Close() } diff --git a/storage/storage.go b/storage/storage.go index 10751ec..370420d 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -24,12 +24,15 @@ type nativeStorage struct { mu sync.Mutex log *log.Helper - selector storage.Selector + warmSelector storage.Selector // warm selector + hotSelector storage.Selector // hot selector + coldSelector storage.Selector // cold selector sharedkv storage.SharedKV nopBucket storage.Bucket memoryBucket storage.Bucket hotBucket []storage.Bucket normalBucket []storage.Bucket + coldBucket []storage.Bucket } func New(config *conf.Storage, logger log.Logger) (storage.Storage, error) { @@ -39,7 +42,9 @@ func New(config *conf.Storage, logger log.Logger) (storage.Storage, error) { mu: sync.Mutex{}, log: log.NewHelper(logger), - selector: selector.New([]storage.Bucket{}, config.SelectionPolicy), + warmSelector: nil, + hotSelector: nil, + coldSelector: nil, sharedkv: sharedkv.NewMemSharedKV(), nopBucket: nopBucket, memoryBucket: nil, @@ -69,6 +74,7 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { Driver: config.Driver, DBType: config.DBType, DBPath: config.DBPath, + Tiering: config.Tiering, } for _, c := range config.Buckets { @@ -78,15 +84,16 @@ func (n *nativeStorage) reinit(config 
*conf.Storage) error { } switch bucket.StoreType() { - case storage.TypeNormal: + case storage.TypeNormal, storage.TypeWarm: n.normalBucket = append(n.normalBucket, bucket) case storage.TypeHot: n.hotBucket = append(n.hotBucket, bucket) + case storage.TypeCold: + n.coldBucket = append(n.coldBucket, bucket) case storage.TypeInMemory: if n.memoryBucket != nil { return fmt.Errorf("only one inmemory bucket is allowed") } - n.memoryBucket = bucket } } @@ -96,22 +103,123 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { // load lru // load purge queue + // warm / normal if len(n.normalBucket) <= 0 { + n.log.Infof("no warm bucket configured") // no normal bucket, use nop bucket if n.memoryBucket != nil { n.normalBucket = append(n.normalBucket, n.memoryBucket) } } + n.warmSelector = selector.New(n.normalBucket, config.SelectionPolicy) - n.selector = selector.New(n.normalBucket, config.SelectionPolicy) + // cold + if len(n.coldBucket) > 0 { + n.coldSelector = selector.New(n.coldBucket, config.SelectionPolicy) + } else { + n.log.Infof("no cold bucket configured") + } + + // hot + if len(n.hotBucket) > 0 { + n.hotSelector = selector.New(n.hotBucket, config.SelectionPolicy) + } else { + n.log.Infof("no hot bucket configured") + } + + // register demoter and promoter + for _, b := range n.Buckets() { + b.SetMigration(n) + } return nil } // Select implements storage.Selector. func (n *nativeStorage) Select(ctx context.Context, id *object.ID) storage.Bucket { - bucket := n.selector.Select(ctx, id) - return bucket + // find bucket: Hot → Warm → Cold + return n.chainSelector(ctx, id, + n.hotSelector, + n.warmSelector, + n.coldSelector, + ) +} + +// SelectWithType implements storage.Storage. +func (n *nativeStorage) SelectWithType(ctx context.Context, id *object.ID, tier string) storage.Bucket { + switch tier { + case storage.TypeHot: + if n.hotSelector != nil { + return n.hotSelector.Select(ctx, id) + } + case storage.TypeNormal, storage.TypeWarm: // TypeWarm is the same tier as TypeNormal + if n.warmSelector != nil { + return n.warmSelector.Select(ctx, id) + } + case storage.TypeCold: + if n.coldSelector != nil { + return n.coldSelector.Select(ctx, id) + } + case storage.TypeInMemory: + return n.memoryBucket + } + return nil +} + +// Demote implements storage.Demoter. +func (n *nativeStorage) Demote(ctx context.Context, id *object.ID, src storage.Bucket) error { + // Hot -> Warm -> Cold + var targetTier string + switch src.StoreType() { + case storage.TypeHot: + targetTier = storage.TypeNormal + case storage.TypeNormal, storage.TypeWarm: // TypeWarm is the same tier as TypeNormal + targetTier = storage.TypeCold + default: + return nil // no demotion for other types + } + + target := n.SelectWithType(ctx, id, targetTier) + if target == nil { + return fmt.Errorf("no target bucket found for demotion from %s to %s", src.StoreType(), targetTier) + } + + return src.MoveTo(ctx, id, target) +} + +// Promote implements storage.Promoter. 
+func (n *nativeStorage) Promote(ctx context.Context, id *object.ID, src storage.Bucket) error { + // Cold -> Warm -> Hot + var targetTier string + switch src.StoreType() { + case storage.TypeCold: + targetTier = storage.TypeWarm + case storage.TypeWarm, storage.TypeNormal: // TypeNormal is the same tier as TypeWarm + targetTier = storage.TypeHot + default: + return nil // no promotion for other types + } + + target := n.SelectWithType(ctx, id, targetTier) + if target == nil { + return fmt.Errorf("no target bucket found for promotion from %s to %s", src.StoreType(), targetTier) + } + + return src.MoveTo(ctx, id, target) +} + +func (n *nativeStorage) chainSelector(ctx context.Context, id *object.ID, selectors ...storage.Selector) storage.Bucket { + for _, sel := range selectors { + if sel == nil { + continue + } + if bucket := sel.Select(ctx, id); bucket != nil && bucket.Exist(ctx, id.Bytes()) { + return bucket + } + } + + // fallback to warm selector + return n.warmSelector.Select(ctx, id) } // Rebuild implements storage.Selector. @@ -121,7 +229,11 @@ func (n *nativeStorage) Rebuild(ctx context.Context, buckets []storage.Bucket) e // Buckets implements storage.Storage. func (n *nativeStorage) Buckets() []storage.Bucket { - return append(n.normalBucket, n.hotBucket...) + buckets := make([]storage.Bucket, 0, len(n.normalBucket)+len(n.hotBucket)+len(n.coldBucket)) + buckets = append(buckets, n.normalBucket...) + buckets = append(buckets, n.hotBucket...) + buckets = append(buckets, n.coldBucket...) + return buckets } // PURGE implements storage.Storage. @@ -232,6 +344,10 @@ func (n *nativeStorage) Close() error { errs = append(errs, bucket.Close()) } + for _, bucket := range n.coldBucket { + errs = append(errs, bucket.Close()) + } + if n.memoryBucket != nil { if err := n.memoryBucket.Close(); err != nil { errs = append(errs, err)
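The tier moves wired up by nativeStorage.Promote and nativeStorage.Demote reduce to a fixed mapping over the StoreType constants: cold -> warm -> hot on promotion and hot -> warm -> cold on demotion, with warm and normal treated as the same tier. A minimal, self-contained sketch of that mapping follows; nextTierUp and nextTierDown are illustrative helper names, not identifiers from this diff.

package main

import "fmt"

// Tier constants as declared in api/defined/v1/storage/indexdb.go.
const (
	TypeInMemory = "inmemory"
	TypeNormal   = "normal" // normal and warm are the same tier
	TypeWarm     = "warm"
	TypeCold     = "cold"
	TypeHot      = "hot"
)

// nextTierUp restates the switch in nativeStorage.Promote: cold -> warm, warm/normal -> hot.
// An empty result means no promotion is performed for that source tier.
func nextTierUp(src string) string {
	switch src {
	case TypeCold:
		return TypeWarm
	case TypeWarm, TypeNormal:
		return TypeHot
	default:
		return ""
	}
}

// nextTierDown restates the switch in nativeStorage.Demote: hot -> warm(normal), warm/normal -> cold.
func nextTierDown(src string) string {
	switch src {
	case TypeHot:
		return TypeNormal
	case TypeNormal, TypeWarm:
		return TypeCold
	default:
		return ""
	}
}

func main() {
	fmt.Println(nextTierUp(TypeCold), nextTierUp(TypeNormal))  // warm hot
	fmt.Println(nextTierDown(TypeHot), nextTierDown(TypeWarm)) // normal cold
}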
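On the read path, diskBucket.Touch decides whether to call Migration.Promote by counting hits and checking the object size against the tiering.promote settings. The sketch below mirrors that check in isolation; promoteConfig and shouldPromote are illustrative stand-ins, and the configured window is omitted because Touch in this diff compares the raw ref count against min_hits without resetting it per window.

package main

import "fmt"

// promoteConfig mirrors the promote knobs from conf.Tiering (illustrative stand-in).
type promoteConfig struct {
	MinHits       int    // hits required before promotion
	MaxObjectSize uint64 // objects larger than this stay in the warm tier
}

// shouldPromote reproduces the decision made on each cache hit: promote once the
// hit counter reaches MinHits, unless the object exceeds MaxObjectSize.
func shouldPromote(cfg promoteConfig, hits int, objectSize uint64) bool {
	if cfg.MinHits <= 0 {
		return false // promotion effectively disabled
	}
	if cfg.MaxObjectSize > 0 && objectSize > cfg.MaxObjectSize {
		return false // too large for the hot tier
	}
	return hits >= cfg.MinHits
}

func main() {
	cfg := promoteConfig{MinHits: 10, MaxObjectSize: 20 << 20} // values from config.example.yaml
	fmt.Println(shouldPromote(cfg, 12, 5<<20))  // true: enough hits, small enough
	fmt.Println(shouldPromote(cfg, 12, 64<<20)) // false: exceeds max_object_size
	fmt.Println(shouldPromote(cfg, 3, 5<<20))   // false: not enough hits yet
}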
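The Touch dispatch added in postCacheProcessor and startWorker keeps the response path non-blocking: a hit is handed to a bounded channel with select/default and dropped (with a warning) when the queue is full, while a single background worker drains the queue. The stand-alone sketch below shows only that hand-off pattern; touchJob is a hypothetical stand-in for touchArgs, and the TryLock/Unlock bookkeeping from the diff is left out.

package main

import (
	"fmt"
	"time"
)

// touchJob is a hypothetical stand-in for the diff's touchArgs.
type touchJob struct{ key string }

func main() {
	jobs := make(chan touchJob, 4) // bounded queue, like touchChan (10_000 in the diff)

	// single background worker draining the queue, like ProcessorChain.startWorker
	go func() {
		for j := range jobs {
			time.Sleep(10 * time.Millisecond) // stands in for bucket.Touch with a 1s timeout
			fmt.Println("touched", j.key)
		}
	}()

	for i := 0; i < 10; i++ {
		j := touchJob{key: fmt.Sprintf("obj-%d", i)}
		select {
		case jobs <- j: // enqueued without blocking the response path
		default: // queue full: drop the touch instead of blocking
			fmt.Println("dropped", j.key)
		}
	}
	time.Sleep(300 * time.Millisecond) // give the worker time to drain (demo only)
}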