feat: cold / warm / hot layer levels cache storage#40
Conversation
…mote and demote policies.
…s metadata and integrate it into the caching middleware.
There was a problem hiding this comment.
Pull request overview
This PR adds tiered (cold/warm/hot) storage migration support, including new migration configuration, new storage/bucket interfaces for access tracking and migration, and a HeavyKeeper-based frequency tracker to drive promotions. It also wires Touch() into the HTTP caching pipeline to record accesses.
Changes:
- Added migration config/types and updated storage selection to support a migrator mode.
- Implemented HeavyKeeper and integrated it into the disk bucket for promotion tracking (and eviction-triggered demotion hooks).
- Updated bucket implementations, middleware, and tests to use the new
BucketConfigand interfaces (Touch,Migrate,SetMigration).
Reviewed changes
Copilot reviewed 22 out of 23 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/config.test.yaml | Adds migration configuration block to test config YAML. |
| storage/storage.go | Enables migrator mode when migration is configured; updates empty bucket config type. |
| storage/migrator.go | Introduces migrator storage that selects across hot/warm/cold layers and triggers promote/demote. |
| storage/diraware/bucket.go | Extends wrapper bucket to forward new Touch/Migrate/SetMigration methods. |
| storage/builder.go | Switches bucket construction to storage.BucketConfig and propagates migration config. |
| storage/bucket/memory/memory_test.go | Updates tests to construct memory bucket via BucketConfig. |
| storage/bucket/memory/memory.go | Adds Touch/migration plumbing to memory bucket and updates mark creation API usage. |
| storage/bucket/empty/empty.go | Adds no-op Touch/Migrate/SetMigration to satisfy new bucket interface. |
| storage/bucket/disk/disk_test.go | Updates disk bucket tests (including a test name fix) and import paths. |
| storage/bucket/disk/disk_migration_test.go | Adds tests for promote/demote integration behavior using a mock migration. |
| storage/bucket/disk/disk.go | Integrates HeavyKeeper + Touch into disk bucket and adds migration hooks. |
| server/middleware/caching/processor.go | Calls Touch() after responses to increment access stats. |
| server/middleware/caching/internal.go | Stores request context on Caching for later Touch() call. |
| server/middleware/caching/caching_chunkpart_test.go | Updates test bucket construction to BucketConfig. |
| pkg/algorithm/heavykeeper/heavykeeper.go | Adds HeavyKeeper probabilistic counter implementation. |
| pkg/algorithm/heavykeeper/heavykeeper_test.go | Adds unit tests and benchmarks for HeavyKeeper. |
| api/defined/v1/storage/storage.go | Extends interfaces (Touch, Migrate, SetMigration, migrator types) and changes NewMark signature. |
| api/defined/v1/storage/indexdb.go | Adds storage layer constants (warm, cold) alongside existing ones. |
| api/defined/v1/storage/config.go | Adds migration/bucket config structs under the storage API package. |
| conf/conf.go | Adds migration config to main application configuration schema. |
| config.example.yaml | Documents migration config options in example configuration. |
| go.mod | Adds/adjusts dependencies for new functionality. |
| go.sum | Updates sums for dependency changes. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/bucket/memory/memory.go
Outdated
|
|
||
| // Migrate implements [storage.Bucket]. | ||
| func (m *memoryBucket) Migrate(ctx context.Context, id *object.ID, dest storage.Bucket) error { | ||
| panic("unimplemented") |
There was a problem hiding this comment.
Migrate currently panics. In migrator mode, migratorStorage.Promote/Demote calls src.Migrate(...), so an in-memory bucket participating in migration will crash the process. Implement migration (or return an error) instead of panicking.
| panic("unimplemented") | |
| // Currently, in-memory buckets do not support migration; return an error | |
| // instead of panicking so callers can handle this case gracefully. | |
| _ = ctx | |
| _ = id | |
| _ = dest | |
| return errors.New("memory bucket: migration is not supported") |
|
|
||
| cfg := &storage.BucketConfig{ | ||
| Path: basepath, | ||
| DBPath: filepath.Join(basepath, "/.indexdb"), |
There was a problem hiding this comment.
filepath.Join(basepath, "/.indexdb") discards basepath because the second argument is absolute, so this test will try to use /.indexdb on the root filesystem. Use a relative segment like ".indexdb" instead.
| DBPath: filepath.Join(basepath, "/.indexdb"), | |
| DBPath: filepath.Join(basepath, ".indexdb"), |
| demote: | ||
| min_hits: 2 # window hits to demote | ||
| window: 5m # 5 minutes window | ||
| occupancy: 75 # percent useage |
There was a problem hiding this comment.
Typo in comment: "useage" -> "usage".
| occupancy: 75 # percent useage | |
| occupancy: 75 # percent usage |
| demote: | ||
| min_hits: 2 # window hits to demote | ||
| window: 5m # 5 minutes window | ||
| occupancy: 75 # percent useage |
There was a problem hiding this comment.
Typo in comment: "useage" -> "usage".
| occupancy: 75 # percent useage | |
| occupancy: 75 # percent usage |
|
|
||
| cfg := &storage.BucketConfig{ | ||
| Path: basepath, | ||
| DBPath: filepath.Join(basepath, "/.indexdb"), |
There was a problem hiding this comment.
Same issue here: filepath.Join(basepath, "/.indexdb") results in /.indexdb (absolute path), ignoring basepath. Use a relative path segment (e.g. ".indexdb") so tests stay within the temp dir.
| DBPath: filepath.Join(basepath, "/.indexdb"), | |
| DBPath: filepath.Join(basepath, ".indexdb"), |
| switch src.StoreType() { | ||
| case storage.TypeHot: | ||
| layer = storage.TypeWarm | ||
| case storage.TypeWarm: | ||
| layer = storage.TypeCold | ||
| default: | ||
| return nil // no demotion for other types | ||
| } | ||
|
|
||
| target := m.SelectLayer(ctx, id, layer) | ||
| if target == nil { | ||
| return fmt.Errorf("no target bucket found for demotion from %s to %s", src.StoreType(), layer) | ||
| } | ||
|
|
||
| return src.Migrate(ctx, id, target) | ||
| } | ||
|
|
||
| // Promote implements [storage.Migrator]. | ||
| func (m *migratorStorage) Promote(ctx context.Context, id *object.ID, src storage.Bucket) error { | ||
| // Cold -> Warm -> Hot | ||
| var layer string | ||
| switch src.StoreType() { | ||
| case storage.TypeCold: | ||
| layer = storage.TypeWarm | ||
| case storage.TypeWarm: | ||
| layer = storage.TypeHot | ||
| default: | ||
| return nil // no promotion for other types | ||
| } |
There was a problem hiding this comment.
Promotion/demotion ignores buckets with StoreType()==storage.TypeNormal. The example config still uses type: normal, and reinit() also classifies TypeNormal into the warm layer, so Promote/Demote should treat TypeNormal the same as TypeWarm (e.g., allow normal->hot promotion and hot->normal demotion).
| if opt.Migration != nil && opt.Migration.Enabled { | ||
| // Default width 4096 if not set or small | ||
| width := opt.MaxObjectLimit | ||
| if width < 4096 { | ||
| width = 4096 | ||
| } | ||
| bucket.hkPromote = heavykeeper.New(3, width, 0.9) | ||
| bucket.lastPromoteReset = time.Now() | ||
| } |
There was a problem hiding this comment.
HeavyKeeper width is derived directly from MaxObjectLimit. With configs like the example (max_object_limit: 10000000), this will allocate a very large in-memory structure (potentially hundreds of MB) per bucket at startup. Consider adding an explicit HeavyKeeper size setting and/or clamping width to a sane upper bound to avoid OOMs.
| d.hkPromote.Add(id.Bytes()) | ||
| if d.hkPromote.Query(id.Bytes()) >= uint32(d.opt.Migration.Promote.MinHits) { | ||
| go func() { | ||
| // check migration interface | ||
| if d.migration != nil { | ||
| if err := d.migration.Promote(context.Background(), id, d); err != nil { | ||
| log.Warnf("promote %s failed: %v", id.Key(), err) | ||
| } | ||
| } | ||
| }() | ||
| } |
There was a problem hiding this comment.
Once an object crosses the promote threshold, every subsequent Touch/Lookup will spawn a new goroutine calling migration.Promote, which can create unbounded goroutine growth under hot-key traffic. Consider triggering promotion only on the transition to the threshold (or using a single worker/queue / per-key de-dupe) instead of launching a goroutine per access.
|
|
||
| func NewMark(clock int64, refs uint64) Mark { | ||
| func NewMark(clock, refs int64) Mark { | ||
| return Mark(clock)<<CounterBits | Mark(refs) |
There was a problem hiding this comment.
NewMark packs clock and refs into a bitfield, but it currently ORs the raw refs value without masking/clamping to RefsMask. If refs exceeds 16 bits it will corrupt the clock bits. Since refs is now int64, negative values would also corrupt the mark. Clamp/mask refs (and enforce non-negative) inside NewMark to keep the bit layout valid.
| return Mark(clock)<<CounterBits | Mark(refs) | |
| if refs < 0 { | |
| refs = 0 | |
| } else if refs > int64(RefsMask) { | |
| refs = int64(RefsMask) | |
| } | |
| return Mark(clock)<<CounterBits | Mark(uint64(refs)&uint64(RefsMask)) |
storage/bucket/disk/disk.go
Outdated
|
|
||
| // Migrate implements [storage.Bucket]. | ||
| func (d *diskBucket) Migrate(ctx context.Context, id *object.ID, dest storage.Bucket) error { | ||
| panic("unimplemented") |
There was a problem hiding this comment.
Migrate is part of the storage.Bucket interface and is invoked by migratorStorage.Promote/Demote via src.Migrate(...), but this implementation currently panics. This will crash the process as soon as migration is exercised. Implement object+metadata copy to dest and delete from source (or return a proper error until supported).
| panic("unimplemented") | |
| return errors.New("diskBucket.Migrate not supported") |
…ect DBPath in disk migration tests.
This pull request introduces a comprehensive tiered storage migration feature, enabling automatic promotion and demotion of objects between storage layers based on configurable policies. It also adds a probabilistic data structure (HeavyKeeper) for tracking access frequency, updates configuration and interface definitions to support migration, and integrates these changes into the disk bucket implementation. Additional minor improvements and dependency updates are included.
Tiered Storage and Migration Support
PromoteConfig,DemoteConfig,MigrationConfig,BucketConfig) instorage/config.go, and updated the main storage configuration and example YAML to support migration policies. [1] [2] [3] [4]Touch,Migrate, andSetMigrationmethods toOperation; introducedMigrationandMigratorinterfaces for object promotion/demotion and layer selection. [1] [2] [3]HeavyKeeper Data Structure
HeavyKeeper) for tracking frequently accessed objects, along with its unit and benchmark tests. [1] [2]Disk Bucket Integration
diskBucketto use the newBucketConfigand integrate migration logic:API and Type Consistency
NewMarkfunction signature for clarity and correctness. [1] [2]Dependencies