From a0647f9461de8c62844b4885c6333d0a47632ed5 Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Wed, 4 Feb 2026 13:20:42 +0800 Subject: [PATCH 1/6] feat(plugin): implement request smoothing and add metrics endpoint --- metrics/metrics.go | 65 +++++++++++++++++++++++ plugin/qs/qs.go | 127 +++++++++++++++++++++++++++++++++++++++------ server/server.go | 1 + 3 files changed, 178 insertions(+), 15 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 1abe097..487982f 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -1 +1,66 @@ package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +type CounterSmoother struct { + lastValue float64 + smoothed float64 + Alpha float64 + isInit bool +} + +func (s *CounterSmoother) Update(currentTotal float64) float64 { + if !s.isInit { + s.lastValue = currentTotal + s.isInit = true + return 0 + } + + delta := currentTotal - s.lastValue + if delta < 0 { + delta = 0 + } + + s.smoothed = s.Alpha*delta + (1-s.Alpha)*s.smoothed + s.lastValue = currentTotal + + return s.smoothed +} + +type RequestsCodeTotal struct { + Code string `json:"code"` + Count float64 `json:"count"` +} + +func CollectorRequestsCodeTotal() []*RequestsCodeTotal { + totals := make([]*RequestsCodeTotal, 0) + if mfs := Gather(); mfs != nil { + + for _, mf := range mfs { + if mf.GetName() == "tr_tavern_requests_code_total" { + for _, metric := range mf.GetMetric() { + for _, label := range metric.Label { + if label.GetName() == "code" { + totals = append(totals, &RequestsCodeTotal{ + Code: label.GetValue(), + Count: metric.GetCounter().GetValue(), + }) + } + } + } + } + } + } + return totals +} + +func Gather() []*dto.MetricFamily { + familys, err := prometheus.DefaultGatherer.Gather() + if err != nil { + return nil + } + return familys +} diff --git a/plugin/qs/qs.go b/plugin/qs/qs.go index 1862f23..a9c9307 100644 --- a/plugin/qs/qs.go +++ b/plugin/qs/qs.go @@ -7,6 +7,7 @@ import ( "net/http" "strconv" "strings" + "sync" "time" "github.com/goccy/go-json" @@ -14,8 +15,10 @@ import ( configv1 "github.com/omalloc/tavern/api/defined/v1/plugin" "github.com/omalloc/tavern/api/defined/v1/storage/object" "github.com/omalloc/tavern/contrib/log" + "github.com/omalloc/tavern/metrics" "github.com/omalloc/tavern/plugin" "github.com/omalloc/tavern/storage" + "github.com/prometheus/client_golang/prometheus" ) var _ configv1.Plugin = (*QsPlugin)(nil) @@ -40,6 +43,10 @@ type option struct { type QsPlugin struct { log *log.Helper opt *option + + mu sync.RWMutex + stopCh chan struct{} + smoothedData map[string]float64 // 存储平滑后的状态码指标 } func init() { @@ -52,18 +59,20 @@ func NewQsPlugin(opts configv1.Option, log *log.Helper) (configv1.Plugin, error) return nil, err } return &QsPlugin{ - log: log, - opt: opt, + log: log, + opt: opt, + stopCh: make(chan struct{}, 1), + smoothedData: make(map[string]float64), }, nil } // HandleFunc implements plugin.Plugin. -func (e *QsPlugin) HandleFunc(next http.HandlerFunc) http.HandlerFunc { +func (qs *QsPlugin) HandleFunc(next http.HandlerFunc) http.HandlerFunc { return next } // AddRouter implements plugin.Plugin. -func (e *QsPlugin) AddRouter(router *http.ServeMux) { +func (qs *QsPlugin) AddRouter(router *http.ServeMux) { router.Handle("/plugin/store/disk", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { buckets := storage.Current().Buckets() @@ -143,32 +152,120 @@ func (e *QsPlugin) AddRouter(router *http.ServeMux) { w.WriteHeader(http.StatusOK) _, _ = w.Write(buf) - return + })) + + router.Handle("/plugin/qs/graph", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + qs.mu.RLock() + data := make(map[string]float64, len(qs.smoothedData)) + for code, smoothedValue := range qs.smoothedData { + switch code { + case "total": + data["total"] = smoothedValue + case "200", "206": + data["2xx"] += smoothedValue + case "400", "401", "403", "404": + data["4xx"] += smoothedValue + case "499": + data["499"] += smoothedValue + case "500", "502", "503", "504": + data["5xx"] += smoothedValue + } + } + qs.mu.RUnlock() + + buf, err := json.Marshal(data) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Length", strconv.Itoa(len(buf))) + w.WriteHeader(http.StatusOK) + _, _ = w.Write(buf) })) } // Start implements plugin.Plugin. -func (e *QsPlugin) Start(context.Context) error { +func (qs *QsPlugin) Start(context.Context) error { // you can add your startup logic here - // e.g. - // - // go func() { - // // do something - // }() + // start the ticker to collect requests per second metrics ( TODO: with enabled qs/graph endpoint ) + go qs.tickRequestsPerSecond() + return nil } // Stop implements plugin.Plugin. -func (e *QsPlugin) Stop(context.Context) error { +func (qs *QsPlugin) Stop(context.Context) error { // you can add your cleanup logic here - // e.g. - // - // stopCh <- struct{}{} + qs.stopCh <- struct{}{} + return nil } +// tickRequestsPerSecond periodically collects and smooths the requests per second metrics. +func (qs *QsPlugin) tickRequestsPerSecond() { + metricsMap := map[string]*metrics.CounterSmoother{ + "200": {Alpha: 0.3}, + "206": {Alpha: 0.3}, + "400": {Alpha: 0.3}, + "401": {Alpha: 0.3}, + "403": {Alpha: 0.3}, + "404": {Alpha: 0.3}, + "499": {Alpha: 0.3}, + "500": {Alpha: 0.3}, + "502": {Alpha: 0.3}, + "503": {Alpha: 0.3}, + "504": {Alpha: 0.3}, + } + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-qs.stopCh: + return + case <-ticker.C: + familys, err := prometheus.DefaultGatherer.Gather() + if err != nil { + continue + } + + // 临时存储本次收集的平滑值 + tempData := make(map[string]float64) + totalCounter := float64(0) + for _, mf := range familys { + if mf.GetName() == "tr_tavern_requests_code_total" { + for _, metric := range mf.GetMetric() { + for _, label := range metric.Label { + if label.GetName() == "code" { + code := label.GetValue() + val := metric.GetCounter().GetValue() + totalCounter += val + if smoother, ok := metricsMap[code]; ok { + smoothedValue := smoother.Update(val) + tempData[code] = smoothedValue + } + } + } + } + } + } + + // 使用写锁更新共享数据 + qs.mu.Lock() + for code, value := range tempData { + qs.smoothedData[code] = value + } + qs.smoothedData["total"] = totalCounter + qs.mu.Unlock() + } + } +} + func convRange(parts bitmap.Bitmap) string { nums := make([]int, 0, parts.Count()) parts.Range(func(x uint32) { diff --git a/server/server.go b/server/server.go index 7866489..59bccc4 100644 --- a/server/server.go +++ b/server/server.go @@ -106,6 +106,7 @@ func NewServer(flip *tableflip.Upgrader, config *conf.Bootstrap, plugins []plugi host := fmtAddr(r.Host) if _, ok := localMatcher[host]; ok { // 内部接口处理流程 + w.Header().Set("X-Server", "local-plugin") mux.ServeHTTP(w, r) return } From c870c09fa228af79b78782a44e3b532522c05e83 Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Wed, 4 Feb 2026 14:21:28 +0800 Subject: [PATCH 2/6] feat(metrics): add CPU and memory usage metrics to QsPlugin --- cmd/top/main.go | 234 ++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 15 +++- go.sum | 30 +++++++ plugin/qs/qs.go | 38 +++++++- 4 files changed, 315 insertions(+), 2 deletions(-) create mode 100644 cmd/top/main.go diff --git a/cmd/top/main.go b/cmd/top/main.go new file mode 100644 index 0000000..853585b --- /dev/null +++ b/cmd/top/main.go @@ -0,0 +1,234 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "net/http" + "sync/atomic" + "time" + + "github.com/dustin/go-humanize" + terminal "github.com/gizak/termui/v3" + "github.com/gizak/termui/v3/widgets" +) + +var ( + endpoint = "" + tickInterval = time.Second * 1 + startAt = time.Now() + uptime = time.Unix(1767060001, 0) +) + +func init() { + flag.StringVar(&endpoint, "endpoint", "http://localhost:8080/plugin/qs/graph", "The metrics endpoint to fetch data from tavern server.") + flag.DurationVar(&tickInterval, "interval", time.Second*1, "The interval to fetch metrics.") +} + +func main() { + flag.Parse() + + newDashboard() +} + +func newDashboard() { + if err := terminal.Init(); err != nil { + log.Fatalf("failed to initialize termui: %v", err) + } + defer terminal.Close() + + collected := atomic.Bool{} + cpuPercent := atomic.Uint32{} + memUsage := atomic.Uint64{} + memTotal := atomic.Uint64{} + diskPercent := atomic.Uint64{} // mock + + client := &http.Client{ + Timeout: time.Second, + Transport: &http.Transport{}, + } + + graph, _ := http.NewRequest(http.MethodGet, endpoint, nil) + + fetch := func() map[string]float64 { + resp, err := client.Do(graph) + if err != nil { + collected.Store(false) + return nil + } + + if resp != nil { + if resp.StatusCode != http.StatusOK { + if resp.Body != nil { + _ = resp.Body.Close() + } + collected.Store(false) + return nil + } + } + + collected.Store(true) + var data map[string]float64 + json.NewDecoder(resp.Body).Decode(&data) + + cpuPercent.Store(uint32(data["cpu_percent"])) + memUsage.Store(uint64(data["mem_usage"])) + memTotal.Store(uint64(data["mem_total"])) + return data + } + + termWidth, _ := terminal.TerminalDimensions() + + // 基础监控指标 { qps, cpu, memory } + metricGrid := terminal.NewGrid() + metricGrid.SetRect(0, 3, termWidth, 20) + + banner, bannerDraw := func() (*widgets.Paragraph, func()) { + banner := widgets.NewParagraph() + banner.SetRect(0, 0, termWidth, 3) + banner.Title = "Tavern" + banner.Border = true + + textDraw := func() { + color := "fg:red" + status := "Disconnected" + if collected.Load() { + color = "fg:green" + status = "Connected" + } + + banner.Text = fmt.Sprintf("%s | Sampling @ [%s](fg:blue) | [%s](%s) (%s) | Uptime %s", + endpoint, tickInterval.String(), status, color, startAt.Format(time.RFC1123), humanize.Time(uptime)) + } + textDraw() + + return banner, textDraw + }() + + rater, raterDraw := func() (*widgets.Paragraph, func()) { + rater := widgets.NewParagraph() + rater.Title = "Requests" + rater.SetRect(0, 3, 50, 6) + rater.BorderStyle.Fg = terminal.ColorWhite + rater.TitleStyle.Fg = terminal.ColorCyan + + draw := func() { + data := fetch() + + rater.Text = fmt.Sprintf("\nRequests/sec: %d \nTotal: %d \n2xx : %d\n4xx : %d\n499 : %d\n5xx : %d", + int(data["total"]), int(data["total"]), int(data["2xx"]), int(data["4xx"]), int(data["499"]), int(data["5xx"])) + } + + draw() + return rater, draw + }() + + load, loadDraw := func() (*widgets.Gauge, func()) { + load := widgets.NewGauge() + load.Title = "CPU Usage" + load.Percent = int(cpuPercent.Load()) + load.BarColor = terminal.ColorMagenta + load.BorderStyle.Fg = terminal.ColorWhite + load.TitleStyle.Fg = terminal.ColorCyan + + return load, func() { + load.Percent = int(cpuPercent.Load()) + 10 + } + }() + + mem, memDraw := func() (*widgets.Gauge, func()) { + mem := widgets.NewGauge() + mem.Title = "Memory Usage" + usagePercent := 0 + if memTotal.Load() > 0 { + usagePercent = int(float64(memUsage.Load()) / float64(memTotal.Load()) * 100) + } + mem.Percent = usagePercent + mem.BarColor = terminal.ColorGreen + mem.BorderStyle.Fg = terminal.ColorWhite + mem.TitleStyle.Fg = terminal.ColorCyan + + return mem, func() { + usagePercent := 0 + if memTotal.Load() > 0 { + usagePercent = int(float64(memUsage.Load()) / float64(memTotal.Load()) * 100) + } + mem.Percent = usagePercent + mem.Label = fmt.Sprintf("%d%% | Mem: %s / %s", + usagePercent, + humanize.Bytes(memUsage.Load()), + humanize.Bytes(memTotal.Load()), + ) + } + }() + + disk, diskDraw := func() (*widgets.Gauge, func()) { + disk := widgets.NewGauge() + disk.Title = "Disk Usage" + disk.Percent = int(diskPercent.Load()) + 70 + disk.BarColor = terminal.ColorYellow + disk.BorderStyle.Fg = terminal.ColorWhite + disk.TitleStyle.Fg = terminal.ColorCyan + + return disk, func() { + disk.Percent = int(diskPercent.Load()) + 70 + } + }() + + metricGrid.Set( + terminal.NewRow(1.0/2, + terminal.NewCol(1.0/2, rater), + terminal.NewCol(1.0/2, + terminal.NewRow(1.0/3, load), + terminal.NewRow(1.0/3, mem), + terminal.NewRow(1.0/3, disk), + ), + ), + ) + + // 高级监控指标 { 热点url 热点域名 热点磁盘 } + list := widgets.NewList() + list.Title = "Hot URLs" + list.SetRect(0, 12, termWidth, 30) + list.BorderStyle.Fg = terminal.ColorWhite + list.TitleStyle.Fg = terminal.ColorCyan + list.TextStyle.Fg = terminal.ColorYellow + + terminal.Render(banner, metricGrid, list) + + uiEvents := terminal.PollEvents() + ticker := time.NewTicker(time.Second).C + for { + select { + case e := <-uiEvents: + switch e.ID { + case "q", "": + return + } + + switch e.Type { + case terminal.ResizeEvent: + payload := e.Payload.(terminal.Resize) + termWidth = payload.Width + // termHeight = payload.Height + + banner.SetRect(0, 0, termWidth, 3) + metricGrid.SetRect(0, 3, termWidth, 20) + list.SetRect(0, 12, termWidth, 30) + + terminal.Clear() + terminal.Render(banner, metricGrid, list) + } + + case <-ticker: + bannerDraw() + raterDraw() + memDraw() + diskDraw() + loadDraw() + + terminal.Render(banner, metricGrid, list) + } + } +} diff --git a/go.mod b/go.mod index d0d9d19..a3225de 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,10 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 github.com/cloudflare/tableflip v1.2.3 github.com/cockroachdb/pebble/v2 v2.1.2 + github.com/dustin/go-humanize v1.0.1 github.com/fsnotify/fsnotify v1.9.0 github.com/fxamacker/cbor/v2 v2.9.0 + github.com/gizak/termui/v3 v3.1.0 github.com/go-viper/mapstructure/v2 v2.4.0 github.com/goccy/go-json v0.10.5 github.com/google/uuid v1.6.0 @@ -19,6 +21,8 @@ require ( github.com/omalloc/proxy v0.0.0-20251201151440-9054f8002a97 github.com/paulbellamy/ratecounter v0.2.0 github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 + github.com/shirou/gopsutil/v4 v4.26.1 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.1 golang.org/x/sync v0.18.0 @@ -42,8 +46,10 @@ require ( github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/ebitengine/purego v0.9.1 // indirect github.com/edsrzf/mmap-go v1.2.0 // indirect github.com/getsentry/sentry-go v0.40.0 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect github.com/gofrs/flock v0.13.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v1.0.0 // indirect @@ -52,18 +58,25 @@ require ( github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/mattn/go-runewidth v0.0.9 // indirect github.com/minio/minlz v1.0.1 // indirect + github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.6.2 // indirect + github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/prometheus/common v0.67.4 // indirect github.com/prometheus/procfs v0.19.2 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/tidwall/btree v1.8.1 // indirect + github.com/tklauser/go-sysconf v0.3.16 // indirect + github.com/tklauser/numcpus v0.11.0 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.10.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect diff --git a/go.sum b/go.sum index faba2fa..677e177 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,10 @@ github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb/go.mod h1: github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/ebitengine/purego v0.9.1 h1:a/k2f2HQU3Pi399RPW1MOaZyhKJL9w/xFpKAg4q1s0A= +github.com/ebitengine/purego v0.9.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/edsrzf/mmap-go v1.2.0 h1:hXLYlkbaPzt1SaQk+anYwKSRNhufIDCchSPkUD6dD84= github.com/edsrzf/mmap-go v1.2.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= @@ -53,8 +57,12 @@ github.com/getsentry/sentry-go v0.40.0 h1:VTJMN9zbTvqDqPwheRVLcp0qcUcM+8eFivvGoc github.com/getsentry/sentry-go v0.40.0/go.mod h1:eRXCoh3uvmjQLY6qu63BjUZnaBu5L5WhMV1RwYO8W5s= github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 h1:r5GgOLGbza2wVHRzK7aAj6lWZjfbAwiu/RDCVOKjRyM= github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgooyS7OzLDOpUGgm9fA3bQENb/cFSyyBmMoJDs= +github.com/gizak/termui/v3 v3.1.0 h1:ZZmVDgwHl7gR7elfKf1xc4IudXZ5qqfDh4wExk4Iajc= +github.com/gizak/termui/v3 v3.1.0/go.mod h1:bXQEBkJpzxUAKf0+xq9MSWAvWZlE7c+aidmyFlkYTrY= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= @@ -65,6 +73,7 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -85,12 +94,21 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/maniartech/signals v1.3.1 h1:pT3dK6x5Un+B6L3ZLAKygEe+L49TClPreyT08vOoHXY= github.com/maniartech/signals v1.3.1/go.mod h1:AbE8Yy9ZjKCWNU/VhQ+0Ea9KOaTWHp6aOfdLBe5m1iM= +github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/minio/minlz v1.0.1 h1:OUZUzXcib8diiX+JYxyRLIdomyZYzHct6EShOKtQY2A= github.com/minio/minlz v1.0.1/go.mod h1:qT0aEB35q79LLornSzeDH75LBf3aH1MV+jB5w9Wasec= +github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 h1:DpOJ2HYzCv8LZP15IdmG+YdwD2luVPHITV96TkirNBM= +github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d h1:x3S6kxmy49zXVVyhcnrFqxvNVCBPb2KZ9hV2RBdS840= +github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d/go.mod h1:IuKpRQcYE1Tfu+oAQqaLisqDeXgjyyltCfsaoYN18NQ= github.com/nutsdb/nutsdb v1.1.0 h1:fNGFzBHGqF2mB5BF8Qk8W94c3/ZzwdCdKAH7azwx70Y= github.com/nutsdb/nutsdb v1.1.0/go.mod h1:aKCtgSprZf2Mp1dIQD00Iya3DttoTErSSOnRx5ZtpAs= github.com/omalloc/proxy v0.0.0-20251201151440-9054f8002a97 h1:Uq3uQx04GXkw3RJ3FawXJmmYcoPWIQTQHOI3ozWWEwM= @@ -104,6 +122,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= @@ -115,12 +135,18 @@ github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05Zp github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/shirou/gopsutil/v4 v4.26.1 h1:TOkEyriIXk2HX9d4isZJtbjXbEjf5qyKPAzbzY0JWSo= +github.com/shirou/gopsutil/v4 v4.26.1/go.mod h1:medLI9/UNAb0dOI9Q3/7yWSqKkj00u+1tgY8nvv41pc= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tidwall/btree v1.8.1 h1:27ehoXvm5AG/g+1VxLS1SD3vRhp/H7LuEfwNvddEdmA= github.com/tidwall/btree v1.8.1/go.mod h1:jBbTdUWhSZClZWoDg54VnvV7/54modSOzDN7VXftj1A= +github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA= +github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI= +github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw= +github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 h1:w0si+uee0iAaCJO9q86T6yrhdadgcsoNuh47LrUykzg= @@ -129,6 +155,8 @@ github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZ github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= @@ -155,7 +183,9 @@ golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= diff --git a/plugin/qs/qs.go b/plugin/qs/qs.go index a9c9307..5044d19 100644 --- a/plugin/qs/qs.go +++ b/plugin/qs/qs.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/goccy/go-json" @@ -19,6 +20,8 @@ import ( "github.com/omalloc/tavern/plugin" "github.com/omalloc/tavern/storage" "github.com/prometheus/client_golang/prometheus" + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/mem" ) var _ configv1.Plugin = (*QsPlugin)(nil) @@ -47,6 +50,9 @@ type QsPlugin struct { mu sync.RWMutex stopCh chan struct{} smoothedData map[string]float64 // 存储平滑后的状态码指标 + cpuPercent atomic.Uint32 + memUsage atomic.Uint64 + memTotal atomic.Uint64 } func init() { @@ -63,6 +69,9 @@ func NewQsPlugin(opts configv1.Option, log *log.Helper) (configv1.Plugin, error) opt: opt, stopCh: make(chan struct{}, 1), smoothedData: make(map[string]float64), + cpuPercent: atomic.Uint32{}, + memUsage: atomic.Uint64{}, + memTotal: atomic.Uint64{}, }, nil } @@ -155,8 +164,9 @@ func (qs *QsPlugin) AddRouter(router *http.ServeMux) { })) router.Handle("/plugin/qs/graph", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + data := make(map[string]float64, len(qs.smoothedData)+3) + qs.mu.RLock() - data := make(map[string]float64, len(qs.smoothedData)) for code, smoothedValue := range qs.smoothedData { switch code { case "total": @@ -173,6 +183,10 @@ func (qs *QsPlugin) AddRouter(router *http.ServeMux) { } qs.mu.RUnlock() + data["cpu_percent"] = float64(qs.cpuPercent.Load()) + data["mem_usage"] = float64(qs.memUsage.Load()) + data["mem_total"] = float64(qs.memTotal.Load()) + buf, err := json.Marshal(data) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -192,6 +206,8 @@ func (qs *QsPlugin) Start(context.Context) error { // start the ticker to collect requests per second metrics ( TODO: with enabled qs/graph endpoint ) go qs.tickRequestsPerSecond() + // start the ticker to collect CPU and memory usage metrics + go qs.tickUsage() return nil } @@ -266,6 +282,26 @@ func (qs *QsPlugin) tickRequestsPerSecond() { } } +func (qs *QsPlugin) tickUsage() { + cpu.Percent(time.Second, true) + + for { + select { + case <-qs.stopCh: + return + case <-time.Tick(time.Second): + percent, err := cpu.Percent(0, false) + if err == nil && len(percent) > 0 { + qs.cpuPercent.Store(uint32(percent[0])) + } + + vmem, _ := mem.VirtualMemory() + qs.memUsage.Store(vmem.Used) + qs.memTotal.Store(vmem.Total) + } + } +} + func convRange(parts bitmap.Bitmap) string { nums := make([]int, 0, parts.Count()) parts.Range(func(x uint32) { From 81ce1ceb28bd91e31b11ce9bd9e409c70e7984f4 Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Wed, 4 Feb 2026 23:17:27 +0800 Subject: [PATCH 3/6] feat: implement TopK functionality for LRU cache and storage buckets, integrating it into the QS plugin for hot URL tracking and on-demand metric collection. --- api/defined/v1/storage/storage.go | 2 + cmd/top/main.go | 54 +++++---- pkg/algorithm/lru/lru.go | 36 +++++- pkg/algorithm/lru/lru_topk_test.go | 66 +++++++++++ plugin/qs/qs.go | 172 ++++++++++++++++++++++++----- storage/bucket/disk/disk.go | 9 ++ storage/bucket/empty/empty.go | 4 + storage/bucket/memory/memory.go | 11 ++ storage/diraware/bucket.go | 4 + 9 files changed, 310 insertions(+), 48 deletions(-) create mode 100644 pkg/algorithm/lru/lru_topk_test.go diff --git a/api/defined/v1/storage/storage.go b/api/defined/v1/storage/storage.go index b407e0b..66287d3 100644 --- a/api/defined/v1/storage/storage.go +++ b/api/defined/v1/storage/storage.go @@ -99,6 +99,8 @@ type Bucket interface { StoreType() string // Path returns the Bucket path. Path() string + // TopK returns the top k most frequently used keys + TopK(k int) []string } type PurgeControl struct { diff --git a/cmd/top/main.go b/cmd/top/main.go index 853585b..9eac916 100644 --- a/cmd/top/main.go +++ b/cmd/top/main.go @@ -38,11 +38,23 @@ func newDashboard() { } defer terminal.Close() + termWidth, _ := terminal.TerminalDimensions() + collected := atomic.Bool{} cpuPercent := atomic.Uint32{} memUsage := atomic.Uint64{} memTotal := atomic.Uint64{} diskPercent := atomic.Uint64{} // mock + diskUsage := atomic.Uint64{} + diskTotal := atomic.Uint64{} + + // 高级监控指标 { 热点url 热点域名 热点磁盘 } + list := widgets.NewList() + list.Title = "Hot URLs" + list.SetRect(0, 12, termWidth, 30) + list.BorderStyle.Fg = terminal.ColorWhite + list.TitleStyle.Fg = terminal.ColorCyan + list.TextStyle.Fg = terminal.ColorYellow client := &http.Client{ Timeout: time.Second, @@ -69,16 +81,18 @@ func newDashboard() { } collected.Store(true) - var data map[string]float64 - json.NewDecoder(resp.Body).Decode(&data) + var rsp Graph + _ = json.NewDecoder(resp.Body).Decode(&rsp) - cpuPercent.Store(uint32(data["cpu_percent"])) - memUsage.Store(uint64(data["mem_usage"])) - memTotal.Store(uint64(data["mem_total"])) - return data - } + cpuPercent.Store(uint32(rsp.Data["cpu_percent"])) + memUsage.Store(uint64(rsp.Data["mem_usage"])) + memTotal.Store(uint64(rsp.Data["mem_total"])) + diskUsage.Store(uint64(rsp.Data["disk_usage"])) + diskTotal.Store(uint64(rsp.Data["disk_total"])) + list.Rows = rsp.HotUrls - termWidth, _ := terminal.TerminalDimensions() + return rsp.Data + } // 基础监控指标 { qps, cpu, memory } metricGrid := terminal.NewGrid() @@ -133,7 +147,7 @@ func newDashboard() { load.TitleStyle.Fg = terminal.ColorCyan return load, func() { - load.Percent = int(cpuPercent.Load()) + 10 + load.Percent = int(cpuPercent.Load()) } }() @@ -166,13 +180,18 @@ func newDashboard() { disk, diskDraw := func() (*widgets.Gauge, func()) { disk := widgets.NewGauge() disk.Title = "Disk Usage" - disk.Percent = int(diskPercent.Load()) + 70 + disk.Percent = int(diskPercent.Load()) disk.BarColor = terminal.ColorYellow disk.BorderStyle.Fg = terminal.ColorWhite disk.TitleStyle.Fg = terminal.ColorCyan return disk, func() { - disk.Percent = int(diskPercent.Load()) + 70 + disk.Percent = int(diskPercent.Load()) + disk.Label = fmt.Sprintf("%d%% | Disk: %s / %s", + 0, + humanize.Bytes(diskUsage.Load()), + humanize.Bytes(diskTotal.Load()), + ) } }() @@ -187,14 +206,6 @@ func newDashboard() { ), ) - // 高级监控指标 { 热点url 热点域名 热点磁盘 } - list := widgets.NewList() - list.Title = "Hot URLs" - list.SetRect(0, 12, termWidth, 30) - list.BorderStyle.Fg = terminal.ColorWhite - list.TitleStyle.Fg = terminal.ColorCyan - list.TextStyle.Fg = terminal.ColorYellow - terminal.Render(banner, metricGrid, list) uiEvents := terminal.PollEvents() @@ -232,3 +243,8 @@ func newDashboard() { } } } + +type Graph struct { + Data map[string]float64 `json:"data"` + HotUrls []string `json:"hot_urls"` +} diff --git a/pkg/algorithm/lru/lru.go b/pkg/algorithm/lru/lru.go index 73fb9cf..7244cd5 100644 --- a/pkg/algorithm/lru/lru.go +++ b/pkg/algorithm/lru/lru.go @@ -136,6 +136,31 @@ func (c *Cache[K, V]) Keys() []K { return keys } +// TopK returns the top k most frequently used keys +func (c *Cache[K, V]) TopK(k int) []K { + c.mu.RLock() + defer c.mu.RUnlock() + + keys := make([]K, 0, k) + // Iterate from highest frequency (Back) to lowest (Front) + for freqNode := c.freqs.Back(); freqNode != nil; freqNode = freqNode.Prev() { + li := freqNode.Value.(*listEntry[K, V]) + // Inside bucket, most recently added/promoted are at Back? + // increment() uses PushBack. + // So Back are the newest in this frequency. + // We iterate from Back to Front to resolve ties with recency if needed, + // or just simply collect them. + for entryNode := li.entries.Back(); entryNode != nil; entryNode = entryNode.Prev() { + entry := entryNode.Value.(*cacheEntry[K, V]) + keys = append(keys, entry.key) + if len(keys) >= k { + return keys + } + } + } + return keys +} + // Remove removes the given key from the cache. func (c *Cache[K, V]) Remove(key K) bool { c.mu.Lock() @@ -211,6 +236,11 @@ func (c *Cache[K, V]) increment(e *cacheEntry[K, V]) { nextPlace = currentPlace.Next() } + var oldEntryNode *list.Element[*cacheEntry[K, V]] + if currentPlace != nil { + oldEntryNode = e.entryNode + } + if nextPlace == nil || nextPlace.Value.(*listEntry[K, V]).freq != nextFreq { // create a new list entry li := new(listEntry[K, V]) @@ -226,7 +256,11 @@ func (c *Cache[K, V]) increment(e *cacheEntry[K, V]) { e.entryNode = nextPlace.Value.(*listEntry[K, V]).entries.PushBack(e) if currentPlace != nil { // remove from current position - c.remEntry(currentPlace, e) + li := currentPlace.Value.(*listEntry[K, V]) + li.entries.Remove(oldEntryNode) + if li.entries.Len() == 0 { + c.freqs.Remove(currentPlace) + } } } diff --git a/pkg/algorithm/lru/lru_topk_test.go b/pkg/algorithm/lru/lru_topk_test.go new file mode 100644 index 0000000..e200884 --- /dev/null +++ b/pkg/algorithm/lru/lru_topk_test.go @@ -0,0 +1,66 @@ +package lru + +import ( + "testing" +) + +func TestCache_TopK(t *testing.T) { + c := New[string, int](10) + + // Add items + c.Set("a", 1) // freq 1 + c.Set("b", 1) // freq 1 + c.Set("c", 1) // freq 1 + c.Set("d", 1) // freq 1 + + // Increase frequency + c.Get("a") // a: freq 2 + c.Get("a") // a: freq 3 + + c.Get("b") // b: freq 2 + + // Current state: + // a: 3 + // b: 2 + // c: 1 + // d: 1 + + // Test TopK(2) should return [a, b] + top2 := c.TopK(2) + if len(top2) != 2 { + t.Fatalf("expected len 2, got %d", len(top2)) + } + if top2[0] != "a" { + t.Errorf("expected top[0] to be 'a', got %s", top2[0]) + } + if top2[1] != "b" { + t.Errorf("expected top[1] to be 'b', got %s", top2[1]) + } + + // Increase c to be top + c.Get("c") + c.Get("c") + c.Get("c") + // c: 4 -> New Top 1 + + // Test TopK(3) + top3 := c.TopK(3) + if len(top3) != 3 { + t.Fatalf("expected len 3, got %d", len(top3)) + } + if top3[0] != "c" { // c(4) + t.Errorf("expected top[0] to be 'c', got %s", top3[0]) + } + if top3[1] != "a" { // a(3) + t.Errorf("expected top[1] to be 'a', got %s", top3[1]) + } + if top3[2] != "b" { // b(2) + t.Errorf("expected top[2] to be 'b', got %s", top3[2]) + } + + // Test TopK > Len + top10 := c.TopK(10) + if len(top10) != 4 { + t.Errorf("expected len 4, got %d", len(top10)) + } +} diff --git a/plugin/qs/qs.go b/plugin/qs/qs.go index 5044d19..56afdf7 100644 --- a/plugin/qs/qs.go +++ b/plugin/qs/qs.go @@ -14,6 +14,7 @@ import ( "github.com/goccy/go-json" "github.com/kelindar/bitmap" configv1 "github.com/omalloc/tavern/api/defined/v1/plugin" + storagev1 "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" "github.com/omalloc/tavern/contrib/log" "github.com/omalloc/tavern/metrics" @@ -21,11 +22,17 @@ import ( "github.com/omalloc/tavern/storage" "github.com/prometheus/client_golang/prometheus" "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/disk" "github.com/shirou/gopsutil/v4/mem" ) var _ configv1.Plugin = (*QsPlugin)(nil) +type Graph struct { + Data map[string]float64 `json:"data"` + HotUrls []string `json:"hot_urls"` +} + type SimpleMetadata struct { ID string `json:"id"` Chunks string `json:"chunks,omitempty"` @@ -48,11 +55,18 @@ type QsPlugin struct { opt *option mu sync.RWMutex + ctrlMu sync.Mutex + collect atomic.Bool + lastReq atomic.Int64 + cancel context.CancelFunc stopCh chan struct{} smoothedData map[string]float64 // 存储平滑后的状态码指标 + hotUrls []string cpuPercent atomic.Uint32 memUsage atomic.Uint64 memTotal atomic.Uint64 + diskUsage atomic.Uint64 + diskTotal atomic.Uint64 } func init() { @@ -68,6 +82,7 @@ func NewQsPlugin(opts configv1.Option, log *log.Helper) (configv1.Plugin, error) log: log, opt: opt, stopCh: make(chan struct{}, 1), + collect: atomic.Bool{}, smoothedData: make(map[string]float64), cpuPercent: atomic.Uint32{}, memUsage: atomic.Uint64{}, @@ -82,6 +97,7 @@ func (qs *QsPlugin) HandleFunc(next http.HandlerFunc) http.HandlerFunc { // AddRouter implements plugin.Plugin. func (qs *QsPlugin) AddRouter(router *http.ServeMux) { + router.Handle("/plugin/store/disk", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { buckets := storage.Current().Buckets() @@ -164,30 +180,21 @@ func (qs *QsPlugin) AddRouter(router *http.ServeMux) { })) router.Handle("/plugin/qs/graph", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - data := make(map[string]float64, len(qs.smoothedData)+3) + qs.touchOrStart() + + data := qs.collectRequestsCode() qs.mu.RLock() - for code, smoothedValue := range qs.smoothedData { - switch code { - case "total": - data["total"] = smoothedValue - case "200", "206": - data["2xx"] += smoothedValue - case "400", "401", "403", "404": - data["4xx"] += smoothedValue - case "499": - data["499"] += smoothedValue - case "500", "502", "503", "504": - data["5xx"] += smoothedValue - } - } + hotUrls := make([]string, len(qs.hotUrls)) + copy(hotUrls, qs.hotUrls) qs.mu.RUnlock() - data["cpu_percent"] = float64(qs.cpuPercent.Load()) - data["mem_usage"] = float64(qs.memUsage.Load()) - data["mem_total"] = float64(qs.memTotal.Load()) + g := Graph{ + Data: data, + HotUrls: hotUrls, + } - buf, err := json.Marshal(data) + buf, err := json.Marshal(g) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -203,12 +210,6 @@ func (qs *QsPlugin) AddRouter(router *http.ServeMux) { // Start implements plugin.Plugin. func (qs *QsPlugin) Start(context.Context) error { // you can add your startup logic here - - // start the ticker to collect requests per second metrics ( TODO: with enabled qs/graph endpoint ) - go qs.tickRequestsPerSecond() - // start the ticker to collect CPU and memory usage metrics - go qs.tickUsage() - return nil } @@ -221,8 +222,65 @@ func (qs *QsPlugin) Stop(context.Context) error { return nil } +// touchOrStart starts the collectors if not running, or updates the last request time. +func (qs *QsPlugin) touchOrStart() { + qs.lastReq.Store(time.Now().UnixNano()) + if qs.collect.Load() { + return + } + + qs.ctrlMu.Lock() + defer qs.ctrlMu.Unlock() + + if qs.collect.Load() { + return + } + + ctx, cancel := context.WithCancel(context.Background()) + qs.cancel = cancel + qs.collect.Store(true) + + // start the ticker to collect requests per second metrics ( TODO: with enabled qs/graph endpoint ) + go qs.tickRequestsPerSecond(ctx) + // start the ticker to collect CPU and memory usage metrics + go qs.tickUsage(ctx) + // start the monitor to stop the collectors if no requests for a while + go qs.tickMonitor(ctx) +} + +func (qs *QsPlugin) tickMonitor(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-qs.stopCh: + return + case <-ctx.Done(): + return + case <-ticker.C: + last := qs.lastReq.Load() + if time.Since(time.Unix(0, last)) > 5*time.Second { + qs.ctrlMu.Lock() + // double check + last = qs.lastReq.Load() + if time.Since(time.Unix(0, last)) > 5*time.Second { + if qs.cancel != nil { + qs.cancel() + qs.cancel = nil + } + qs.collect.Store(false) + qs.ctrlMu.Unlock() + return + } + qs.ctrlMu.Unlock() + } + } + } +} + // tickRequestsPerSecond periodically collects and smooths the requests per second metrics. -func (qs *QsPlugin) tickRequestsPerSecond() { +func (qs *QsPlugin) tickRequestsPerSecond(ctx context.Context) { metricsMap := map[string]*metrics.CounterSmoother{ "200": {Alpha: 0.3}, "206": {Alpha: 0.3}, @@ -242,9 +300,12 @@ func (qs *QsPlugin) tickRequestsPerSecond() { for { select { + case <-ctx.Done(): + return case <-qs.stopCh: return case <-ticker.C: + log.Info("qs tickRequestsPerSecond") familys, err := prometheus.DefaultGatherer.Gather() if err != nil { continue @@ -282,14 +343,17 @@ func (qs *QsPlugin) tickRequestsPerSecond() { } } -func (qs *QsPlugin) tickUsage() { - cpu.Percent(time.Second, true) +func (qs *QsPlugin) tickUsage(ctx context.Context) { + buckets := storage.Current().Buckets() for { select { + case <-ctx.Done(): + return case <-qs.stopCh: return case <-time.Tick(time.Second): + log.Info("qs tickUsage") percent, err := cpu.Percent(0, false) if err == nil && len(percent) > 0 { qs.cpuPercent.Store(uint32(percent[0])) @@ -298,10 +362,62 @@ func (qs *QsPlugin) tickUsage() { vmem, _ := mem.VirtualMemory() qs.memUsage.Store(vmem.Used) qs.memTotal.Store(vmem.Total) + + diskUsage := uint64(0) + diskTotal := uint64(0) + hotkeys := make([]string, 0, len(buckets)*10) + for _, bucket := range buckets { + if bucket.StoreType() == storagev1.TypeInMemory { + continue + } + + usage, _ := disk.Usage(bucket.Path()) + diskUsage += usage.Used + diskTotal += usage.Total + + hotkeys = append(hotkeys, bucket.TopK(10)...) + } + + qs.diskUsage.Store(diskUsage) + qs.diskTotal.Store(diskTotal) + + qs.mu.Lock() + qs.hotUrls = hotkeys + qs.mu.Unlock() } } } +func (qs *QsPlugin) collectRequestsCode() map[string]float64 { + data := make(map[string]float64, len(qs.smoothedData)+5) + + qs.mu.RLock() + defer qs.mu.RUnlock() + + for code, smoothedValue := range qs.smoothedData { + switch code { + case "total": + data["total"] = smoothedValue + case "200", "206": + data["2xx"] += smoothedValue + case "400", "401", "403", "404": + data["4xx"] += smoothedValue + case "499": + data["499"] += smoothedValue + case "500", "502", "503", "504": + data["5xx"] += smoothedValue + } + } + + data["cpu_percent"] = float64(qs.cpuPercent.Load()) + data["mem_usage"] = float64(qs.memUsage.Load()) + data["mem_total"] = float64(qs.memTotal.Load()) + data["disk_usage"] = float64(qs.diskUsage.Load()) + data["disk_total"] = float64(qs.diskTotal.Load()) + + return data +} + func convRange(parts bitmap.Bitmap) string { nums := make([]int, 0, parts.Count()) parts.Range(func(x uint32) { diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index 3fe91c3..fff69ab 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -456,6 +456,15 @@ func (d *diskBucket) Path() string { return d.path } +func (d *diskBucket) TopK(k int) []string { + arr := d.cache.TopK(k) + ret := make([]string, len(arr)) + for i := range arr { + ret[i] = hex.EncodeToString(arr[i][:]) + } + return ret +} + func (d *diskBucket) WriteChunkFile(ctx context.Context, id *object.ID, index uint32) (io.WriteCloser, string, error) { wpath := id.WPathSlice(d.path, index) _ = os.MkdirAll(filepath.Dir(wpath), d.fileMode) diff --git a/storage/bucket/empty/empty.go b/storage/bucket/empty/empty.go index 8e73c27..1218208 100644 --- a/storage/bucket/empty/empty.go +++ b/storage/bucket/empty/empty.go @@ -117,6 +117,10 @@ func (e *emptyBucket) Path() string { return e.path } +func (e *emptyBucket) TopK(k int) []string { + return nil +} + func (e *emptyBucket) WriteChunkFile(ctx context.Context, id *object.ID, index uint32) (io.WriteCloser, string, error) { return nil, "/dev/null", os.ErrNotExist } diff --git a/storage/bucket/memory/memory.go b/storage/bucket/memory/memory.go index 7d870a6..d422618 100644 --- a/storage/bucket/memory/memory.go +++ b/storage/bucket/memory/memory.go @@ -2,6 +2,7 @@ package memory import ( "context" + "encoding/hex" "errors" "fmt" "io" @@ -223,6 +224,16 @@ func (m *memoryBucket) Path() string { return m.path } +// TopK implements [storage.Bucket]. +func (m *memoryBucket) TopK(k int) []string { + arr := m.cache.TopK(k) + ret := make([]string, len(arr)) + for i := range arr { + ret[i] = hex.EncodeToString(arr[i][:]) + } + return ret +} + // Remove implements [storage.Bucket]. func (m *memoryBucket) Remove(ctx context.Context, id *object.ID) error { return nil diff --git a/storage/diraware/bucket.go b/storage/diraware/bucket.go index e0f9cf5..d1e1f57 100644 --- a/storage/diraware/bucket.go +++ b/storage/diraware/bucket.go @@ -139,6 +139,10 @@ func (b *wrappedBucket) Path() string { return b.base.Path() } +func (b *wrappedBucket) TopK(k int) []string { + return b.base.TopK(k) +} + func (b *wrappedBucket) Close() error { return b.base.Close() } From 899d8a5629387cb1fdafde7a684f462e6d473eba Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Thu, 12 Feb 2026 23:22:54 +0800 Subject: [PATCH 4/6] feat: implement real-time monitoring for QS plugin via SSE and introduce `ttop` command with enhanced hot key details. --- Makefile | 6 +- cmd/top/main.go | 116 ++++++++++++++++++-------- go.mod | 1 + go.sum | 4 + main.go | 9 +++ pkg/x/runtime/info.go | 12 +++ plugin/qs/qs.go | 157 +++++++++++++++++++++++++----------- storage/bucket/disk/disk.go | 6 +- 8 files changed, 226 insertions(+), 85 deletions(-) diff --git a/Makefile b/Makefile index 51d5b74..b4e5bd4 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,9 @@ endif LDFLAGS=-ldflags "-w -s -extldflags=-static" default: - make clean - make build + @make clean + @make build + @make toolchain .PHONY: install install: @@ -31,6 +32,7 @@ build: .PHONY: toolchain toolchain: @env CGO_ENABLED=0 go build ${LDFLAGS} -o bin/tq cmd/tq/main.go + @env CGO_ENABLED=0 go build ${LDFLAGS} -o bin/ttop cmd/top/main.go .PHONY: run run: diff --git a/cmd/top/main.go b/cmd/top/main.go index 9eac916..75527a8 100644 --- a/cmd/top/main.go +++ b/cmd/top/main.go @@ -1,24 +1,26 @@ package main import ( + "bufio" "encoding/json" "flag" "fmt" "log" "net/http" + "strings" + "sync" "sync/atomic" "time" "github.com/dustin/go-humanize" terminal "github.com/gizak/termui/v3" "github.com/gizak/termui/v3/widgets" + "github.com/samber/lo" ) var ( endpoint = "" tickInterval = time.Second * 1 - startAt = time.Now() - uptime = time.Unix(1767060001, 0) ) func init() { @@ -47,6 +49,7 @@ func newDashboard() { diskPercent := atomic.Uint64{} // mock diskUsage := atomic.Uint64{} diskTotal := atomic.Uint64{} + startedAt := atomic.Int64{} // 高级监控指标 { 热点url 热点域名 热点磁盘 } list := widgets.NewList() @@ -57,42 +60,70 @@ func newDashboard() { list.TextStyle.Fg = terminal.ColorYellow client := &http.Client{ - Timeout: time.Second, Transport: &http.Transport{}, } - graph, _ := http.NewRequest(http.MethodGet, endpoint, nil) - - fetch := func() map[string]float64 { - resp, err := client.Do(graph) - if err != nil { - collected.Store(false) - return nil - } + var ( + dataMu sync.RWMutex + latestData = make(map[string]float64) + latestHotUrls []string + ) - if resp != nil { - if resp.StatusCode != http.StatusOK { - if resp.Body != nil { - _ = resp.Body.Close() + // Background SSE consumer + go func() { + for { + func() { + req, err := http.NewRequest(http.MethodGet, endpoint, nil) + if err != nil { + return } - collected.Store(false) - return nil - } - } - collected.Store(true) - var rsp Graph - _ = json.NewDecoder(resp.Body).Decode(&rsp) + resp, err := client.Do(req) + if err != nil { + collected.Store(false) + return + } + defer resp.Body.Close() - cpuPercent.Store(uint32(rsp.Data["cpu_percent"])) - memUsage.Store(uint64(rsp.Data["mem_usage"])) - memTotal.Store(uint64(rsp.Data["mem_total"])) - diskUsage.Store(uint64(rsp.Data["disk_usage"])) - diskTotal.Store(uint64(rsp.Data["disk_total"])) - list.Rows = rsp.HotUrls + if resp.StatusCode != http.StatusOK { + collected.Store(false) + return + } - return rsp.Data - } + collected.Store(true) + reader := bufio.NewReader(resp.Body) + for { + line, err := reader.ReadString('\n') + if err != nil { + return + } + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "data:") { + jsonStr := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + if jsonStr == "" { + continue + } + + var rsp Graph + if err := json.Unmarshal([]byte(jsonStr), &rsp); err == nil { + dataMu.Lock() + latestData = rsp.Data + latestHotUrls = rsp.HotUrls + dataMu.Unlock() + + startedAt.Store(rsp.StartedAt) + cpuPercent.Store(uint32(rsp.Data["cpu_percent"])) + memUsage.Store(uint64(rsp.Data["mem_usage"])) + memTotal.Store(uint64(rsp.Data["mem_total"])) + diskUsage.Store(uint64(rsp.Data["disk_usage"])) + diskTotal.Store(uint64(rsp.Data["disk_total"])) + } + } + } + }() + time.Sleep(time.Second) // Reconnect delay + } + }() // 基础监控指标 { qps, cpu, memory } metricGrid := terminal.NewGrid() @@ -101,7 +132,7 @@ func newDashboard() { banner, bannerDraw := func() (*widgets.Paragraph, func()) { banner := widgets.NewParagraph() banner.SetRect(0, 0, termWidth, 3) - banner.Title = "Tavern" + banner.Title = " Tavern (PRESS q TO QUIT) " banner.Border = true textDraw := func() { @@ -112,8 +143,10 @@ func newDashboard() { status = "Connected" } + startAt := time.UnixMilli(startedAt.Load()) + banner.Text = fmt.Sprintf("%s | Sampling @ [%s](fg:blue) | [%s](%s) (%s) | Uptime %s", - endpoint, tickInterval.String(), status, color, startAt.Format(time.RFC1123), humanize.Time(uptime)) + endpoint, tickInterval.String(), status, color, startAt.Format(time.RFC1123), humanize.Time(startAt)) } textDraw() @@ -128,10 +161,22 @@ func newDashboard() { rater.TitleStyle.Fg = terminal.ColorCyan draw := func() { - data := fetch() + dataMu.RLock() + data := make(map[string]float64, len(latestData)) + for k, v := range latestData { + data[k] = v + } + hotUrls := make([]string, len(latestHotUrls)) + copy(hotUrls, latestHotUrls) + dataMu.RUnlock() rater.Text = fmt.Sprintf("\nRequests/sec: %d \nTotal: %d \n2xx : %d\n4xx : %d\n499 : %d\n5xx : %d", int(data["total"]), int(data["total"]), int(data["2xx"]), int(data["4xx"]), int(data["499"]), int(data["5xx"])) + + list.Rows = lo.Map(hotUrls, func(s string, i int) string { + parts := strings.Split(s, "@@") + return fmt.Sprintf("[%02d] LastAccess=%s %s ReqCount=%s", i, parts[1], parts[0], parts[2]) + }) } draw() @@ -245,6 +290,7 @@ func newDashboard() { } type Graph struct { - Data map[string]float64 `json:"data"` - HotUrls []string `json:"hot_urls"` + Data map[string]float64 `json:"data"` + HotUrls []string `json:"hot_urls"` + StartedAt int64 `json:"started_at"` } diff --git a/go.mod b/go.mod index a3225de..7590a53 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/paulbellamy/ratecounter v0.2.0 github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_model v0.6.2 + github.com/samber/lo v1.52.0 github.com/shirou/gopsutil/v4 v4.26.1 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.1 diff --git a/go.sum b/go.sum index 677e177..3d7fb6a 100644 --- a/go.sum +++ b/go.sum @@ -139,6 +139,10 @@ github.com/shirou/gopsutil/v4 v4.26.1 h1:TOkEyriIXk2HX9d4isZJtbjXbEjf5qyKPAzbzY0 github.com/shirou/gopsutil/v4 v4.26.1/go.mod h1:medLI9/UNAb0dOI9Q3/7yWSqKkj00u+1tgY8nvv41pc= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw= +github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= +github.com/shirou/gopsutil/v4 v4.26.1 h1:TOkEyriIXk2HX9d4isZJtbjXbEjf5qyKPAzbzY0JWSo= +github.com/shirou/gopsutil/v4 v4.26.1/go.mod h1:medLI9/UNAb0dOI9Q3/7yWSqKkj00u+1tgY8nvv41pc= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tidwall/btree v1.8.1 h1:27ehoXvm5AG/g+1VxLS1SD3vRhp/H7LuEfwNvddEdmA= diff --git a/main.go b/main.go index 4d2ac65..c33177b 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "flag" + "fmt" stdlog "log" "net/url" "os" @@ -46,12 +47,15 @@ var ( flagConf string = "config.yaml" // flagVerbose is the verbose flag. flagVerbose bool + // flagVersion is the version flag. + flagVersion bool ) func init() { // init flag flag.StringVar(&flagConf, "c", "config.yaml", "config file path") flag.BoolVar(&flagVerbose, "v", false, "enable verbose log") + flag.BoolVar(&flagVersion, "V", false, "show version info") // init global encoding encoding.SetDefaultCodec(json.JSONCodec{}) @@ -71,6 +75,11 @@ func init() { func main() { flag.Parse() + if flagVersion { + fmt.Println(runtime.BuildInfo.String()) + return + } + c := config.New[conf.Bootstrap](config.WithSource(file.NewSource(flagConf))) defer c.Close() diff --git a/pkg/x/runtime/info.go b/pkg/x/runtime/info.go index a8f5261..03a00a3 100644 --- a/pkg/x/runtime/info.go +++ b/pkg/x/runtime/info.go @@ -1,9 +1,11 @@ package runtime import ( + "fmt" "runtime" "runtime/debug" "strings" + "time" ) type RuntimeInfo struct { @@ -14,6 +16,7 @@ type RuntimeInfo struct { VcsRevision string `json:"vcs.revision"` VcsTime string `json:"vcs.time"` Dirty bool `json:"dirty"` + StartedAt int64 `json:"started_at"` } var _ = "" @@ -23,6 +26,7 @@ func init() { BuildInfo.Dirty = true BuildInfo.GoVersion = runtime.Version() BuildInfo.GoArch = runtime.GOARCH + BuildInfo.StartedAt = time.Now().UnixMilli() // -buildvcs=true / auto if info, ok := debug.ReadBuildInfo(); ok { @@ -43,3 +47,11 @@ func init() { } } } + +func (info RuntimeInfo) String() string { + return fmt.Sprintf(`Version: %s +Commit: %s +Built at: %s +Dirty: %t`, + info.GoVersion, info.VcsRevision, info.VcsTime, info.Dirty) +} diff --git a/plugin/qs/qs.go b/plugin/qs/qs.go index 56afdf7..1ca387a 100644 --- a/plugin/qs/qs.go +++ b/plugin/qs/qs.go @@ -18,6 +18,7 @@ import ( "github.com/omalloc/tavern/api/defined/v1/storage/object" "github.com/omalloc/tavern/contrib/log" "github.com/omalloc/tavern/metrics" + "github.com/omalloc/tavern/pkg/x/runtime" "github.com/omalloc/tavern/plugin" "github.com/omalloc/tavern/storage" "github.com/prometheus/client_golang/prometheus" @@ -29,8 +30,9 @@ import ( var _ configv1.Plugin = (*QsPlugin)(nil) type Graph struct { - Data map[string]float64 `json:"data"` - HotUrls []string `json:"hot_urls"` + Data map[string]float64 `json:"data"` + HotUrls []string `json:"hot_urls"` + StartedAt int64 `json:"started_at"` } type SimpleMetadata struct { @@ -180,30 +182,61 @@ func (qs *QsPlugin) AddRouter(router *http.ServeMux) { })) router.Handle("/plugin/qs/graph", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Set headers for SSE + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + return + } + + // check if client is still connected + if r.Context().Err() != nil { + return + } + + // send initial data qs.touchOrStart() - data := qs.collectRequestsCode() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - qs.mu.RLock() - hotUrls := make([]string, len(qs.hotUrls)) - copy(hotUrls, qs.hotUrls) - qs.mu.RUnlock() + for { + select { + case <-r.Context().Done(): + return + case <-ticker.C: + qs.touchOrStart() - g := Graph{ - Data: data, - HotUrls: hotUrls, - } + data := qs.collectRequestsCode() - buf, err := json.Marshal(g) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + qs.mu.RLock() + hotUrls := make([]string, len(qs.hotUrls)) + copy(hotUrls, qs.hotUrls) + qs.mu.RUnlock() - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Content-Length", strconv.Itoa(len(buf))) - w.WriteHeader(http.StatusOK) - _, _ = w.Write(buf) + g := Graph{ + Data: data, + StartedAt: runtime.BuildInfo.StartedAt, + HotUrls: hotUrls, + } + + buf, err := json.Marshal(g) + if err != nil { + continue + } + + _, err = fmt.Fprintf(w, "data: %s\n\n", buf) + if err != nil { + return + } + flusher.Flush() + } + } })) } @@ -244,6 +277,8 @@ func (qs *QsPlugin) touchOrStart() { go qs.tickRequestsPerSecond(ctx) // start the ticker to collect CPU and memory usage metrics go qs.tickUsage(ctx) + // start the ticker to collect hot keys metrics + go qs.tickHotKeys(ctx) // start the monitor to stop the collectors if no requests for a while go qs.tickMonitor(ctx) } @@ -305,7 +340,7 @@ func (qs *QsPlugin) tickRequestsPerSecond(ctx context.Context) { case <-qs.stopCh: return case <-ticker.C: - log.Info("qs tickRequestsPerSecond") + familys, err := prometheus.DefaultGatherer.Gather() if err != nil { continue @@ -344,7 +379,19 @@ func (qs *QsPlugin) tickRequestsPerSecond(ctx context.Context) { } func (qs *QsPlugin) tickUsage(ctx context.Context) { - buckets := storage.Current().Buckets() + collect := func() { + percent, err := cpu.Percent(0, false) + if err == nil && len(percent) > 0 { + qs.cpuPercent.Store(uint32(percent[0])) + } + + vmem, _ := mem.VirtualMemory() + qs.memUsage.Store(vmem.Used) + qs.memTotal.Store(vmem.Total) + } + + // collect once at the beginning + collect() for { select { @@ -352,38 +399,54 @@ func (qs *QsPlugin) tickUsage(ctx context.Context) { return case <-qs.stopCh: return - case <-time.Tick(time.Second): - log.Info("qs tickUsage") - percent, err := cpu.Percent(0, false) - if err == nil && len(percent) > 0 { - qs.cpuPercent.Store(uint32(percent[0])) - } + case <-time.Tick(time.Second * 2): + collect() + } + } +} - vmem, _ := mem.VirtualMemory() - qs.memUsage.Store(vmem.Used) - qs.memTotal.Store(vmem.Total) +func (qs *QsPlugin) tickHotKeys(ctx context.Context) { - diskUsage := uint64(0) - diskTotal := uint64(0) - hotkeys := make([]string, 0, len(buckets)*10) - for _, bucket := range buckets { - if bucket.StoreType() == storagev1.TypeInMemory { - continue - } + buckets := storage.Current().Buckets() - usage, _ := disk.Usage(bucket.Path()) - diskUsage += usage.Used - diskTotal += usage.Total + collectBucketMetrics := func() { + diskUsage := uint64(0) + diskTotal := uint64(0) - hotkeys = append(hotkeys, bucket.TopK(10)...) + hotkeys := make([]string, 0, len(buckets)*10) + for _, bucket := range buckets { + if bucket.StoreType() == storagev1.TypeInMemory { + continue } - qs.diskUsage.Store(diskUsage) - qs.diskTotal.Store(diskTotal) + usage, _ := disk.Usage(bucket.Path()) + diskUsage += usage.Used + diskTotal += usage.Total - qs.mu.Lock() - qs.hotUrls = hotkeys - qs.mu.Unlock() + keys := bucket.TopK(10) + + hotkeys = append(hotkeys, keys...) + } + + qs.diskUsage.Store(diskUsage) + qs.diskTotal.Store(diskTotal) + + qs.mu.Lock() + qs.hotUrls = hotkeys + qs.mu.Unlock() + } + + // collect once at the beginning + collectBucketMetrics() + + for { + select { + case <-ctx.Done(): + return + case <-qs.stopCh: + return + case <-time.Tick(time.Second * 5): + collectBucketMetrics() } } } diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index fff69ab..3b4332b 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -460,7 +460,11 @@ func (d *diskBucket) TopK(k int) []string { arr := d.cache.TopK(k) ret := make([]string, len(arr)) for i := range arr { - ret[i] = hex.EncodeToString(arr[i][:]) + mark := d.cache.Peek(arr[i]) + md, _ := d.indexdb.Get(context.Background(), arr[i][:]) + if md != nil { + ret[i] = fmt.Sprintf("%s@@%s@@%d", md.ID.Path(), time.Unix(int64(mark.LastAccess()), 0).Format(time.DateTime), mark.Refs()) + } } return ret } From 4c0fbce2886ccf09161c4167a784482d8de5ec17 Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Thu, 12 Feb 2026 23:53:11 +0800 Subject: [PATCH 5/6] feat: enhance `TopK` output with object details and update `top` command display logic, and rename `qs` plugin API paths --- cmd/top/main.go | 20 ++++++++++++++++---- plugin/qs/qs.go | 6 +++--- storage/bucket/disk/disk.go | 4 ++-- storage/bucket/memory/memory.go | 9 ++++++--- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/cmd/top/main.go b/cmd/top/main.go index 75527a8..509a4a8 100644 --- a/cmd/top/main.go +++ b/cmd/top/main.go @@ -173,10 +173,7 @@ func newDashboard() { rater.Text = fmt.Sprintf("\nRequests/sec: %d \nTotal: %d \n2xx : %d\n4xx : %d\n499 : %d\n5xx : %d", int(data["total"]), int(data["total"]), int(data["2xx"]), int(data["4xx"]), int(data["499"]), int(data["5xx"])) - list.Rows = lo.Map(hotUrls, func(s string, i int) string { - parts := strings.Split(s, "@@") - return fmt.Sprintf("[%02d] LastAccess=%s %s ReqCount=%s", i, parts[1], parts[0], parts[2]) - }) + list.Rows = lo.Filter(lo.Map(hotUrls, toMap), filter) } draw() @@ -289,6 +286,21 @@ func newDashboard() { } } +func filter(s string, _ int) bool { + if s == "" { + return false + } + return true +} + +func toMap(s string, i int) string { + parts := strings.Split(s, "@@") + if len(parts) != 3 { + return "" + } + return fmt.Sprintf("[%02d] LastAccess=%s %s ReqCount=%s", i, parts[1], parts[0], parts[2]) +} + type Graph struct { Data map[string]float64 `json:"data"` HotUrls []string `json:"hot_urls"` diff --git a/plugin/qs/qs.go b/plugin/qs/qs.go index 1ca387a..3953a8e 100644 --- a/plugin/qs/qs.go +++ b/plugin/qs/qs.go @@ -100,7 +100,7 @@ func (qs *QsPlugin) HandleFunc(next http.HandlerFunc) http.HandlerFunc { // AddRouter implements plugin.Plugin. func (qs *QsPlugin) AddRouter(router *http.ServeMux) { - router.Handle("/plugin/store/disk", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + router.Handle("/plugin/qs/disk", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { buckets := storage.Current().Buckets() bucketObjectCounter := make(map[string]uint64, len(buckets)) @@ -115,7 +115,7 @@ func (qs *QsPlugin) AddRouter(router *http.ServeMux) { _, _ = w.Write(payload) })) - router.Handle("/plugin/store/object/simple", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + router.Handle("/plugin/qs/object/simple", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { getHash := r.URL.Query().Get("hash") != "" buckets := storage.Current().Buckets() @@ -157,7 +157,7 @@ func (qs *QsPlugin) AddRouter(router *http.ServeMux) { })) // get this device's service domains - router.Handle("/plugin/store/service-domains", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + router.Handle("/plugin/qs/service-domains", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { sharedKV := storage.Current().SharedKV() // type map[domain]counter domainMap := make(map[string]uint32) diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index 3b4332b..d8c7582 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -458,12 +458,12 @@ func (d *diskBucket) Path() string { func (d *diskBucket) TopK(k int) []string { arr := d.cache.TopK(k) - ret := make([]string, len(arr)) + ret := make([]string, 0, len(arr)) for i := range arr { mark := d.cache.Peek(arr[i]) md, _ := d.indexdb.Get(context.Background(), arr[i][:]) if md != nil { - ret[i] = fmt.Sprintf("%s@@%s@@%d", md.ID.Path(), time.Unix(int64(mark.LastAccess()), 0).Format(time.DateTime), mark.Refs()) + ret = append(ret, fmt.Sprintf("%s@@%s@@%d", md.ID.Path(), time.Unix(int64(mark.LastAccess()), 0).Format(time.DateTime), mark.Refs())) } } return ret diff --git a/storage/bucket/memory/memory.go b/storage/bucket/memory/memory.go index d422618..0a7f05a 100644 --- a/storage/bucket/memory/memory.go +++ b/storage/bucket/memory/memory.go @@ -2,7 +2,6 @@ package memory import ( "context" - "encoding/hex" "errors" "fmt" "io" @@ -227,9 +226,13 @@ func (m *memoryBucket) Path() string { // TopK implements [storage.Bucket]. func (m *memoryBucket) TopK(k int) []string { arr := m.cache.TopK(k) - ret := make([]string, len(arr)) + ret := make([]string, 0, len(arr)) for i := range arr { - ret[i] = hex.EncodeToString(arr[i][:]) + mark := m.cache.Peek(arr[i]) + md, _ := m.indexdb.Get(context.Background(), arr[i][:]) + if md != nil { + ret = append(ret, fmt.Sprintf("%s@@%s@@%d", md.ID.Path(), time.Unix(int64(mark.LastAccess()), 0).Format(time.DateTime), mark.Refs())) + } } return ret } From 970b020ebdd9c7968a1efb0df18015db59b599d9 Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Fri, 13 Feb 2026 20:26:38 +0800 Subject: [PATCH 6/6] feat: Implement smoothed requests per second (RPS) calculation and display in the top command, and improve plugin shutdown logic. --- cmd/top/main.go | 2 +- plugin/qs/qs.go | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/cmd/top/main.go b/cmd/top/main.go index 509a4a8..50e49c3 100644 --- a/cmd/top/main.go +++ b/cmd/top/main.go @@ -171,7 +171,7 @@ func newDashboard() { dataMu.RUnlock() rater.Text = fmt.Sprintf("\nRequests/sec: %d \nTotal: %d \n2xx : %d\n4xx : %d\n499 : %d\n5xx : %d", - int(data["total"]), int(data["total"]), int(data["2xx"]), int(data["4xx"]), int(data["499"]), int(data["5xx"])) + int(data["rps"]), int(data["total"]), int(data["2xx"]), int(data["4xx"]), int(data["499"]), int(data["5xx"])) list.Rows = lo.Filter(lo.Map(hotUrls, toMap), filter) } diff --git a/plugin/qs/qs.go b/plugin/qs/qs.go index 3953a8e..6453aaa 100644 --- a/plugin/qs/qs.go +++ b/plugin/qs/qs.go @@ -252,6 +252,10 @@ func (qs *QsPlugin) Stop(context.Context) error { qs.stopCh <- struct{}{} + if qs.cancel != nil { + qs.cancel() + } + return nil } @@ -329,6 +333,8 @@ func (qs *QsPlugin) tickRequestsPerSecond(ctx context.Context) { "503": {Alpha: 0.3}, "504": {Alpha: 0.3}, } + totalSmoother := &metrics.CounterSmoother{Alpha: 0.3} + lastTotal := float64(0) ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -366,6 +372,12 @@ func (qs *QsPlugin) tickRequestsPerSecond(ctx context.Context) { } } } + rps := totalSmoother.Update(totalCounter) + if totalCounter <= lastTotal { + _ = totalSmoother.Update(0) + rps = 0 + } + lastTotal = totalCounter // 使用写锁更新共享数据 qs.mu.Lock() @@ -373,6 +385,7 @@ func (qs *QsPlugin) tickRequestsPerSecond(ctx context.Context) { qs.smoothedData[code] = value } qs.smoothedData["total"] = totalCounter + qs.smoothedData["rps"] = rps qs.mu.Unlock() } } @@ -439,13 +452,16 @@ func (qs *QsPlugin) tickHotKeys(ctx context.Context) { // collect once at the beginning collectBucketMetrics() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { select { case <-ctx.Done(): return case <-qs.stopCh: return - case <-time.Tick(time.Second * 5): + case <-ticker.C: collectBucketMetrics() } } @@ -461,6 +477,8 @@ func (qs *QsPlugin) collectRequestsCode() map[string]float64 { switch code { case "total": data["total"] = smoothedValue + case "rps": + data["rps"] = smoothedValue case "200", "206": data["2xx"] += smoothedValue case "400", "401", "403", "404":