33 changes: 33 additions & 0 deletions api/defined/v1/storage/config.go
@@ -0,0 +1,33 @@
package storage

import "time"

type (
PromoteConfig struct {
MinHits int `json:"min_hits" yaml:"min_hits"` // hits within the window >= N
Window time.Duration `json:"window" yaml:"window"` // time window, e.g. 1m
}
DemoteConfig struct {
MinHits int `json:"min_hits" yaml:"min_hits"` // hits within the window <= N
Window time.Duration `json:"window" yaml:"window"` // time window, e.g. 1m
Occupancy float64 `json:"occupancy" yaml:"occupancy"` // hot-tier storage occupancy >= N%
}
MigrationConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Promote PromoteConfig `json:"promote" yaml:"promote"` // warm up
Demote DemoteConfig `json:"demote" yaml:"demote"` // cool down
}

BucketConfig struct {
Path string `json:"path" yaml:"path"` // local path or ?
Driver string `json:"driver" yaml:"driver"` // native, custom-driver
Type string `json:"type" yaml:"type"` // normal, cold, hot, fastmemory
DBType string `json:"db_type" yaml:"db_type"` // boltdb, badgerdb, pebble
DBPath string `json:"db_path" yaml:"db_path"` // db path, default: <bucket_path>/.indexdb
AsyncLoad bool `json:"async_load" yaml:"async_load"` // load metadata async
SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part
MaxObjectLimit int `json:"max_object_limit" yaml:"max_object_limit"` // max object limit; objects beyond the upper bound are discarded
Migration *MigrationConfig `json:"migration" yaml:"migration"` // migration config
DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config
}
)
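To make the shape of the new config types concrete, here is a minimal sketch of a hot-tier bucket with tiering enabled; the import path and every field value are assumptions for illustration, not taken from this PR:

package storageexample

import (
	"time"

	storage "example.com/project/api/defined/v1/storage" // import path assumed
)

// exampleBucketConfig shows how MigrationConfig nests inside BucketConfig.
// All values here are illustrative only.
func exampleBucketConfig() storage.BucketConfig {
	return storage.BucketConfig{
		Path:      "/cache/hot",
		Driver:    "native",
		Type:      "hot",
		DBType:    "pebble",
		SliceSize: 1 << 20, // 1 MiB, mirroring slice_size in config.example.yaml
		Migration: &storage.MigrationConfig{
			Enabled: true,
			Promote: storage.PromoteConfig{MinHits: 10, Window: time.Minute},
			Demote:  storage.DemoteConfig{MinHits: 2, Window: 5 * time.Minute, Occupancy: 75},
		},
	}
}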
4 changes: 3 additions & 1 deletion api/defined/v1/storage/indexdb.go
@@ -13,7 +13,9 @@ var ErrKeyNotFound = errors.New("key not found")

const (
TypeInMemory = "inmemory"
TypeNormal = "normal"
TypeNormal = "normal" // normal and warm are the same tier
TypeWarm = "warm" // normal and warm are the same tier
TypeCold = "cold"
TypeHot = "hot"
)

24 changes: 23 additions & 1 deletion api/defined/v1/storage/storage.go
@@ -20,6 +20,8 @@ type Selector interface {
type Operation interface {
// Lookup retrieves the metadata for the specified object ID.
Lookup(ctx context.Context, id *object.ID) (*object.Metadata, error)
// Touch updates the last access time and refs count of the object.
Touch(ctx context.Context, id *object.ID)
// Store stores the metadata for the specified object ID.
Store(ctx context.Context, meta *object.Metadata) error
// Exist checks if the object exists.
@@ -42,6 +44,12 @@ type Operation interface {
WriteChunkFile(ctx context.Context, id *object.ID, index uint32) (io.WriteCloser, string, error)
// ReadChunkFile open chunk file and returns io.ReadCloser
ReadChunkFile(ctx context.Context, id *object.ID, index uint32) (File, string, error)
// Migrate migrates the object from this bucket to the destination bucket.
// It should copy the object data and metadata to the destination, and then
// remove it from the source bucket upon success.
Migrate(ctx context.Context, id *object.ID, dest Bucket) error
// SetMigration sets the Migration implementation for the bucket.
SetMigration(m Migration) error
}

type Storage interface {
@@ -55,6 +63,20 @@ type Storage interface {
PURGE(storeUrl string, typ PurgeControl) error
}

type Migration interface {
// Demote demotes the object from the source bucket to the destination bucket.
Demote(ctx context.Context, id *object.ID, src Bucket) error
// Promote promotes the object from the source bucket to the destination bucket.
Promote(ctx context.Context, id *object.ID, src Bucket) error
}

type Migrator interface {
Storage

// SelectLayer selects the Bucket by the object ID and layer.
SelectLayer(ctx context.Context, id *object.ID, layer string) Bucket
}

type Bucket interface {
io.Closer
Operation
@@ -131,7 +153,7 @@ const (
RefsMask = (1 << CounterBits) - 1
)

func NewMark(clock int64, refs uint64) Mark {
func NewMark(clock, refs int64) Mark {
return Mark(clock)<<CounterBits | Mark(refs)
Copilot AI Feb 11, 2026
NewMark packs clock and refs into a bitfield, but it currently ORs the raw refs value without masking/clamping to RefsMask. If refs exceeds 16 bits it will corrupt the clock bits. Since refs is now int64, negative values would also corrupt the mark. Clamp/mask refs (and enforce non-negative) inside NewMark to keep the bit layout valid.

Suggested change
return Mark(clock)<<CounterBits | Mark(refs)
if refs < 0 {
refs = 0
} else if refs > int64(RefsMask) {
refs = int64(RefsMask)
}
return Mark(clock)<<CounterBits | Mark(uint64(refs)&uint64(RefsMask))

}
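To see the reviewer's point concretely, here is a self-contained sketch of the bit layout. CounterBits = 16 is an assumption consistent with the comment's "16 bits"; the real constant is defined outside this hunk:

package main

import "fmt"

type Mark uint64

const (
	CounterBits = 16 // assumed; the PR only shows RefsMask derived from it
	RefsMask    = (1 << CounterBits) - 1
)

// newMark applies the clamping suggested in the review.
func newMark(clock, refs int64) Mark {
	if refs < 0 {
		refs = 0
	} else if refs > int64(RefsMask) {
		refs = int64(RefsMask)
	}
	return Mark(clock)<<CounterBits | Mark(uint64(refs)&uint64(RefsMask))
}

func main() {
	// Unmasked: refs=70000 needs 17 bits, so its high bit leaks into clock.
	raw := Mark(1234)<<CounterBits | Mark(70000)
	fmt.Println(raw>>CounterBits, uint64(raw)&RefsMask) // 1235 4464 (corrupted)

	// Clamped: refs saturates at RefsMask and clock stays intact.
	m := newMark(1234, 70000)
	fmt.Println(m>>CounterBits, uint64(m)&RefsMask) // 1234 65535
}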

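Reading the new Migration and Migrator interfaces together, a tiering policy can route promotion and demotion through Bucket.Migrate. A minimal sketch, written as if inside the storage package; the struct and the assumption that SelectLayer accepts the TypeHot/TypeCold constants as layer names are illustrative only:

// tieringMigration is a hypothetical Migration implementation; it uses
// only SelectLayer and Migrate from this PR. A real implementation would
// also nil-check the selected destination bucket.
type tieringMigration struct {
	layers Migrator
}

// Promote copies the object into the hot layer; per the Migrate contract,
// the source copy is removed on success.
func (t *tieringMigration) Promote(ctx context.Context, id *object.ID, src Bucket) error {
	dest := t.layers.SelectLayer(ctx, id, TypeHot) // layer key assumed
	return src.Migrate(ctx, id, dest)
}

// Demote moves the object down to the cold layer the same way.
func (t *tieringMigration) Demote(ctx context.Context, id *object.ID, src Bucket) error {
	dest := t.layers.SelectLayer(ctx, id, TypeCold)
	return src.Migrate(ctx, id, dest)
}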
35 changes: 26 additions & 9 deletions conf/conf.go
@@ -70,15 +70,16 @@ type Upstream struct {
}

type Storage struct {
Driver string `json:"driver" yaml:"driver"`
DBType string `json:"db_type" yaml:"db_type"`
DBPath string `json:"db_path" yaml:"db_path"` // default db path
AsyncLoad bool `json:"async_load" yaml:"async_load"`
EvictionPolicy string `json:"eviction_policy" yaml:"eviction_policy"`
SelectionPolicy string `json:"selection_policy" yaml:"selection_policy"`
SliceSize uint64 `json:"slice_size" yaml:"slice_size"`
DirAware *DirAware `json:"diraware" yaml:"diraware"`
Buckets []*Bucket `json:"buckets" yaml:"buckets"`
Driver string `json:"driver" yaml:"driver"`
DBType string `json:"db_type" yaml:"db_type"`
DBPath string `json:"db_path" yaml:"db_path"` // default db path
AsyncLoad bool `json:"async_load" yaml:"async_load"`
EvictionPolicy string `json:"eviction_policy" yaml:"eviction_policy"`
SelectionPolicy string `json:"selection_policy" yaml:"selection_policy"`
SliceSize uint64 `json:"slice_size" yaml:"slice_size"`
DirAware *DirAware `json:"diraware" yaml:"diraware"`
Migration *Migration `json:"migration" yaml:"migration"`
Buckets []*Bucket `json:"buckets" yaml:"buckets"`
}

func (r *Storage) FillDefault() {
@@ -109,6 +110,22 @@ type DirAware struct {
AutoClear bool `json:"auto_clear" yaml:"auto_clear"` // automatically clear expired tasks (runs around 2 AM)
}

type (
Promote struct {
MinHits int `json:"min_hits" yaml:"min_hits"` // hits within the window >= N
Window time.Duration `json:"window" yaml:"window"` // time window, e.g. 10m
}
Demote struct {
MinHits int `json:"min_hits" yaml:"min_hits"` // hits within the window <= N
Window time.Duration `json:"window" yaml:"window"` // time window, e.g. 10m
Occupancy float64 `json:"occupancy" yaml:"occupancy"` // hot-tier storage occupancy >= N%
}
Migration struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Promote Promote `json:"promote" yaml:"promote"` // warm up
Demote Demote `json:"demote" yaml:"demote"` // cool down
}
)
type Plugin struct {
Name string `json:"name" yaml:"name"`
Options map[string]any `json:"options" yaml:"options"`
9 changes: 9 additions & 0 deletions config.example.yaml
@@ -93,6 +93,15 @@ storage:
eviction_policy: fifo # fifo, lru, lfu
selection_policy: hashring # hashring, roundrobin
slice_size: 1048576 # 1MB
migration:
enabled: false # enable bucket tiering
promote:
min_hits: 10 # window hits to promote
window: 1m # 1 minute window
demote:
min_hits: 2 # window hits to demote
window: 5m # 5 minute window
occupancy: 75 # percent useage
Copilot AI Feb 11, 2026
Typo in comment: "useage" -> "usage".

Suggested change
occupancy: 75 # percent useage
occupancy: 75 # percent usage

diraware:
enabled: true # default: true
store_path: /cache1/.diraware
3 changes: 2 additions & 1 deletion go.mod
@@ -5,6 +5,7 @@ go 1.25.2
require (
dario.cat/mergo v1.0.2
github.com/andybalholm/brotli v1.2.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/cloudflare/tableflip v1.2.3
github.com/cockroachdb/pebble/v2 v2.1.2
github.com/fsnotify/fsnotify v1.9.0
@@ -34,7 +35,6 @@ require (
github.com/antlabs/timer v0.1.4 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/crlib v0.0.0-20251122031428-fe658a2dbda1 // indirect
github.com/cockroachdb/errors v1.12.0 // indirect
github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 // indirect
@@ -60,6 +60,7 @@ require (
github.com/prometheus/common v0.67.4 // indirect
github.com/prometheus/procfs v0.19.2 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tidwall/btree v1.8.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -115,6 +115,8 @@ github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05Zp
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tidwall/btree v1.8.1 h1:27ehoXvm5AG/g+1VxLS1SD3vRhp/H7LuEfwNvddEdmA=
139 changes: 139 additions & 0 deletions pkg/algorithm/heavykeeper/heavykeeper.go
@@ -0,0 +1,139 @@
package heavykeeper

import (
"hash/fnv"
"math"
"math/rand"
"sync"
"time"
)

// HeavyKeeper is a probabilistic data structure for top-k items.
type HeavyKeeper struct {
buckets [][]bucket
depth int
width int
decay float64
r *rand.Rand
mu sync.RWMutex
}

type bucket struct {
fingerprint uint64
count uint32
}

// New creates a new HeavyKeeper.
// depth: number of arrays (hash functions)
// width: number of buckets per array
// decay: decay base in (0, 1); a colliding bucket with count c is decremented with probability decay^c
func New(depth, width int, decay float64) *HeavyKeeper {
hk := &HeavyKeeper{
buckets: make([][]bucket, depth),
depth: depth,
width: width,
decay: decay,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}

for i := range hk.buckets {
hk.buckets[i] = make([]bucket, width)
}

return hk
}

// Add adds a key to the HeavyKeeper.
func (hk *HeavyKeeper) Add(key []byte) {
hk.mu.Lock()
defer hk.mu.Unlock()

fingerprint := hk.hash(key)

// Use double hashing for multiple hash functions
// h1 = fingerprint
// h2 = salted FNV-1-style hash of key (see hash2)
// idx = (h1 + i*h2) % width
h2 := hk.hash2(key)

for i := 0; i < hk.depth; i++ {
idx := (fingerprint + uint64(i)*h2) % uint64(hk.width)
b := &hk.buckets[i][idx]

if b.count == 0 {
b.fingerprint = fingerprint
b.count = 1
continue
}

if b.fingerprint == fingerprint {
b.count++
continue
}

// Slot owned by a different key: decay its count with probability
// decay^count; if it reaches zero, the new key takes over the slot.
if hk.r.Float64() < math.Pow(hk.decay, float64(b.count)) {
b.count--
if b.count == 0 {
b.fingerprint = fingerprint
b.count = 1
}
}
}
}

// Query returns the estimated count for the key.
func (hk *HeavyKeeper) Query(key []byte) uint32 {
hk.mu.RLock()
defer hk.mu.RUnlock()

fingerprint := hk.hash(key)
h2 := hk.hash2(key)
var maxCount uint32

for i := 0; i < hk.depth; i++ {
idx := (fingerprint + uint64(i)*h2) % uint64(hk.width)
b := &hk.buckets[i][idx]

if b.fingerprint == fingerprint {
if b.count > maxCount {
maxCount = b.count
}
}
}

return maxCount
}

// Clear resets the HeavyKeeper.
func (hk *HeavyKeeper) Clear() {
hk.mu.Lock()
defer hk.mu.Unlock()

for i := range hk.buckets {
for j := range hk.buckets[i] {
hk.buckets[i][j].count = 0
hk.buckets[i][j].fingerprint = 0
}
}
}

func (hk *HeavyKeeper) hash(key []byte) uint64 {
h := fnv.New64a()
h.Write(key)
return h.Sum64()
}

func (hk *HeavyKeeper) hash2(key []byte) uint64 {
// Secondary hash for double hashing: an FNV-1-style mix (multiply then
// XOR, unlike the FNV-1a used in hash) plus a final salt, so the two
// hashes stay decorrelated.
h := uint64(2166136261)
for _, c := range key {
h *= 16777619
h ^= uint64(c)
}
// salt
h ^= 0x9e3779b97f4a7c15
return h
}
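Finally, a quick usage sketch for the new heavykeeper package; the import path and the depth/width/decay choices are illustrative assumptions:

package main

import (
	"fmt"

	heavykeeper "example.com/project/pkg/algorithm/heavykeeper" // import path assumed
)

func main() {
	// 4 hash rows x 1024 buckets with a 0.9 decay base.
	hk := heavykeeper.New(4, 1024, 0.9)

	for i := 0; i < 100; i++ {
		hk.Add([]byte("hot-object")) // a heavy hitter
	}
	hk.Add([]byte("cold-object")) // seen once

	fmt.Println(hk.Query([]byte("hot-object")))  // close to 100
	fmt.Println(hk.Query([]byte("cold-object"))) // 0 or 1, depending on collisions
}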