4 changes: 3 additions & 1 deletion api/defined/v1/storage/indexdb.go
@@ -13,7 +13,9 @@ var ErrKeyNotFound = errors.New("key not found")

const (
TypeInMemory = "inmemory"
TypeNormal = "normal"
TypeNormal = "normal" // normal, warm 同一个
TypeWarm = "warm" // normal, warm 同一个
TypeCold = "cold"
TypeHot = "hot"
)

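The constants above now name four on-disk tiers plus an in-memory one, with normal and warm treated as the same tier. A minimal sketch of how callers could rank them for promote/demote decisions — the tierRank map and hotterThan helper are illustrative only and assume they live next to these constants in the storage package:

package storage

// tierRank gives each store type a relative "hotness"; higher means hotter.
// TypeNormal and TypeWarm share a rank because they refer to the same tier.
var tierRank = map[string]int{
    TypeCold:     0,
    TypeNormal:   1,
    TypeWarm:     1,
    TypeHot:      2,
    TypeInMemory: 3,
}

// hotterThan reports whether store type a is a hotter tier than store type b.
func hotterThan(a, b string) bool {
    return tierRank[a] > tierRank[b]
}
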
17 changes: 17 additions & 0 deletions api/defined/v1/storage/storage.go
@@ -20,6 +20,8 @@ type Selector interface {
type Operation interface {
// Lookup retrieves the metadata for the specified object ID.
Lookup(ctx context.Context, id *object.ID) (*object.Metadata, error)
// Touch updates the last access time for the specified object ID.
Touch(ctx context.Context, id *object.ID) error
// Store stores the metadata for the specified object ID.
Store(ctx context.Context, meta *object.Metadata) error
// Exist checks if the object exists.
@@ -42,6 +44,8 @@ type Operation interface {
WriteChunkFile(ctx context.Context, id *object.ID, index uint32) (io.WriteCloser, string, error)
// ReadChunkFile opens the chunk file and returns an io.ReadCloser
ReadChunkFile(ctx context.Context, id *object.ID, index uint32) (File, string, error)
// MoveTo moves the object to the target bucket.
MoveTo(ctx context.Context, id *object.ID, target Bucket) error
}

type Storage interface {
@@ -53,6 +57,12 @@ type Storage interface {
SharedKV() SharedKV

PURGE(storeUrl string, typ PurgeControl) error

// Promote moves the object up the tiers (e.g., warm -> hot).
Promote(ctx context.Context, id *object.ID, src Bucket) error

// SelectWithType selects a Bucket by the tier type.
SelectWithType(ctx context.Context, id *object.ID, tierType string) Bucket
}

type Bucket interface {
@@ -77,6 +87,13 @@ type Bucket interface {
StoreType() string
// Path returns the Bucket path.
Path() string
// SetMigration sets the migration for the Bucket.
SetMigration(migration Migration)
}

type Migration interface {
Demote(ctx context.Context, id *object.ID, src Bucket) error
Promote(ctx context.Context, id *object.ID, src Bucket) error
}

type PurgeControl struct {
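Taken together, Touch, MoveTo, Promote, SelectWithType, and the new Migration hook outline how objects move between tiers, but the PR does not show an implementation. Below is a minimal sketch of one, written only against these interfaces — the tierMigration type is hypothetical, the object import is the one storage.go already uses, and it assumes Bucket embeds Operation (the caching middleware already calls Lookup and StoreType on buckets):

package storage

import (
    "context"
    "fmt"
    // the object package is imported here exactly as storage.go already imports it
)

// tierMigration is an illustrative Migration: it moves an object from its
// source bucket into a bucket of the requested tier, if one exists.
type tierMigration struct {
    store Storage // used to pick target buckets by tier type
}

// Promote copies the object into a hot-tier bucket and refreshes its access time.
func (m *tierMigration) Promote(ctx context.Context, id *object.ID, src Bucket) error {
    hot := m.store.SelectWithType(ctx, id, TypeHot)
    if hot == nil || hot == src {
        return nil // no hot bucket available, or the object is already there
    }
    if err := src.MoveTo(ctx, id, hot); err != nil {
        return fmt.Errorf("promote: %w", err)
    }
    return hot.Touch(ctx, id)
}

// Demote mirrors Promote but targets the warm (normal) tier.
func (m *tierMigration) Demote(ctx context.Context, id *object.ID, src Bucket) error {
    warm := m.store.SelectWithType(ctx, id, TypeWarm)
    if warm == nil || warm == src {
        return nil
    }
    return src.MoveTo(ctx, id, warm)
}
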
64 changes: 43 additions & 21 deletions conf/conf.go
@@ -68,28 +68,50 @@ type Upstream struct {
Features map[string]any `json:"features" yaml:"features"`
}

type Storage struct {
Driver string `json:"driver" yaml:"driver"`
DBType string `json:"db_type" yaml:"db_type"`
DBPath string `json:"db_path" yaml:"db_path"` // default db path
AsyncLoad bool `json:"async_load" yaml:"async_load"`
EvictionPolicy string `json:"eviction_policy" yaml:"eviction_policy"`
SelectionPolicy string `json:"selection_policy" yaml:"selection_policy"`
SliceSize uint64 `json:"slice_size" yaml:"slice_size"`
Buckets []*Bucket `json:"buckets" yaml:"buckets"`
}
type (
Storage struct {
Driver string `json:"driver" yaml:"driver"`
DBType string `json:"db_type" yaml:"db_type"`
DBPath string `json:"db_path" yaml:"db_path"` // default db path
AsyncLoad bool `json:"async_load" yaml:"async_load"`
EvictionPolicy string `json:"eviction_policy" yaml:"eviction_policy"`
SelectionPolicy string `json:"selection_policy" yaml:"selection_policy"`
SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part
Tiering *Tiering `json:"tiering" yaml:"tiering"` // tiering config
Buckets []*Bucket `json:"buckets" yaml:"buckets"` // bucket list
}

type Bucket struct {
Path string `json:"path" yaml:"path"` // local path or ?
Driver string `json:"driver" yaml:"driver"` // native, custom-driver
Type string `json:"type" yaml:"type"` // normal, cold, hot, fastmemory
DBType string `json:"db_type" yaml:"db_type"` // boltdb, badgerdb, pebble
DBPath string `json:"db_path" yaml:"db_path"` // db path, default: <bucket_path>/.indexdb
AsyncLoad bool `json:"async_load" yaml:"async_load"` // load metadata async
SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part
MaxObjectLimit int `json:"max_object_limit" yaml:"max_object_limit"` // max object limit; objects beyond the bound are discarded
DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config
}
Bucket struct {
Path string `json:"path" yaml:"path"` // local path or ?
Driver string `json:"driver" yaml:"driver"` // native, custom-driver
Type string `json:"type" yaml:"type"` // warm(normal), cold, hot, inmemory
DBType string `json:"db_type" yaml:"db_type"` // boltdb, badgerdb, pebble
DBPath string `json:"db_path" yaml:"db_path"` // db path, default: <bucket_path>/.indexdb
AsyncLoad bool `json:"async_load" yaml:"async_load"` // load metadata async
SliceSize uint64 `json:"slice_size" yaml:"slice_size"` // slice size for each part
MaxObjectLimit int `json:"max_object_limit" yaml:"max_object_limit"` // max object limit; objects beyond the bound are discarded
DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config
Tiering *Tiering `json:"-" yaml:"-"` // extend global tiering config
}
)

type (
Promote struct {
MinHits int `json:"min_hits" yaml:"min_hits"` // 时间窗口内命中 >= N
Window time.Duration `json:"window" yaml:"window"` // 时间窗口 10m
MaxObjectSize uint64 `json:"max_object_size" yaml:"max_object_size"` // 最大对象大小
}
Demote struct {
MinHits int `json:"min_hits" yaml:"min_hits"` // 时间窗口内命中 <= N
Window time.Duration `json:"window" yaml:"window"` // 时间窗口 10m
Occupancy float64 `json:"occupancy" yaml:"occupancy"` // 热盘存储占用率 >= N%
}
Tiering struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Promote Promote `json:"promote" yaml:"promote"` // 升温
Demote Demote `json:"demote" yaml:"demote"` // 降温
Comment on lines +100 to +112
Copilot AI Jan 27, 2026

The comment on line 100 contains Chinese text "时间窗口内命中 >= N" which translates to "hits within time window >= N". For consistency and to make the codebase more accessible to all contributors, consider using English for comments. The same issue appears on lines 101, 102, 105, 106, and 107.

Suggested change
MinHits int `json:"min_hits" yaml:"min_hits"` // 时间窗口内命中 >= N
Window time.Duration `json:"window" yaml:"window"` // 时间窗口 10m
MaxObjectSize uint64 `json:"max_object_size" yaml:"max_object_size"` // 最大对象大小
}
Demote struct {
MinHits int `json:"min_hits" yaml:"min_hits"` // 时间窗口内命中 <= N
Window time.Duration `json:"window" yaml:"window"` // 时间窗口 10m
Occupancy float64 `json:"occupancy" yaml:"occupancy"` // 热盘存储占用率 >= N%
}
Tiering struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Promote Promote `json:"promote" yaml:"promote"` // 升温
Demote Demote `json:"demote" yaml:"demote"` // 降温
MinHits int `json:"min_hits" yaml:"min_hits"` // hits within the time window >= N
Window time.Duration `json:"window" yaml:"window"` // time window (for example, 10m)
MaxObjectSize uint64 `json:"max_object_size" yaml:"max_object_size"` // maximum object size
}
Demote struct {
MinHits int `json:"min_hits" yaml:"min_hits"` // hits within the time window <= N
Window time.Duration `json:"window" yaml:"window"` // time window (for example, 10m)
Occupancy float64 `json:"occupancy" yaml:"occupancy"` // hot storage disk occupancy >= N%
}
Tiering struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Promote Promote `json:"promote" yaml:"promote"` // promote (move to a hotter tier)
Demote Demote `json:"demote" yaml:"demote"` // demote (move to a colder tier)

}
)

type Plugin struct {
Name string `json:"name" yaml:"name"`
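The Tiering block only carries thresholds; nothing in this diff shows where they are checked. A minimal sketch of how a caller might evaluate them — shouldPromote/shouldDemote and their inputs (a hit count already aggregated over Window, the object size, and the hot-tier occupancy) are assumptions for illustration, not code from this change:

package conf

// shouldPromote reports whether an object qualifies for promotion: it was hit
// at least Promote.MinHits times within Promote.Window and does not exceed
// Promote.MaxObjectSize. The caller is expected to count hits over the window.
func (t *Tiering) shouldPromote(hitsInWindow int, objectSize uint64) bool {
    if t == nil || !t.Enabled {
        return false
    }
    p := t.Promote
    return hitsInWindow >= p.MinHits && (p.MaxObjectSize == 0 || objectSize <= p.MaxObjectSize)
}

// shouldDemote reports whether an object qualifies for demotion: at most
// Demote.MinHits hits within Demote.Window while the hot tier is at least
// Demote.Occupancy percent full.
func (t *Tiering) shouldDemote(hitsInWindow int, hotOccupancyPercent float64) bool {
    if t == nil || !t.Enabled {
        return false
    }
    d := t.Demote
    return hitsInWindow <= d.MinHits && hotOccupancyPercent >= d.Occupancy
}
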
10 changes: 10 additions & 0 deletions config.example.yaml
@@ -93,6 +93,16 @@ storage:
  eviction_policy: fifo # fifo, lru, lfu
  selection_policy: hashring # hashring, roundrobin
  slice_size: 1048576 # 1MB
  tiering:
    enabled: false # enable tiered buckets
    promote:
      min_hits: 10 # window hits to promote
      window: 1m # 1 minute window
      max_object_size: 20971520 # 20MB
    demote:
      min_hits: 2 # window hits to demote
      window: 5m # 5 minute window
      occupancy: 75 # percent usage
  buckets:
    - path: /cache1
      type: normal
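One caveat about the window values: the Tiering fields in conf.go declare Window as time.Duration, and whether "1m"/"5m" decode into that depends on the config loader, which this diff does not show. If plain gopkg.in/yaml.v3 were used, a small wrapper type would be one way to accept duration strings — purely a sketch under that assumption:

package conf

import (
    "time"

    "gopkg.in/yaml.v3"
)

// Duration wraps time.Duration so YAML scalars like "1m" or "5m" parse cleanly.
type Duration time.Duration

// UnmarshalYAML parses a scalar duration string via time.ParseDuration.
func (d *Duration) UnmarshalYAML(value *yaml.Node) error {
    parsed, err := time.ParseDuration(value.Value)
    if err != nil {
        return err
    }
    *d = Duration(parsed)
    return nil
}
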
2 changes: 1 addition & 1 deletion server/middleware/caching/caching.go
@@ -266,7 +266,7 @@ func (c *Caching) getUpstreamReader(fromByte, toByte uint64, async bool) (io.Rea
now := time.Now()
c.log.Debugf("getUpstreamReader doProxy[chunk]: begin: %s, rawRange: %s, newRange: %s", now, rawRange, newRange)
resp, err := c.doProxy(req, true)
c.log.Infof("getUpstreamReader doProxy[chunk]: timeCost: %s, rawRange: %s, newRange: %s", time.Since(now), rawRange, newRange)
c.log.Debugf("getUpstreamReader doProxy[chunk]: timeCost: %s, rawRange: %s, newRange: %s", time.Since(now), rawRange, newRange)
if err != nil {
closeBody(resp)
return nil, err
2 changes: 1 addition & 1 deletion server/middleware/caching/caching_chunkpart_test.go
@@ -31,7 +31,7 @@ func Test_getContents(t *testing.T) {
memoryBucket, _ := memory.New(&conf.Bucket{}, sharedkv.NewEmpty())

req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, "http://www.example.com/path/to/1.apk", nil)
objectID, _ := newObjectIDFromRequest(req, "", true)
objectID := newObjectIDFromRequest(req, "", true)
c := &Caching{
log: log.NewHelper(log.GetLogger()),
processor: mockProcessorChain(),
17 changes: 5 additions & 12 deletions server/middleware/caching/caching_vary.go
@@ -3,7 +3,6 @@ package caching
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"slices"
@@ -126,10 +125,7 @@ func (v *VaryProcessor) lookup(caching *Caching, req *http.Request) *object.Meta
varyKey := varycontrol.Clean(caching.md.Headers.Values("Vary")...)

// Generate object ID based on Vary data from request headers.
vid, err := newObjectIDFromRequest(req, varyKey.VaryData(req.Header), caching.opt.IncludeQueryInCacheKey)
if err != nil {
return nil
}
vid := newObjectIDFromRequest(req, varyKey.VaryData(req.Header), caching.opt.IncludeQueryInCacheKey)

vmd, err := caching.bucket.Lookup(req.Context(), vid)
if err != nil {
@@ -193,10 +189,7 @@ func (v *VaryProcessor) handleNoResponseVary(caching *Caching, resp *http.Respon
caching.log.Debugf("vary data already exist: %s", varyData)
}

l2MetaID, err := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey)
if err != nil {
return nil, fmt.Errorf("failed to create hash-key: %w", err)
}
l2MetaID := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey)

cl, _ := strconv.Atoi(resp.Header.Get("Content-Length"))
return &object.Metadata{
@@ -222,7 +215,7 @@ func (v *VaryProcessor) handleResponseVary(caching *Caching, resp *http.Response
if metaVary.Compare(respVary) {
// Vary keys match, try to find existing Vary cache.
varyData = metaVary.VaryData(caching.req.Header)
varyKey, _ := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey)
varyKey := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey)
varyMeta, err := caching.bucket.Lookup(caching.req.Context(), varyKey)
if err != nil {
caching.log.Warnf("Vary key lookup failed: %v", err)
@@ -251,7 +244,7 @@ func (v *VaryProcessor) handleResponseVary(caching *Caching, resp *http.Response
}

// Build new Vary cache object.
varyObjectID, _ := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey)
varyObjectID := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey)
return v.upgrade(caching, resp, varyObjectID, varyData)
}

@@ -264,7 +257,7 @@ func (v *VaryProcessor) handleResponseVary(caching *Caching, resp *http.Response

caching.md.VirtualKey = nil
varyData = respVary.VaryData(caching.req.Header)
varyObjectID, _ := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey)
varyObjectID := newObjectIDFromRequest(caching.req, varyData, caching.opt.IncludeQueryInCacheKey)
return v.upgrade(caching, resp, varyObjectID, varyData)
}

17 changes: 10 additions & 7 deletions server/middleware/caching/internal.go
@@ -91,7 +91,7 @@ func (c *Caching) setXCache(resp *http.Response) {
return
}

resp.Header.Set(constants.ProtocolCacheStatusKey, strings.Join([]string{c.cacheStatus.String(), "from", c.opt.Hostname, "(tavern/4.0)"}, " "))
resp.Header.Set(constants.ProtocolCacheStatusKey, strings.Join([]string{c.cacheStatus.String(), "from", c.opt.Hostname, c.bucket.StoreType(), "(tavern/4.0)"}, " "))

Copilot AI Jan 27, 2026

The setXCache method will panic if c.bucket is nil. The method attempts to call c.bucket.StoreType() on line 94 without checking if bucket is nil. While this might be protected by callers, the method should be defensive and handle the nil case, especially since this is called in the response path where stability is critical.

metric := metrics.FromContext(c.req.Context())
metric.CacheStatus = c.cacheStatus.String()
@@ -188,6 +188,7 @@ func getContents(c *Caching, reqChunks []uint32, from uint32) (io.ReadCloser, in
c.log.Debugf("find available chunk index %d, availableChunks: %v", index, availableChunks)
fromByte := uint64(reqChunks[from] * uint32(c.md.BlockSize))
if index < len(availableChunks) {

chunkFile, _ := getSliceChunkFile(c, availableChunks[index])
if chunkFile != nil {
if err := checkChunkSize(c, chunkFile, idx); err != nil {
@@ -310,17 +311,19 @@ func cloneRequest(req *http.Request) *http.Request {
return proxyReq
}

func newObjectIDFromRequest(req *http.Request, vd string, includeQuery bool) (*object.ID, error) {
// option: cache-key include querystring
//
// TODO: get cache-key from frontend protocol rule.
func newObjectIDFromRequest(req *http.Request, vd string, includeQuery bool) *object.ID {
// get cache-key from frontend protocol rule.
if cacheKey := req.Header.Get(constants.InternalStoreUrl); cacheKey != "" {
return object.NewVirtualID(cacheKey, vd)
}

// option: include the query string in the cache key,
// otherwise fall back to the default rule below.
if includeQuery {
return object.NewVirtualID(req.URL.String(), vd), nil
return object.NewVirtualID(req.URL.String(), vd)
}

return object.NewVirtualID(fmt.Sprintf("%s://%s%s", req.URL.Scheme, req.Host, req.URL.Path), vd), nil
return object.NewVirtualID(fmt.Sprintf("%s://%s%s", req.URL.Scheme, req.Host, req.URL.Path), vd)
}

func closeBody(resp *http.Response) {
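On the review note earlier in internal.go about setXCache dereferencing c.bucket: a minimal sketch of the defensive variant it asks for — the helper below is hypothetical, assumes it sits in the caching package next to setXCache (so strings and constants are already imported), and reuses only names visible in this diff:

// cacheStatusHeader builds the cache-status header value and tolerates a nil
// bucket instead of panicking in the response path.
func (c *Caching) cacheStatusHeader() string {
    parts := []string{c.cacheStatus.String(), "from", c.opt.Hostname}
    if c.bucket != nil {
        parts = append(parts, c.bucket.StoreType())
    }
    parts = append(parts, "(tavern/4.0)")
    return strings.Join(parts, " ")
}

// setXCache would then call:
// resp.Header.Set(constants.ProtocolCacheStatusKey, c.cacheStatusHeader())
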
7 changes: 7 additions & 0 deletions server/middleware/caching/locker.go
@@ -66,3 +66,10 @@ func (c *Caching) RUnlock() {
}
globalLocker.getLock(c.Key()).RUnlock()
}

func (c *Caching) TryLock() bool {
if c.id == nil {
return false
}
return globalLocker.getLock(c.Key()).TryLock()
}
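TryLock rounds out the blocking Lock/RLock helpers with a non-blocking variant. One way a background tiering pass might use it so it never stalls behind a live request — tryPromote, the Unlock pairing, and the storage/context imports are assumptions for illustration, not part of this PR:

// Sketch, assumed to live in the caching package: skip the object when it is
// busy rather than queueing behind a request that already holds its lock.
func tryPromote(ctx context.Context, c *Caching, m storage.Migration, src storage.Bucket) error {
    if !c.TryLock() {
        return nil // object in use; retry on the next tick
    }
    defer c.Unlock() // assumes the write-side Unlock that pairs with TryLock
    return m.Promote(ctx, c.id, src)
}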