From 31ffd896c927f17b4712cd5f4edf4b7de6c23c35 Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Thu, 12 Feb 2026 21:04:59 +0800 Subject: [PATCH 1/2] refactor: rename internal constants package to protocol and document internal headers. --- api/defined/v1/storage/indexdb.go | 2 +- internal/protocol/README.md | 45 +++++++++++++++++++ .../global.go => protocol/protocol.go} | 12 ++--- metrics/request_info.go | 4 +- pkg/e2e/e2e.go | 4 +- plugin/purge/purge.go | 4 +- plugin/registry.go | 4 +- server/middleware/caching/caching.go | 12 ++--- .../middleware/caching/caching_fillrange.go | 4 +- server/middleware/caching/caching_prefetch.go | 6 +-- server/middleware/caching/internal.go | 12 ++--- storage/bucket/disk/disk.go | 5 ++- storage/bucket/memory/memory.go | 4 +- storage/builder.go | 6 ++- tests/all-features/errcode/errcode_test.go | 6 +-- 15 files changed, 91 insertions(+), 39 deletions(-) create mode 100644 internal/protocol/README.md rename internal/{constants/global.go => protocol/protocol.go} (68%) diff --git a/api/defined/v1/storage/indexdb.go b/api/defined/v1/storage/indexdb.go index 8416521..eb65d7d 100644 --- a/api/defined/v1/storage/indexdb.go +++ b/api/defined/v1/storage/indexdb.go @@ -12,7 +12,7 @@ import ( var ErrKeyNotFound = errors.New("key not found") const ( - TypeInMemory = "inmemory" + TypeInMemory = "memory" TypeNormal = "normal" // normal, warm 同一个 TypeWarm = "warm" // normal, warm 同一个 TypeCold = "cold" diff --git a/internal/protocol/README.md b/internal/protocol/README.md new file mode 100644 index 0000000..c1e5f11 --- /dev/null +++ b/internal/protocol/README.md @@ -0,0 +1,45 @@ +## custom internal protocol defined + +### ProtocolRequestIDKey + +Set request id + +### X-FS-Mem + +force store in memory + +### ProtocolCacheStatusKey + +Response cache status with name, like X-Cache: HIT from memory + +### PrefetchCacheKey + +Prefetch cache, value 1 mean prefetch, 0 mean not prefetch + +### CacheTime + +Set force `Cache-Control` Cache time, value is seconds, like `CacheTime: 60` mean `Cache-Control: max-age=60` + +### InternalTraceKey + +Internal trace key, value is 1 or 0, 1 mean enable trace, 0 mean disable trace + +### InternalStoreUrl + +Set force store-url, value is string, like `http://example.com/somepath/app.apk` + +### InternalSwapfile + +Show response swapfile info, debug now! + +### InternalFillRangePercent + +Set fill range percent, value is int, like `InternalFillRangePercent: 50` mean fill 50% of response + +### InternalCacheErrCode + +Set force cache error code, value is int, like `InternalCacheErrCode: 1` mean force cache errcode like > 400 + +### InternalUpstreamAddr + +Dynamic set upstream addr, value is string, like `InternalUpstreamAddr: [IP_ADDRESS]` diff --git a/internal/constants/global.go b/internal/protocol/protocol.go similarity index 68% rename from internal/constants/global.go rename to internal/protocol/protocol.go index 4babfc8..917e553 100644 --- a/internal/constants/global.go +++ b/internal/protocol/protocol.go @@ -1,13 +1,15 @@ -package constants +package protocol const AppName = "tavern" // define gw->backend Protocol constants const ( - ProtocolRequestIDKey = "X-Request-ID" - ProtocolCacheStatusKey = "X-Cache" - PrefetchCacheKey = "X-Prefetch" - CacheTime = "X-CacheTime" + ProtocolRequestIDKey = "X-Request-ID" + ProtocolCacheStatusKey = "X-Cache" + ProtocolForceStoreMemory = "X-FS-Mem" + + PrefetchCacheKey = "X-Prefetch" + CacheTime = "X-CacheTime" InternalTraceKey = "i-xtrace" InternalStoreUrl = "i-x-store-url" diff --git a/metrics/request_info.go b/metrics/request_info.go index fddeb91..675c5a7 100644 --- a/metrics/request_info.go +++ b/metrics/request_info.go @@ -8,7 +8,7 @@ import ( "time" "github.com/omalloc/tavern/contrib/log" - "github.com/omalloc/tavern/internal/constants" + "github.com/omalloc/tavern/internal/protocol" ) type requestMetricKey struct{} @@ -53,7 +53,7 @@ func newContext(ctx context.Context, metric *RequestMetric) context.Context { } func MustParseRequestID(h http.Header) string { - id := h.Get(constants.ProtocolRequestIDKey) + id := h.Get(protocol.ProtocolRequestIDKey) // protocol request id header not found, generate a new one if id == "" { return generateRequestID() diff --git a/pkg/e2e/e2e.go b/pkg/e2e/e2e.go index a499e15..9268bd5 100644 --- a/pkg/e2e/e2e.go +++ b/pkg/e2e/e2e.go @@ -16,7 +16,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/omalloc/tavern/internal/constants" + "github.com/omalloc/tavern/internal/protocol" ) var ( @@ -92,7 +92,7 @@ func (e *E2E) Do(rewrite func(r *http.Request)) (*http.Response, error) { method := e.req.Method - e.req.Header.Set(constants.InternalUpstreamAddr, e.ts.Listener.Addr().String()) + e.req.Header.Set(protocol.InternalUpstreamAddr, e.ts.Listener.Addr().String()) if dumpReq.Load() && method != "PURGE" { DumpReq(e.req) diff --git a/plugin/purge/purge.go b/plugin/purge/purge.go index f0b1fff..961e9ad 100644 --- a/plugin/purge/purge.go +++ b/plugin/purge/purge.go @@ -13,7 +13,7 @@ import ( configv1 "github.com/omalloc/tavern/api/defined/v1/plugin" storagev1 "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/contrib/log" - "github.com/omalloc/tavern/internal/constants" + "github.com/omalloc/tavern/internal/protocol" "github.com/omalloc/tavern/pkg/encoding" "github.com/omalloc/tavern/plugin" "github.com/omalloc/tavern/storage" @@ -94,7 +94,7 @@ func (r *PurgePlugin) HandleFunc(next http.HandlerFunc) http.HandlerFunc { return } - storeUrl := req.Header.Get(constants.InternalStoreUrl) + storeUrl := req.Header.Get(protocol.InternalStoreUrl) if storeUrl == "" { storeUrl = req.URL.String() } diff --git a/plugin/registry.go b/plugin/registry.go index db0e247..f55af32 100644 --- a/plugin/registry.go +++ b/plugin/registry.go @@ -6,7 +6,7 @@ import ( configv1 "github.com/omalloc/tavern/api/defined/v1/plugin" "github.com/omalloc/tavern/contrib/log" - "github.com/omalloc/tavern/internal/constants" + "github.com/omalloc/tavern/internal/protocol" ) type Registry interface { @@ -49,5 +49,5 @@ func NewRegistry() Registry { } func fmtName(name string) string { - return strings.ToLower(fmt.Sprintf("%s.plugin.%s", constants.AppName, name)) + return strings.ToLower(fmt.Sprintf("%s.plugin.%s", protocol.AppName, name)) } diff --git a/server/middleware/caching/caching.go b/server/middleware/caching/caching.go index 09d88c7..c4f2d49 100644 --- a/server/middleware/caching/caching.go +++ b/server/middleware/caching/caching.go @@ -18,7 +18,7 @@ import ( "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" "github.com/omalloc/tavern/contrib/log" - "github.com/omalloc/tavern/internal/constants" + "github.com/omalloc/tavern/internal/protocol" "github.com/omalloc/tavern/pkg/iobuf" "github.com/omalloc/tavern/pkg/iobuf/ioindexes" xhttp "github.com/omalloc/tavern/pkg/x/http" @@ -140,7 +140,7 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) { if resp != nil { // set cache-staus header BYPASS - resp.Header.Set(constants.ProtocolCacheStatusKey, BYPASS) + resp.Header.Set(protocol.ProtocolCacheStatusKey, BYPASS) } return } @@ -260,7 +260,7 @@ func (c *Caching) getUpstreamReader(fromByte, toByte uint64, async bool) (io.Rea // req.Header.Set("X-Request-ID", fmt.Sprintf("%s-%d", req.Header.Get(appctx.ProtocolRequestIDKey), fromByte)) // 附加 Request-ID suffix // remove all internal header - req.Header.Del(constants.ProtocolCacheStatusKey) + req.Header.Del(protocol.ProtocolCacheStatusKey) doSubRequest := func() (*http.Response, error) { now := time.Now() @@ -366,7 +366,7 @@ func (c *Caching) doProxy(req *http.Request, subRequest bool) (*http.Response, e } // parsed cache-control header - expiredAt, cacheable := xhttp.ParseCacheTime(constants.CacheTime, resp.Header) + expiredAt, cacheable := xhttp.ParseCacheTime(protocol.CacheTime, resp.Header) // expire time c.md.ExpiresAt = now.Add(expiredAt).Unix() @@ -391,7 +391,7 @@ func (c *Caching) doProxy(req *http.Request, subRequest bool) (*http.Response, e // Caching is disabled // restoring the default behavior for error codes. - if resp.Header.Get(constants.InternalCacheErrCode) != constants.FlagOn { + if resp.Header.Get(protocol.InternalCacheErrCode) != protocol.FlagOn { c.cacheable = false copiedHeaders := make(http.Header) @@ -469,7 +469,7 @@ func (c *Caching) flushbufferSlice(respRange xhttp.ContentRange) (iobuf.EventSuc // trigger file crc check // has InMemory store type skip crc check - if c.bucket.StoreType() == storage.TypeInMemory { + if c.bucket.Type() == storage.TypeInMemory { return } diff --git a/server/middleware/caching/caching_fillrange.go b/server/middleware/caching/caching_fillrange.go index 1680c8d..445cb80 100644 --- a/server/middleware/caching/caching_fillrange.go +++ b/server/middleware/caching/caching_fillrange.go @@ -7,7 +7,7 @@ import ( "net/http" "strconv" - "github.com/omalloc/tavern/internal/constants" + "github.com/omalloc/tavern/internal/protocol" "github.com/omalloc/tavern/pkg/iobuf" xhttp "github.com/omalloc/tavern/pkg/x/http" ) @@ -189,7 +189,7 @@ func (f *fillRange) fill(c *Caching, req *http.Request, rawRange string) *http.R } func parseFillPercent(h http.Header, def uint64) uint64 { - fp := h.Get(constants.InternalFillRangePercent) + fp := h.Get(protocol.InternalFillRangePercent) if fp != "" { p, err := strconv.ParseUint(fp, 10, 64) if err != nil { diff --git a/server/middleware/caching/caching_prefetch.go b/server/middleware/caching/caching_prefetch.go index 629d981..fbf6164 100644 --- a/server/middleware/caching/caching_prefetch.go +++ b/server/middleware/caching/caching_prefetch.go @@ -6,7 +6,7 @@ import ( "net/http" "strconv" - "github.com/omalloc/tavern/internal/constants" + "github.com/omalloc/tavern/internal/protocol" "github.com/omalloc/tavern/pkg/iobuf" xhttp "github.com/omalloc/tavern/pkg/x/http" ) @@ -33,13 +33,13 @@ func (r *PrefetchProcessor) Lookup(c *Caching, req *http.Request) (bool, error) } func (r *PrefetchProcessor) PreRequest(c *Caching, req *http.Request) (*http.Request, error) { - if key := req.Header.Get(constants.PrefetchCacheKey); key != "" { + if key := req.Header.Get(protocol.PrefetchCacheKey); key != "" { if rawRange := req.Header.Get("Range"); rawRange != "" { req.Header.Del("Range") req = req.WithContext(context.WithValue(req.Context(), prefetchRangeKey{}, rawRange)) } c.prefetch = true - req.Header.Del(constants.PrefetchCacheKey) + req.Header.Del(protocol.PrefetchCacheKey) c.log.Debugf("prefetch request: %s %s", req.Method, req.URL.String()) } return req, nil diff --git a/server/middleware/caching/internal.go b/server/middleware/caching/internal.go index 604dd9c..3364256 100644 --- a/server/middleware/caching/internal.go +++ b/server/middleware/caching/internal.go @@ -21,7 +21,7 @@ import ( "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" "github.com/omalloc/tavern/contrib/log" - "github.com/omalloc/tavern/internal/constants" + "github.com/omalloc/tavern/internal/protocol" "github.com/omalloc/tavern/metrics" "github.com/omalloc/tavern/pkg/iobuf" xhttp "github.com/omalloc/tavern/pkg/x/http" @@ -92,15 +92,15 @@ func (c *Caching) setXCache(resp *http.Response) { return } - resp.Header.Set(constants.ProtocolCacheStatusKey, strings.Join([]string{c.cacheStatus.String(), "from", c.opt.Hostname, "(tavern/4.0)"}, " ")) + resp.Header.Set(protocol.ProtocolCacheStatusKey, strings.Join([]string{c.cacheStatus.String(), "from", c.opt.Hostname, c.bucket.StoreType(), "(tavern/4.0)"}, " ")) metric := metrics.FromContext(c.req.Context()) metric.CacheStatus = c.cacheStatus.String() // debug header - if trace := c.req.Header.Get(constants.InternalTraceKey); trace != "" { - resp.Header.Set(constants.InternalStoreUrl, strconv.FormatInt(int64(c.cacheStatus), 10)) - resp.Header.Set(constants.InternalSwapfile, c.id.WPath(c.bucket.Path())) + if trace := c.req.Header.Get(protocol.InternalTraceKey); trace != "" { + resp.Header.Set(protocol.InternalStoreUrl, strconv.FormatInt(int64(c.cacheStatus), 10)) + resp.Header.Set(protocol.InternalSwapfile, c.id.WPath(c.bucket.Path())) } } @@ -302,7 +302,7 @@ func cloneRequest(req *http.Request) *http.Request { xhttp.RemoveHopByHopHeaders(proxyReq.Header) // custom upstream addr - if upsAddr := req.Header.Get(constants.InternalUpstreamAddr); upsAddr != "" { + if upsAddr := req.Header.Get(protocol.InternalUpstreamAddr); upsAddr != "" { proxyReq = proxyReq.WithContext( selector.NewPeerContext(context.Background(), selector.NewPeer(selector.NewNode("http", upsAddr, map[string]string{}))), ) diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index bbae62f..3fe91c3 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -372,7 +372,7 @@ func (d *diskBucket) Store(ctx context.Context, meta *object.Metadata) error { return nil } -func (d *diskBucket) touch(ctx context.Context, id *object.ID) { +func (d *diskBucket) touch(_ context.Context, id *object.ID) { mark := d.cache.Get(id.Hash()) if mark == nil { return @@ -397,7 +397,8 @@ func (d *diskBucket) touch(ctx context.Context, id *object.ID) { d.promMu.Unlock() d.hkPromote.Add(id.Bytes()) - if d.hkPromote.Query(id.Bytes()) >= uint32(d.opt.Migration.Promote.MinHits) { + hits := d.hkPromote.Query(id.Bytes()) + if hits >= uint32(d.opt.Migration.Promote.MinHits) { go func() { // check migration interface if d.migration != nil { diff --git a/storage/bucket/memory/memory.go b/storage/bucket/memory/memory.go index 39039f1..7d870a6 100644 --- a/storage/bucket/memory/memory.go +++ b/storage/bucket/memory/memory.go @@ -46,9 +46,9 @@ func New(opt *storage.BucketConfig, sharedkv storage.SharedKV) (storage.Bucket, mb := &memoryBucket{ fs: vfs.NewMem(), path: "/", - driver: opt.Driver, + driver: opt.Driver, // storage.TypeInMemory dbPath: storage.TypeInMemory, - storeType: storage.TypeInMemory, + storeType: opt.Type, weight: 100, // default weight sharedkv: sharedkv, cache: lru.New[object.IDHash, storage.Mark](opt.MaxObjectLimit), // in-memory object size diff --git a/storage/builder.go b/storage/builder.go index 8c93ae5..25dcf0d 100644 --- a/storage/builder.go +++ b/storage/builder.go @@ -27,7 +27,7 @@ type globalBucketOption struct { var bucketMap = map[string]func(opt *storage.BucketConfig, sharedkv storage.SharedKV) (storage.Bucket, error){ "empty": empty.New, "native": disk.New, // disk is an alias of native - "memory": memory.New, // in-memory disk. restart as lost. + "memory": memory.New, // in-memory disk. restart as lost. @ storage.TypeInMemory } func NewBucket(opt *storage.BucketConfig, sharedkv storage.SharedKV) (storage.Bucket, error) { @@ -57,6 +57,10 @@ func mergeConfig(global *globalBucketOption, bucket *conf.Bucket) *storage.Bucke if copied.Type == "" { copied.Type = storage.TypeWarm } + // replace normal to warm + if copied.Type == storage.TypeNormal { + copied.Type = storage.TypeWarm + } if copied.DBType == "" { copied.DBType = global.DBType } diff --git a/tests/all-features/errcode/errcode_test.go b/tests/all-features/errcode/errcode_test.go index 113c689..a833feb 100644 --- a/tests/all-features/errcode/errcode_test.go +++ b/tests/all-features/errcode/errcode_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/omalloc/tavern/internal/constants" + "github.com/omalloc/tavern/internal/protocol" "github.com/omalloc/tavern/pkg/e2e" ) @@ -67,8 +67,8 @@ func TestErrCodeCache(t *testing.T) { case1 := e2e.New("http://example.com.gslb.com/errcode/cache", e2e.RespCallback(func(w http.ResponseWriter, r *http.Request) { payload := []byte(http.StatusText(http.StatusBadGateway)) - w.Header().Set(constants.CacheTime, "30") // 强制缓存30秒 - w.Header().Set(constants.InternalCacheErrCode, "1") // 开启缓存错误状态码 + w.Header().Set(protocol.CacheTime, "30") // 强制缓存30秒 + w.Header().Set(protocol.InternalCacheErrCode, "1") // 开启缓存错误状态码 w.Header().Set("Content-Length", strconv.Itoa(len(payload))) w.WriteHeader(http.StatusBadGateway) _, _ = w.Write(payload) From 4e14617c7b1dd282e467b938b097eac49528473b Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Thu, 12 Feb 2026 21:14:32 +0800 Subject: [PATCH 2/2] fix: (Breaking Change) rename `normal` storage type to `warm` --- storage/bucket/disk/disk_migration_test.go | 4 ++-- storage/bucket/disk/disk_test.go | 2 +- storage/storage.go | 18 +++++++++--------- storage/storage_test.go | 5 +++-- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/storage/bucket/disk/disk_migration_test.go b/storage/bucket/disk/disk_migration_test.go index a551bb8..8e029ab 100644 --- a/storage/bucket/disk/disk_migration_test.go +++ b/storage/bucket/disk/disk_migration_test.go @@ -40,7 +40,7 @@ func TestMigration_Promote(t *testing.T) { DBPath: filepath.Join(basepath, ".indexdb"), DBType: "pebble", Driver: "native", - Type: "normal", + Type: storage.TypeWarm, Migration: &storage.MigrationConfig{ Enabled: true, Promote: storage.PromoteConfig{ @@ -97,7 +97,7 @@ func TestMigration_Demote(t *testing.T) { DBPath: filepath.Join(basepath, ".indexdb"), DBType: "pebble", Driver: "native", - Type: "normal", + Type: storage.TypeWarm, Migration: &storage.MigrationConfig{ Enabled: true, Demote: storage.DemoteConfig{ diff --git a/storage/bucket/disk/disk_test.go b/storage/bucket/disk/disk_test.go index 6d7e166..4be4cc4 100644 --- a/storage/bucket/disk/disk_test.go +++ b/storage/bucket/disk/disk_test.go @@ -20,7 +20,7 @@ func newTestBucket(t *testing.T, basepath string) storagev1.Bucket { bucket, err := disk.New(&storagev1.BucketConfig{ Path: basepath, Driver: "native", - Type: "normal", + Type: storagev1.TypeWarm, DBType: "pebble", DBPath: path.Join(basepath, ".indexdb"), AsyncLoad: false, diff --git a/storage/storage.go b/storage/storage.go index 9d487d4..fc36a54 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -31,7 +31,7 @@ type nativeStorage struct { nopBucket storage.Bucket memoryBucket storage.Bucket hotBucket []storage.Bucket - normalBucket []storage.Bucket + warmlBucket []storage.Bucket } func New(config *conf.Storage, logger log.Logger) (storage.Storage, error) { @@ -54,7 +54,7 @@ func New(config *conf.Storage, logger log.Logger) (storage.Storage, error) { nopBucket: nopBucket, memoryBucket: nil, hotBucket: make([]storage.Bucket, 0, len(config.Buckets)), - normalBucket: make([]storage.Bucket, 0, len(config.Buckets)), + warmlBucket: make([]storage.Bucket, 0, len(config.Buckets)), } if err := n.reinit(config); err != nil { @@ -102,8 +102,8 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { } switch bucket.StoreType() { - case storage.TypeNormal: - n.normalBucket = append(n.normalBucket, bucket) + case storage.TypeNormal, storage.TypeWarm: + n.warmlBucket = append(n.warmlBucket, bucket) case storage.TypeHot: n.hotBucket = append(n.hotBucket, bucket) case storage.TypeInMemory: @@ -120,14 +120,14 @@ func (n *nativeStorage) reinit(config *conf.Storage) error { // load lru // load purge queue - if len(n.normalBucket) <= 0 { + if len(n.warmlBucket) <= 0 { // no normal bucket, use nop bucket if n.memoryBucket != nil { - n.normalBucket = append(n.normalBucket, n.memoryBucket) + n.warmlBucket = append(n.warmlBucket, n.memoryBucket) } } - n.selector = selector.New(n.normalBucket, config.SelectionPolicy) + n.selector = selector.New(n.warmlBucket, config.SelectionPolicy) return nil } @@ -145,7 +145,7 @@ func (n *nativeStorage) Rebuild(ctx context.Context, buckets []storage.Bucket) e // Buckets implements storage.Storage. func (n *nativeStorage) Buckets() []storage.Bucket { - return append(n.normalBucket, n.hotBucket...) + return append(n.warmlBucket, n.hotBucket...) } // PURGE implements storage.Storage. @@ -248,7 +248,7 @@ func (n *nativeStorage) SharedKV() storage.SharedKV { func (n *nativeStorage) Close() error { var errs []error // close all buckets - for _, bucket := range n.normalBucket { + for _, bucket := range n.warmlBucket { errs = append(errs, bucket.Close()) } diff --git a/storage/storage_test.go b/storage/storage_test.go index f513e9d..d024f49 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "testing" + storagev1 "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" "github.com/omalloc/tavern/conf" "github.com/omalloc/tavern/contrib/log" @@ -24,8 +25,8 @@ func TestSelect(t *testing.T) { EvictionPolicy: "lru", SelectionPolicy: "hashring", Buckets: []*conf.Bucket{ - {Path: filepath.Join(dir, "/cache1"), Type: "normal"}, - {Path: filepath.Join(dir, "/cache2"), Type: "normal"}, + {Path: filepath.Join(dir, "/cache1"), Type: storagev1.TypeWarm}, + {Path: filepath.Join(dir, "/cache2"), Type: storagev1.TypeWarm}, }, }, log.DefaultLogger) if err != nil {