diff --git a/.gitignore b/.gitignore index ac879b4..276438e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.vscode/ +go.sum bin -vendor prom2click diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index a996edb..0000000 --- a/.travis.yml +++ /dev/null @@ -1,9 +0,0 @@ -language: go - -go: - - 1.8.x - - master - -install: - - go get -v github.com/Masterminds/glide - - make get-deps \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..61b381c --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // 使用 IntelliSense 了解相关属性。 + // 悬停以查看现有属性的描述。 + // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Package", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${fileDirname}" + } + ] +} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7f6da39 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:latest AS build-env +ADD ./ /prom2click +WORKDIR /prom2click +RUN go mod tidy && CGO_ENABLED=0 go build -ldflags "-X main.GitCommit=${GIT_COMMIT}${GIT_DIRTY} -X main.VersionPrerelease=DEV" -o bin/prom2click + +FROM alpine +RUN apk add -U tzdata +RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime +COPY --from=build-env /prom2click/bin/prom2click /usr/local/bin/prom2click +RUN chmod +x /usr/local/bin/prom2click +CMD ["prom2click"] diff --git a/Makefile b/Makefile deleted file mode 100644 index 80e6a71..0000000 --- a/Makefile +++ /dev/null @@ -1,35 +0,0 @@ -.PHONY: build build-alpine clean test help default - -BIN_NAME=prom2click - -VERSION := $(shell grep "const Version " version.go | sed -E 's/.*"(.+)"$$/\1/') -GIT_COMMIT=$(shell git rev-parse HEAD) -GIT_DIRTY=$(shell test -n "`git status --porcelain`" && echo "+CHANGES" || true) -IMAGE_NAME := "s4z/prom2click" - -default: test - -help: - @echo 'Management commands for prom2click:' - @echo - @echo 'Usage:' - @echo ' make build Compile the project.' - @echo ' make get-deps runs glide install, mostly used for ci.' - - @echo ' make clean Clean the directory tree.' - @echo - -build: - @echo "building ${BIN_NAME} ${VERSION}" - @echo "GOPATH=${GOPATH}" - go build -ldflags "-X main.GitCommit=${GIT_COMMIT}${GIT_DIRTY} -X main.VersionPrerelease=DEV" -o bin/${BIN_NAME} - -get-deps: - glide install - -clean: - @test ! -e bin/${BIN_NAME} || rm bin/${BIN_NAME} - -test: - go test $(glide nv) - diff --git a/README.md b/README.md index b56d39c..c545601 100644 --- a/README.md +++ b/README.md @@ -115,11 +115,11 @@ Usage of ./bin/prom2click: ``` * Build prom2click and run it - * Install go and glide + * Install go 1.21 above ```console - $ make get-deps - $ make build + $ go mod tidy + $ go build -v -o bin/prom2click $ ./bin/prom2click ``` diff --git a/glide.lock b/glide.lock deleted file mode 100644 index e17fba9..0000000 --- a/glide.lock +++ /dev/null @@ -1,95 +0,0 @@ -hash: 2936591cb6ef55dc5517f96d731851b2135281bd2200919933219089b837e517 -updated: 2017-06-11T21:38:09.459666062+10:00 -imports: -- name: github.com/beorn7/perks - version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 - subpackages: - - quantile -- name: github.com/golang/protobuf - version: 5a0f697c9ed9d68fef0116532c6e05cfeae00e55 - subpackages: - - proto -- name: github.com/golang/snappy - version: 553a641470496b2327abcac10b36396bd98e45c9 -- name: github.com/kshvakov/clickhouse - version: 1250f4e0c94a7f0083e6a4e483bf3344787de0dc -- name: github.com/matttproud/golang_protobuf_extensions - version: c12348ce28de40eed0136aa2b644d0ee0650e56c - subpackages: - - pbutil -- name: github.com/opentracing/opentracing-go - version: eaaf4e1eeb7a5373b38e70901270c83577dc6fb9 - subpackages: - - log -- name: github.com/prometheus/client_golang - version: c5b7fccd204277076155f10851dad72b76a49317 - subpackages: - - prometheus -- name: github.com/prometheus/client_model - version: 6f3806018612930941127f2a7c6c453ba2c527d2 - subpackages: - - go -- name: github.com/prometheus/common - version: 13ba4ddd0caa9c28ca7b7bffe1dfa9ed8d5ef207 - subpackages: - - expfmt - - internal/bitbucket.org/ww/goautoneg - - log - - model -- name: github.com/prometheus/procfs - version: a3bfc74126ea9e45ee5d5c6f7fc86191b7d488fb - subpackages: - - xfs -- name: github.com/prometheus/prometheus - version: bfa37c8ee39d11078662dce16c162a61dccf616c - subpackages: - - config - - relabel - - storage - - storage/local - - storage/local/chunk - - storage/local/codable - - storage/local/index - - storage/metric - - storage/remote - - util/flock - - util/httputil - - util/testutil -- name: github.com/Sirupsen/logrus - version: 202f25545ea4cf9b191ff7f846df5d87c9382c2b -- name: github.com/syndtr/goleveldb - version: 8c81ea47d4c41a385645e133e15510fc6a2a74b4 - subpackages: - - leveldb - - leveldb/cache - - leveldb/comparer - - leveldb/errors - - leveldb/filter - - leveldb/iterator - - leveldb/journal - - leveldb/memdb - - leveldb/opt - - leveldb/storage - - leveldb/table - - leveldb/util -- name: golang.org/x/net - version: 1a68b1313cf4ad7778376e82641197b60c02f65c - subpackages: - - context - - context/ctxhttp -- name: golang.org/x/sys - version: 0b25a408a50076fbbcae6b7ac0ea5fbb0b085e79 - subpackages: - - unix - - windows - - windows/registry - - windows/svc/eventlog -- name: golang.org/x/time - version: 8be79e1e0910c292df4e79c241bb7e8f7e725959 - subpackages: - - rate -- name: gopkg.in/tylerb/graceful.v1 - version: 4654dfbb6ad53cb5e27f37d99b02e16c1872fbbb -- name: gopkg.in/yaml.v2 - version: cd8b52f8269e0feb286dfeef29f8fe4d5b397e0b -testImports: [] diff --git a/glide.yaml b/glide.yaml deleted file mode 100644 index 4d790d8..0000000 --- a/glide.yaml +++ /dev/null @@ -1,20 +0,0 @@ -package: github.com/s4z/prom2click -import: -- package: github.com/golang/protobuf - subpackages: - - proto -- package: github.com/golang/snappy -- package: github.com/prometheus/client_golang - version: v0.8.0 - subpackages: - - prometheus -- package: github.com/prometheus/common - subpackages: - - log - - model -- package: github.com/prometheus/prometheus - subpackages: - - storage/remote -- package: gopkg.in/tylerb/graceful.v1 - version: v1.2.15 -- package: github.com/kshvakov/clickhouse diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7496a76 --- /dev/null +++ b/go.mod @@ -0,0 +1 @@ +module prom2click diff --git a/main.go b/main.go index 02e6b09..68a1610 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,8 @@ import ( "fmt" "os" "time" + + "go.uber.org/zap" ) // a lot of this borrows directly from: @@ -28,11 +30,10 @@ type config struct { var ( versionFlag bool + debug bool ) func main() { - excode := 0 - conf := parseFlags() if versionFlag { @@ -41,27 +42,32 @@ func main() { if VersionPrerelease != "" { fmt.Println("Version PreRelease:", VersionPrerelease) } - os.Exit(excode) + return + } + + var logger *zap.Logger + if debug { + logger, _ = zap.NewDevelopment() + } else { + logger, _ = zap.NewProduction() } - fmt.Println("Starting up..") + defer logger.Sync() // flushes buffer, if any + sugar := logger.Sugar() - srv, err := NewP2CServer(conf) + sugar.Info("Starting up..") + + srv, err := NewP2CServer(conf, sugar) if err != nil { - fmt.Printf("Error: could not create server: %s\n", err.Error()) - excode = 1 - os.Exit(excode) + sugar.Fatalf("could not create server: %s\n", err.Error()) } err = srv.Start() if err != nil { - fmt.Printf("Error: http server returned error: %s\n", err.Error()) - excode = 1 + sugar.Fatalf("http server returned error: %s\n", err.Error()) } - - fmt.Println("Shutting down..") + sugar.Info("Shutting down..") srv.Shutdown() - fmt.Println("Exiting..") - os.Exit(excode) + sugar.Info("Exiting..") } func parseFlags() *config { @@ -70,6 +76,9 @@ func parseFlags() *config { // print version? flag.BoolVar(&versionFlag, "version", false, "Version") + // turn on debug? + flag.BoolVar(&debug, "debug", false, "turn on debug mode") + // clickhouse dsn ddsn := "tcp://127.0.0.1:9000?username=&password=&database=metrics&" + "read_timeout=10&write_timeout=10&alt_hosts=" @@ -91,12 +100,12 @@ func parseFlags() *config { ) // clickhouse insertion batch size - flag.IntVar(&cfg.ChBatch, "ch.batch", 8192, + flag.IntVar(&cfg.ChBatch, "ch.batch", 32768, "Clickhouse write batch size (n metrics).", ) // channel buffer size between http server => clickhouse writer(s) - flag.IntVar(&cfg.ChanSize, "ch.buffer", 8192, + flag.IntVar(&cfg.ChanSize, "ch.buffer", 32768, "Maximum internal channel buffer size (n requests).", ) diff --git a/reader.go b/reader.go index 2715132..436ff3f 100644 --- a/reader.go +++ b/reader.go @@ -7,17 +7,22 @@ import ( "fmt" "strings" + clickhouse "github.com/ClickHouse/clickhouse-go/v2" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/prompb" + "go.uber.org/zap" ) +var readerContent = []interface{}{"component", "reader"} + type p2cReader struct { - conf *config - db *sql.DB + conf *config + db *sql.DB + logger *zap.SugaredLogger } // getTimePeriod return select and where SQL chunks relating to the time period -or- error -func (r *p2cReader) getTimePeriod(query *remote.Query) (string, string, error) { +func (r *p2cReader) getTimePeriod(query *prompb.Query) (string, string, error) { var tselSQL = "SELECT COUNT() AS CNT, (intDiv(toUInt32(ts), %d) * %d) * 1000 as t" var twhereSQL = "WHERE date >= toDate(%d) AND ts >= toDateTime(%d) AND ts <= toDateTime(%d)" @@ -50,7 +55,7 @@ func (r *p2cReader) getTimePeriod(query *remote.Query) (string, string, error) { return selectSQL, whereSQL, nil } -func (r *p2cReader) getSQL(query *remote.Query) (string, error) { +func (r *p2cReader) getSQL(query *prompb.Query) (string, error) { // time related select sql, where sql chunks tselectSQL, twhereSQL, err := r.getTimePeriod(query) if err != nil { @@ -68,13 +73,13 @@ func (r *p2cReader) getSQL(query *remote.Query) (string, error) { if m.Name == model.MetricNameLabel { var whereAdd string switch m.Type { - case remote.MatchType_EQUAL: + case prompb.LabelMatcher_EQ: whereAdd = fmt.Sprintf(` name='%s' `, strings.Replace(m.Value, `'`, `\'`, -1)) - case remote.MatchType_NOT_EQUAL: + case prompb.LabelMatcher_NEQ: whereAdd = fmt.Sprintf(` name!='%s' `, strings.Replace(m.Value, `'`, `\'`, -1)) - case remote.MatchType_REGEX_MATCH: + case prompb.LabelMatcher_RE: whereAdd = fmt.Sprintf(` match(name, %s) = 1 `, strings.Replace(m.Value, `/`, `\/`, -1)) - case remote.MatchType_REGEX_NO_MATCH: + case prompb.LabelMatcher_NRE: whereAdd = fmt.Sprintf(` match(name, %s) = 0 `, strings.Replace(m.Value, `/`, `\/`, -1)) } mwhereSQL = append(mwhereSQL, whereAdd) @@ -82,7 +87,7 @@ func (r *p2cReader) getSQL(query *remote.Query) (string, error) { } switch m.Type { - case remote.MatchType_EQUAL: + case prompb.LabelMatcher_EQ: var insql bytes.Buffer asql := "arrayExists(x -> x IN (%s), tags) = 1" // value appears to be | sep'd for multiple matches @@ -101,7 +106,7 @@ func (r *p2cReader) getSQL(query *remote.Query) (string, error) { wstr := fmt.Sprintf(asql, insql.String()) mwhereSQL = append(mwhereSQL, wstr) - case remote.MatchType_NOT_EQUAL: + case prompb.LabelMatcher_NEQ: var insql bytes.Buffer asql := "arrayExists(x -> x IN (%s), tags) = 0" // value appears to be | sep'd for multiple matches @@ -120,7 +125,7 @@ func (r *p2cReader) getSQL(query *remote.Query) (string, error) { wstr := fmt.Sprintf(asql, insql.String()) mwhereSQL = append(mwhereSQL, wstr) - case remote.MatchType_REGEX_MATCH: + case prompb.LabelMatcher_RE: asql := `arrayExists(x -> 1 == match(x, '^%s=%s'),tags) = 1` // we can't have ^ in the regexp since keys are stored in arrays of key=value if strings.HasPrefix(m.Value, "^") { @@ -132,7 +137,7 @@ func (r *p2cReader) getSQL(query *remote.Query) (string, error) { mwhereSQL = append(mwhereSQL, fmt.Sprintf(asql, m.Name, val)) } - case remote.MatchType_REGEX_NO_MATCH: + case prompb.LabelMatcher_NRE: asql := `arrayExists(x -> 1 == match(x, '^%s=%s'),tags) = 0` if strings.HasPrefix(m.Value, "^") { val := strings.Replace(m.Value, "^", "", 1) @@ -152,57 +157,67 @@ func (r *p2cReader) getSQL(query *remote.Query) (string, error) { return sql, nil } -func NewP2CReader(conf *config) (*p2cReader, error) { +func NewP2CReader(conf *config, sugar *zap.SugaredLogger) (*p2cReader, error) { var err error r := new(p2cReader) r.conf = conf + r.logger = sugar r.db, err = sql.Open("clickhouse", r.conf.ChDSN) if err != nil { - fmt.Printf("Error connecting to clickhouse: %s\n", err.Error()) + r.logger.With(readerContent...).Errorf("connecting to clickhouse: %s", err.Error()) + return r, err + } + + if err := r.db.Ping(); err != nil { + if exception, ok := err.(*clickhouse.Exception); ok { + r.logger.With(readerContent...).Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) + } else { + r.logger.With(readerContent...).Error(err.Error()) + } return r, err } return r, nil } -func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) { +func (r *p2cReader) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) { var err error var sqlStr string var rows *sql.Rows - resp := remote.ReadResponse{ - Results: []*remote.QueryResult{ - {Timeseries: make([]*remote.TimeSeries, 0, 0)}, + resp := prompb.ReadResponse{ + Results: []*prompb.QueryResult{ + {Timeseries: make([]*prompb.TimeSeries, 0, 0)}, }, } // need to map tags to timeseries to record samples - var tsres = make(map[string]*remote.TimeSeries) + var tsres = make(map[string]*prompb.TimeSeries) - // for debugging/figuring out query format/etc + // for Debugfging/figuring out query format/etc rcount := 0 for _, q := range req.Queries { // remove me.. - fmt.Printf("\nquery: start: %d, end: %d\n\n", q.StartTimestampMs, q.EndTimestampMs) + r.logger.With(readerContent...).Debugf("\nquery: start: %d, end: %d", q.StartTimestampMs, q.EndTimestampMs) // get the select sql sqlStr, err = r.getSQL(q) - fmt.Printf("query: running sql: %s\n\n", sqlStr) + r.logger.With(readerContent...).Debugf("query: running sql: %s", sqlStr) if err != nil { - fmt.Printf("Error: reader: getSQL: %s\n", err.Error()) + r.logger.With(readerContent...).Errorf("reader: getSQL: %s", err.Error()) return &resp, err } // get the select sql if err != nil { - fmt.Printf("Error: reader: getSQL: %s\n", err.Error()) + r.logger.With(readerContent...).Errorf("reader: getSQL: %s", err.Error()) return &resp, err } // todo: metrics on number of errors, rows, selects, timings, etc rows, err = r.db.Query(sqlStr) if err != nil { - fmt.Printf("Error: query failed: %s", sqlStr) - fmt.Printf("Error: query error: %s\n", err) + r.logger.With(readerContent...).Errorf("query failed: %s", sqlStr) + r.logger.With(readerContent...).Errorf("query error: %s", err) return &resp, err } @@ -218,7 +233,7 @@ func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) value float64 ) if err = rows.Scan(&cnt, &t, &name, &tags, &value); err != nil { - fmt.Printf("Error: scan: %s\n", err.Error()) + r.logger.With(readerContent...).Errorf("scan: %s", err.Error()) } // remove this.. //fmt.Printf(fmt.Sprintf("%d,%d,%s,%s,%f\n", cnt, t, name, strings.Join(tags, ":"), value)) @@ -227,14 +242,14 @@ func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) key := strings.Join(tags, "\xff") ts, ok := tsres[key] if !ok { - ts = &remote.TimeSeries{ + ts = &prompb.TimeSeries{ Labels: makeLabels(tags), } tsres[key] = ts } - ts.Samples = append(ts.Samples, &remote.Sample{ - Value: float64(value), - TimestampMs: t, + ts.Samples = append(ts.Samples, prompb.Sample{ + Value: float64(value), + Timestamp: t, }) } } @@ -244,14 +259,14 @@ func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts) } - fmt.Printf("query: returning %d rows for %d queries\n", rcount, len(req.Queries)) + r.logger.With(readerContent...).Debugf("query: returning %d rows for %d queries", rcount, len(req.Queries)) return &resp, nil } -func makeLabels(tags []string) []*remote.LabelPair { - lpairs := make([]*remote.LabelPair, 0, len(tags)) +func makeLabels(tags []string) []prompb.Label { + lpairs := make([]prompb.Label, 0, len(tags)) // (currently) writer includes __name__ in tags so no need to add it here // may change this to save space later.. for _, tag := range tags { @@ -263,7 +278,7 @@ func makeLabels(tags []string) []*remote.LabelPair { if vals[1] == "" { continue } - lpairs = append(lpairs, &remote.LabelPair{ + lpairs = append(lpairs, prompb.Label{ Name: vals[0], Value: vals[1], }) diff --git a/srv.go b/srv.go index 1a9918a..a67124e 100644 --- a/srv.go +++ b/srv.go @@ -1,18 +1,20 @@ package main import ( + "fmt" "io/ioutil" "net/http" "time" - "fmt" - "github.com/golang/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/storage/remote" - "gopkg.in/tylerb/graceful.v1" + "github.com/prometheus/prometheus/prompb" + + //"github.com/prometheus/prometheus/storage/remote" + "go.uber.org/zap" + graceful "gopkg.in/tylerb/graceful.v1" ) type p2cRequest struct { @@ -29,24 +31,26 @@ type p2cServer struct { writer *p2cWriter reader *p2cReader rx prometheus.Counter + logger *zap.SugaredLogger } -func NewP2CServer(conf *config) (*p2cServer, error) { +func NewP2CServer(conf *config, sugar *zap.SugaredLogger) (*p2cServer, error) { var err error c := new(p2cServer) c.requests = make(chan *p2cRequest, conf.ChanSize) c.mux = http.NewServeMux() c.conf = conf + c.logger = sugar - c.writer, err = NewP2CWriter(conf, c.requests) + c.writer, err = NewP2CWriter(conf, c.requests, sugar) if err != nil { - fmt.Printf("Error creating clickhouse writer: %s\n", err.Error()) + c.logger.Errorf("creating clickhouse writer: %s\n", err.Error()) return c, err } - c.reader, err = NewP2CReader(conf) + c.reader, err = NewP2CReader(conf, sugar) if err != nil { - fmt.Printf("Error creating clickhouse reader: %s\n", err.Error()) + c.logger.Errorf("creating clickhouse reader: %s\n", err.Error()) return c, err } @@ -71,7 +75,7 @@ func NewP2CServer(conf *config) (*p2cServer, error) { return } - var req remote.WriteRequest + var req prompb.WriteRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -93,13 +97,13 @@ func NewP2CServer(conf *config) (*p2cServer, error) { return } - var req remote.ReadRequest + var req prompb.ReadRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - var resp *remote.ReadResponse + var resp *prompb.ReadResponse resp, err = c.reader.Read(&req) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -122,14 +126,14 @@ func NewP2CServer(conf *config) (*p2cServer, error) { } }) - c.mux.Handle(c.conf.HTTPMetricsPath, prometheus.InstrumentHandler( - c.conf.HTTPMetricsPath, prometheus.UninstrumentedHandler(), - )) + // c.mux.Handle(c.conf.HTTPMetricsPath, prometheus.InstrumentHandler( + // c.conf.HTTPMetricsPath, prometheus.UninstrumentedHandler(), + // )) return c, nil } -func (c *p2cServer) process(req remote.WriteRequest) { +func (c *p2cServer) process(req prompb.WriteRequest) { for _, series := range req.Timeseries { c.rx.Add(float64(len(series.Samples))) var ( @@ -151,7 +155,7 @@ func (c *p2cServer) process(req remote.WriteRequest) { for _, sample := range series.Samples { p2c := new(p2cRequest) p2c.name = name - p2c.ts = time.Unix(sample.TimestampMs/1000, 0) + p2c.ts = time.Unix(sample.Timestamp/1000, 0) p2c.val = sample.Value p2c.tags = tags c.requests <- p2c @@ -161,7 +165,7 @@ func (c *p2cServer) process(req remote.WriteRequest) { } func (c *p2cServer) Start() error { - fmt.Println("HTTP server starting...") + c.logger.Info("HTTP server starting...") c.writer.Start() return graceful.RunWithErr(c.conf.HTTPAddr, c.conf.HTTPTimeout, c.mux) } @@ -178,10 +182,10 @@ func (c *p2cServer) Shutdown() { select { case <-wchan: - fmt.Println("Writer shutdown cleanly..") + c.logger.Info("Writer shutdown cleanly..") // All done! case <-time.After(10 * time.Second): - fmt.Println("Writer shutdown timed out, samples will be lost..") + c.logger.Info("Writer shutdown timed out, samples will be lost..") } } diff --git a/writer.go b/writer.go index bba4ea8..e6a1550 100644 --- a/writer.go +++ b/writer.go @@ -4,18 +4,20 @@ import ( "database/sql" "fmt" "sort" - "time" - "sync" + "time" - "github.com/kshvakov/clickhouse" + clickhouse "github.com/ClickHouse/clickhouse-go/v2" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" ) var insertSQL = `INSERT INTO %s.%s (date, name, tags, val, ts) VALUES (?, ?, ?, ?, ?)` +var writerContent = []interface{}{"component", "writer"} + type p2cWriter struct { conf *config requests chan *p2cRequest @@ -25,16 +27,28 @@ type p2cWriter struct { ko prometheus.Counter test prometheus.Counter timings prometheus.Histogram + + logger *zap.SugaredLogger } -func NewP2CWriter(conf *config, reqs chan *p2cRequest) (*p2cWriter, error) { +func NewP2CWriter(conf *config, reqs chan *p2cRequest, sugar *zap.SugaredLogger) (*p2cWriter, error) { var err error w := new(p2cWriter) w.conf = conf w.requests = reqs + w.logger = sugar w.db, err = sql.Open("clickhouse", w.conf.ChDSN) if err != nil { - fmt.Printf("Error connecting to clickhouse: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("connecting to clickhouse: %s", err.Error()) + return w, err + } + + if err := w.db.Ping(); err != nil { + if exception, ok := err.(*clickhouse.Exception); ok { + w.logger.With(writerContent...).Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) + } else { + w.logger.With(writerContent...).Error(err.Error()) + } return w, err } @@ -78,7 +92,7 @@ func (w *p2cWriter) Start() { go func() { w.wg.Add(1) - fmt.Println("Writer starting..") + w.logger.With(writerContent...).Info("Writer starting..") sql := fmt.Sprintf(insertSQL, w.conf.ChDB, w.conf.ChTable) ok := true for ok { @@ -92,7 +106,7 @@ func (w *p2cWriter) Start() { // get requet and also check if channel is closed req, ok = <-w.requests if !ok { - fmt.Println("Writer stopping..") + w.logger.With(writerContent...).Info("Writer stopping..") break } reqs = append(reqs, req) @@ -107,7 +121,7 @@ func (w *p2cWriter) Start() { // post them to db all at once tx, err := w.db.Begin() if err != nil { - fmt.Printf("Error: begin transaction: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("begin transaction: %s", err.Error()) w.ko.Add(1.0) continue } @@ -116,7 +130,7 @@ func (w *p2cWriter) Start() { smt, err := tx.Prepare(sql) for _, req := range reqs { if err != nil { - fmt.Printf("Error: prepare statement: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("prepare statement: %s", err.Error()) w.ko.Add(1.0) continue } @@ -124,18 +138,18 @@ func (w *p2cWriter) Start() { // ensure tags are inserted in the same order each time // possibly/probably impacts indexing? sort.Strings(req.tags) - _, err = smt.Exec(req.ts, req.name, clickhouse.Array(req.tags), + _, err = smt.Exec(req.ts, req.name, req.tags, req.val, req.ts) if err != nil { - fmt.Printf("Error: statement exec: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("statement exec: %s", err.Error()) w.ko.Add(1.0) } } // commit and record metrics if err = tx.Commit(); err != nil { - fmt.Printf("Error: commit failed: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("commit failed: %s", err.Error()) w.ko.Add(1.0) } else { w.tx.Add(float64(nmetrics)) @@ -143,7 +157,7 @@ func (w *p2cWriter) Start() { } } - fmt.Println("Writer stopped..") + w.logger.With(writerContent...).Info("Writer stopped..") w.wg.Done() }() }