diff --git a/proxyd/backend.go b/proxyd/backend.go
index 596df6bd..92216c19 100644
--- a/proxyd/backend.go
+++ b/proxyd/backend.go
@@ -7,6 +7,11 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	metrics_tracer "github.com/ethereum-optimism/infra/proxyd/metrics/trace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/propagation"
+	"go.opentelemetry.io/otel/trace"
 	"io"
 	"math"
 	"math/rand"
@@ -323,6 +328,7 @@ type Backend struct {
 	intermittentErrorsSlidingWindow *sw.AvgSlidingWindow
 
 	weight int
+	tracer trace.Tracer
 }
 
 type BackendOpt func(b *Backend)
@@ -542,6 +548,7 @@ func NewBackend(
 		latencySlidingWindow:            sw.NewSlidingWindow(),
 		networkRequestsSlidingWindow:    sw.NewSlidingWindow(),
 		intermittentErrorsSlidingWindow: sw.NewSlidingWindow(),
+		tracer:                          otel.Tracer("backend"),
 	}
 
 	backend.Override(opts...)
@@ -560,6 +567,9 @@ func (b *Backend) Override(opts ...BackendOpt) {
 }
 
 func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
+	ctx, span := metrics_tracer.RecordSingleSpan(metrics_tracer.GlobalTracer, ctx, "Backend.Forward")
+	defer metrics_tracer.CloseSpan(span)
+
 	var lastError error
 	// <= to account for the first attempt not technically being
 	// a retry
@@ -668,8 +678,8 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
 	return NewWSProxier(b, clientConn, backendConn, methodWhitelist), nil
 }
 
-// ForwardRPC makes a call directly to a backend and populate the response into `res`
-func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
+// ForwardRPCForPoller makes a call directly to a backend, with tracing disabled, and populates the response into `res`
+func (b *Backend) ForwardRPCForPoller(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
 	jsonParams, err := json.Marshal(params)
 	if err != nil {
 		return err
@@ -681,7 +691,7 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
 		Params: jsonParams,
 		ID:     []byte(id),
 	}
-
+	ctx = context.WithValue(ctx, metrics_tracer.EnableTraceKey, false)
 	slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false)
 	if err != nil {
 		return err
 	}
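The `EnableTraceKey` context value gives callers a per-request opt-out: `ForwardRPCForPoller` sets it to `false` so the consensus poller's high-frequency health checks do not generate spans. A minimal caller-side sketch of the same opt-out (the span name is illustrative):

```go
// Suppress tracing for internal traffic, as ForwardRPCForPoller does.
ctx := context.WithValue(context.Background(), metrics_tracer.EnableTraceKey, false)
// RecordSingleSpan sees the opt-out and returns a nil span.
ctx, span := metrics_tracer.RecordSingleSpan(metrics_tracer.GlobalTracer, ctx, "internal.poll")
defer metrics_tracer.CloseSpan(span) // CloseSpan is nil-safe
```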
@@ -699,6 +709,19 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
+	tmpT := time.Now()
+	ctx, span := metrics_tracer.RecordSingleSpan(b.tracer, ctx,
+		"Backend.doForward", attribute.Int("batchSize", len(rpcReqs)))
+	defer metrics_tracer.CloseSpan(span)
+
+	if span != nil && len(rpcReqs) > 0 {
+		methodList := make([]string, len(rpcReqs))
+		for i, req := range rpcReqs {
+			methodList[i] = req.Method
+		}
+		metrics_tracer.RecordAttributes(span, "method", methodList)
+	}
+
 	// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
 	b.networkRequestsSlidingWindow.Incr()
@@ -708,6 +731,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 	if isBatch {
 		for _, rpcReq := range rpcReqs {
 			if rpcReq.Method == ConsensusGetReceiptsMethod {
+				metrics_tracer.RecordError(span, ErrConsensusGetReceiptsCantBeBatched)
 				return nil, ErrConsensusGetReceiptsCantBeBatched
 			}
 		}
@@ -719,6 +743,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 			var reqParams []rpc.BlockNumberOrHash
 			err := json.Unmarshal(rpcReq.Params, &reqParams)
 			if err != nil {
+				metrics_tracer.RecordError(span, err)
 				return nil, ErrInvalidRequest("invalid request")
 			}
@@ -770,8 +795,13 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 	if err != nil {
 		b.intermittentErrorsSlidingWindow.Incr()
 		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		metrics_tracer.RecordError(span, err)
 		return nil, wrapErr(err, "error creating backend request")
 	}
+	if span != nil {
+		// inject the trace context into the outbound request headers
+		otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
+	}
 
 	if b.authPassword != "" {
 		httpReq.SetBasicAuth(b.authUsername, b.authPassword)
@@ -798,12 +828,14 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 
 	start := time.Now()
 	httpRes, err := b.client.DoLimited(httpReq)
+	metrics_tracer.SetSpanAttribute(span, attribute.Int64("prepareHttpReqT", start.Sub(tmpT).Milliseconds()))
 	if err != nil {
 		if !(errors.Is(err, context.Canceled) || errors.Is(err, ErrTooManyRequests)) {
 			b.intermittentErrorsSlidingWindow.Incr()
 			RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
 		}
 		if errors.Is(err, ErrContextCanceled) {
+			metrics_tracer.RecordError(span, err)
 			return nil, err
 		}
 		return nil, wrapErr(err, "error in backend request")
@@ -825,17 +857,20 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 	if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
 		b.intermittentErrorsSlidingWindow.Incr()
 		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		metrics_tracer.RecordError(span, fmt.Errorf("response code %d", httpRes.StatusCode))
 		return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
 	}
 
 	defer httpRes.Body.Close()
 	resB, err := io.ReadAll(LimitReader(httpRes.Body, b.maxResponseSize))
 	if errors.Is(err, ErrLimitReaderOverLimit) {
+		metrics_tracer.RecordError(span, ErrBackendResponseTooLarge)
 		return nil, ErrBackendResponseTooLarge
 	}
 	if err != nil {
 		b.intermittentErrorsSlidingWindow.Incr()
 		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		metrics_tracer.RecordError(span, err)
 		return nil, wrapErr(err, "error reading response body")
 	}
@@ -843,6 +878,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 	if isSingleElementBatch {
 		var singleRes RPCRes
 		if err := json.Unmarshal(resB, &singleRes); err != nil {
+			metrics_tracer.RecordError(span, ErrBackendBadResponse)
 			return nil, ErrBackendBadResponse
 		}
 		rpcRes = []*RPCRes{
@@ -854,10 +890,12 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 		if responseIsNotBatched(resB) {
 			b.intermittentErrorsSlidingWindow.Incr()
 			RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+			metrics_tracer.RecordError(span, ErrBackendUnexpectedJSONRPC)
 			return nil, ErrBackendUnexpectedJSONRPC
 		}
 		b.intermittentErrorsSlidingWindow.Incr()
 		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		metrics_tracer.RecordError(span, err)
 		return nil, ErrBackendBadResponse
 	}
 }
@@ -865,6 +903,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 	if len(rpcReqs) != len(rpcRes) {
 		b.intermittentErrorsSlidingWindow.Incr()
 		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		metrics_tracer.RecordError(span, ErrBackendUnexpectedJSONRPC)
 		return nil, ErrBackendUnexpectedJSONRPC
 	}
@@ -890,8 +929,9 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 			}
 		}
 	}
-
+	tmpT = time.Now()
 	sortBatchRPCResponse(rpcReqs, rpcRes)
+	metrics_tracer.SetSpanAttribute(span, attribute.Int64("sortBatchRPCResponseT", time.Since(tmpT).Milliseconds()))
 	return rpcRes, nil
 }
@@ -956,6 +996,7 @@ type BackendGroup struct {
 	FallbackBackends       map[string]bool
 	routingStrategy        RoutingStrategy
 	multicallRPCErrorCheck bool
+	tracer                 trace.Tracer
 }
 
 func (bg *BackendGroup) GetRoutingStrategy() RoutingStrategy {
@@ -985,6 +1026,9 @@ func (bg *BackendGroup) Primaries() []*Backend {
 
 // NOTE: BackendGroup Forward contains the log for balancing with consensus aware
 func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
+	ctx, span := metrics_tracer.RecordSingleSpan(metrics_tracer.GlobalTracer, ctx, "BackendGroup.Forward")
+	defer metrics_tracer.CloseSpan(span)
+
 	if len(rpcReqs) == 0 {
 		return nil, "", nil
 	}
@@ -995,13 +1039,20 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
 	// When routing_strategy is set to `consensus_aware` the backend group acts as a load balancer
 	// serving traffic from any backend that agrees in the consensus group
 	// We also rewrite block tags to enforce compliance with consensus
+	tmpT := time.Now()
 	if bg.Consensus != nil {
 		rpcReqs, overriddenResponses = bg.OverwriteConsensusResponses(rpcReqs, overriddenResponses, rewrittenReqs)
 	}
+	overwriteConsensusResponsesT := time.Since(tmpT)
 
+	tmpT = time.Now()
 	// Choose backends to forward the request to, after rewriting the requests
 	backends := bg.orderedBackendsForRequest()
+	orderedBackendsForRequestT := time.Since(tmpT)
+	metrics_tracer.SetSpanAttribute(span,
+		attribute.Int64("OverwriteConsensusResponsesT", overwriteConsensusResponsesT.Milliseconds()),
+		attribute.Int64("orderedBackendsForRequestT", orderedBackendsForRequestT.Milliseconds()))
 
 	rpcRequestsTotal.Inc()
@@ -1033,7 +1084,11 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
 			"req_id", GetReqID(ctx),
 			"auth", GetAuthCtx(ctx),
 		)
+		tmpT = time.Now()
 		res := OverrideResponses(backendResp.RPCRes, overriddenResponses)
+		overrideResponsesT := time.Since(tmpT)
+		metrics_tracer.SetSpanAttribute(span,
+			attribute.Int64("OverrideResponsesT", overrideResponsesT.Milliseconds()))
 		return res, backendResp.ServedBy, backendResp.error
 	}
@@ -1540,10 +1595,14 @@ type LimitedHTTPClient struct {
 }
 
 func (c *LimitedHTTPClient) DoLimited(req *http.Request) (*http.Response, error) {
+	_, span := metrics_tracer.RecordSingleSpan(metrics_tracer.GlobalTracer, req.Context(),
+		"LimitedHTTPClient.DoLimited")
+	defer metrics_tracer.CloseSpan(span)
+
 	if c.sem == nil {
 		return c.Do(req)
 	}
-
+	startT := time.Now()
 	if err := c.sem.Acquire(req.Context(), 1); err != nil {
 		if errors.Is(err, context.Canceled) {
 			return nil, ErrContextCanceled
@@ -1552,6 +1611,7 @@ func (c *LimitedHTTPClient) DoLimited(req *http.Request) (*http.Response, error)
 		return nil, wrapErr(err, ErrTooManyRequests.Message)
 	}
 	defer c.sem.Release(1)
+	metrics_tracer.SetSpanAttribute(span, attribute.Int64("acquireLimitT", time.Since(startT).Milliseconds()))
 	return c.Do(req)
 }
@@ -1618,6 +1678,9 @@ func (bg *BackendGroup) ForwardRequestToBackendGroup(
 	ctx context.Context,
 	isBatch bool,
 ) *BackendGroupRPCResponse {
+	ctx, span := metrics_tracer.RecordSingleSpan(metrics_tracer.GlobalTracer, ctx, "ForwardRequestToBackendGroup")
+	defer metrics_tracer.CloseSpan(span)
+
 	for _, back := range backends {
 		res := make([]*RPCRes, 0)
 		var err error
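With the propagator installed by `InitTracer`, the `Inject` call in `doForward` stamps every outbound backend request with W3C trace-context headers. A standalone sketch of what that amounts to (URL and body are placeholders):

```go
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "https://backend.example/rpc", bytes.NewReader(body))
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
// req.Header now carries "traceparent" (and "baggage" when set), so a backend
// node that also runs OpenTelemetry can attach its spans to the proxyd trace.
```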
diff --git a/proxyd/cmd/proxyd/main.go b/proxyd/cmd/proxyd/main.go
index f13ffe8f..f75c4573 100644
--- a/proxyd/cmd/proxyd/main.go
+++ b/proxyd/cmd/proxyd/main.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"fmt"
+	"github.com/ethereum-optimism/infra/proxyd/metrics/trace"
 	"log/slog"
 	"net"
 	"net/http"
@@ -44,9 +45,13 @@ func main() {
 	}
 
 	config := new(proxyd.Config)
+	// create the default OpenTelemetry trace config
+	proxydVersion := GitCommit + "-" + GitDate
+	config.OpenTelemetryTrace = trace.NewTraceConfig(proxydVersion)
 	if _, err := toml.DecodeFile(os.Args[1], config); err != nil {
 		log.Crit("error reading config file", "err", err)
 	}
+	trace.GlobalTraceConfig = &config.OpenTelemetryTrace
 
 	// update log level from config
 	logLevel, err := LevelFromString(config.Server.LogLevel)
@@ -128,6 +133,14 @@ func main() {
 		proxyd.SetOTelClient(metricsClient)
 	}
 
+	// initialize the tracer
+	traceProvider, err := trace.InitTracer(&config.OpenTelemetryTrace)
+	if err != nil {
+		log.Error("Failed to initialize OpenTelemetry tracer", "err", err)
+		return
+	}
+	defer trace.Shutdown(traceProvider)
+
 	// non-blocking
 	_, shutdown, err := proxyd.Start(config)
 	if err != nil {
diff --git a/proxyd/config.go b/proxyd/config.go
index ec1eff1d..d11c09a1 100644
--- a/proxyd/config.go
+++ b/proxyd/config.go
@@ -2,6 +2,7 @@ package proxyd
 
 import (
 	"fmt"
+	"github.com/ethereum-optimism/infra/proxyd/metrics/trace"
 	"math/big"
 	"os"
 	"strings"
@@ -252,6 +253,7 @@ type Config struct {
 	WhitelistErrorMessage   string                  `toml:"whitelist_error_message"`
 	SenderRateLimit         SenderRateLimitConfig   `toml:"sender_rate_limit"`
 	InteropValidationConfig InteropValidationConfig `toml:"interop_validation"`
+	OpenTelemetryTrace      trace.TraceConfig       `toml:"opentelemetry_trace"` // the OpenTelemetry trace config
 }
 
 type InteropValidationConfig struct {
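Seeding `config.OpenTelemetryTrace` before `toml.DecodeFile` means any field omitted from the config file keeps its `NewTraceConfig` default. A minimal sketch of the pattern (file name and version string are placeholders):

```go
cfg := new(proxyd.Config)
cfg.OpenTelemetryTrace = trace.NewTraceConfig("v1.2.3-example") // Enabled=false, SampleRate=0.1, ...
if _, err := toml.DecodeFile("proxyd.toml", cfg); err != nil {
	log.Crit("error reading config file", "err", err)
}
// cfg.OpenTelemetryTrace.SampleRate is still 0.1 unless the file sets sample_rate
```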
diff --git a/proxyd/consensus_poller.go b/proxyd/consensus_poller.go
index c6cbdf54..49952654 100644
--- a/proxyd/consensus_poller.go
+++ b/proxyd/consensus_poller.go
@@ -704,7 +704,7 @@ func (cp *ConsensusPoller) Reset() {
 // fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
 func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
 	var rpcRes RPCRes
-	err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
+	err = be.ForwardRPCForPoller(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
 	if err != nil {
 		return 0, "", err
 	}
@@ -722,7 +722,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
 // getPeerCount is a convenient wrapper to retrieve the current peer count from the backend
 func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
 	var rpcRes RPCRes
-	err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
+	err = be.ForwardRPCForPoller(ctx, &rpcRes, "67", "net_peerCount")
 	if err != nil {
 		return 0, err
 	}
@@ -740,7 +740,7 @@ func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count
 // isInSync is a convenient wrapper to check if the backend is in sync from the network
 func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) {
 	var rpcRes RPCRes
-	err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_syncing")
+	err = be.ForwardRPCForPoller(ctx, &rpcRes, "67", "eth_syncing")
 	if err != nil {
 		return false, err
 	}
diff --git a/proxyd/example.config.toml b/proxyd/example.config.toml
index 86db57ec..1c834492 100644
--- a/proxyd/example.config.toml
+++ b/proxyd/example.config.toml
@@ -149,3 +149,16 @@ consensus_aware = true
 eth_call = "main"
 eth_chainId = "main"
 eth_blockNumber = "main"
+
+
+[opentelemetry_trace]
+# whether tracing is enabled
+enabled = false
+# the service name attached to exported traces
+service_name = "xlayer-proxyd"
+# the deployment environment (e.g. test, prod)
+environment = "test"
+# the OTLP/HTTP endpoint that traces are exported to
+otel_endpoint = ""
+# the fraction of traces to sample, in [0.0, 1.0]
+sample_rate = 0.6
diff --git a/proxyd/go.mod b/proxyd/go.mod
index e2488202..999dccbe 100644
--- a/proxyd/go.mod
+++ b/proxyd/go.mod
@@ -148,6 +148,8 @@ require (
 	github.com/yuin/gopher-lua v1.1.0 // indirect
 	github.com/yusufpapurcu/wmi v1.2.3 // indirect
 	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect
 	go.opentelemetry.io/otel/trace v1.38.0 // indirect
 	go.opentelemetry.io/proto/otlp v1.7.1 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
diff --git a/proxyd/go.sum b/proxyd/go.sum
index eab0107d..0d20129e 100644
--- a/proxyd/go.sum
+++ b/proxyd/go.sum
@@ -447,6 +447,10 @@ go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
 go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.38.0 h1:Oe2z/BCg5q7k4iXC3cqJxKYg0ieRiOqF0cecFYdPTwk=
 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.38.0/go.mod h1:ZQM5lAJpOsKnYagGg/zV2krVqTtaVdYdDkhMoX6Oalg=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 h1:GqRJVj7UmLjCVyVJ3ZFLdPRmhDUp2zFmQe3RHIOsw24=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0/go.mod h1:ri3aaHSmCTVYu2AWv44YMauwAQc0aqI9gHKIcSbI1pU=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 h1:aTL7F04bJHUlztTsNGJ2l+6he8c+y/b//eR0jjjemT4=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0/go.mod h1:kldtb7jDTeol0l3ewcmd8SDvx3EmIE7lyvqbasU3QC4=
 go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
 go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
 go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E=
diff --git a/proxyd/metrics/trace/opentelemetry_trace.go b/proxyd/metrics/trace/opentelemetry_trace.go
new file mode 100644
index 00000000..c8f1f69e
--- /dev/null
+++ b/proxyd/metrics/trace/opentelemetry_trace.go
@@ -0,0 +1,229 @@
+package trace
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"strconv"
+	"time"
+
+	"github.com/ethereum/go-ethereum/log"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
+	"go.opentelemetry.io/otel/propagation"
+	"go.opentelemetry.io/otel/sdk/resource"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
+	"go.opentelemetry.io/otel/trace"
+)
+
+// the global trace config
+var GlobalTraceConfig *TraceConfig
+
+var GlobalTracer trace.Tracer = otel.Tracer("GlobalTracer")
+
+type TraceConfig struct {
+	Enabled        bool    `toml:"enabled"`
+	ServiceName    string  `toml:"service_name"`
+	Environment    string  `toml:"environment"`
+	OTELEndpoint   string  `toml:"otel_endpoint"`
+	SampleRate     float64 `toml:"sample_rate"`
+	ServiceVersion string  `toml:"-"` // ignored by TOML; this field is determined at runtime
+}
+
+var DefaultTraceConfig = TraceConfig{
+	Enabled:     false,
+	ServiceName: "xlayer",
+	Environment: "PROD",
+	SampleRate:  0.1,
+}
+
+func NewTraceConfig(v string) TraceConfig {
+	ret :=
DefaultTraceConfig
+	ret.ServiceVersion = v
+	return ret
+}
+
+// InitTracer initializes the OpenTelemetry tracer.
+func InitTracer(cfg *TraceConfig) (*sdktrace.TracerProvider, error) {
+	if !cfg.Enabled {
+		log.Info("Skipping InitTracer: tracing is disabled")
+		return nil, nil
+	}
+	log.Info("Begin InitTracer",
+		"url", cfg.OTELEndpoint,
+		"serviceName", cfg.ServiceName,
+		"serviceVersion", cfg.ServiceVersion,
+		"environment", cfg.Environment,
+		"sampleRate", cfg.SampleRate)
+
+	ctx := context.Background()
+
+	// create the OTLP/HTTP exporter
+	exporter, err := otlptracehttp.New(ctx,
+		otlptracehttp.WithEndpointURL(cfg.OTELEndpoint),
+		otlptracehttp.WithTimeout(5*time.Second),
+		otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
+			Enabled:         true,
+			InitialInterval: 10 * time.Second, // the initial retry interval
+			MaxInterval:     20 * time.Second, // the maximum retry interval
+			MaxElapsedTime:  20 * time.Second, // the point after which the data is dropped
+		}),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create OTLP exporter: %w", err)
+	}
+
+	// create the resource
+	res, err := resource.New(ctx,
+		resource.WithAttributes(
+			semconv.ServiceName(cfg.ServiceName),
+			semconv.ServiceVersion(cfg.ServiceVersion),
+			semconv.DeploymentEnvironment(cfg.Environment),
+			semconv.HostIP(getLocalIP()),
+		),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create resource: %w", err)
+	}
+
+	// create the TracerProvider
+	tp := sdktrace.NewTracerProvider(
+		sdktrace.WithBatcher(exporter),
+		sdktrace.WithResource(res),
+		sdktrace.WithSampler(createSampler(cfg.SampleRate)),
+	)
+
+	// set the global TracerProvider
+	otel.SetTracerProvider(tp)
+
+	// set the global propagator
+	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
+		propagation.TraceContext{},
+		propagation.Baggage{},
+	))
+	log.Info("InitTracer success",
+		"url", cfg.OTELEndpoint,
+		"serviceName", cfg.ServiceName,
+		"serviceVersion", cfg.ServiceVersion,
+		"environment", cfg.Environment,
+		"sampleRate", cfg.SampleRate)
+	return tp, nil
+}
+
+// createSampler samples new traces at the configured ratio and always keeps
+// spans whose parent was already sampled.
+func createSampler(fraction float64) sdktrace.Sampler {
+	return sdktrace.ParentBased(
+		sdktrace.TraceIDRatioBased(fraction),
+		sdktrace.WithRemoteParentSampled(sdktrace.AlwaysSample()),
+		sdktrace.WithRemoteParentNotSampled(sdktrace.TraceIDRatioBased(fraction)),
+		sdktrace.WithLocalParentSampled(sdktrace.AlwaysSample()),
+		sdktrace.WithLocalParentNotSampled(sdktrace.TraceIDRatioBased(fraction)),
+	)
+}
+
+// Shutdown flushes and stops the TracerProvider.
+func Shutdown(tp *sdktrace.TracerProvider) {
+	if tp == nil {
+		return
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	if err := tp.Shutdown(ctx); err != nil {
+		log.Warn("Error shutting down tracer provider", "err", err)
+	}
+}
+
+func RecordError(span trace.Span, err error) {
+	if span == nil || err == nil || !span.IsRecording() {
+		return
+	}
+	span.RecordError(err)
+	span.SetStatus(codes.Error, err.Error())
+	span.SetAttributes(attribute.String("error", err.Error()))
+}
+
+// RecordErrors records the first non-nil error as the span error and attaches
+// every non-nil error as an indexed attribute.
+func RecordErrors(span trace.Span, errs []error) {
+	if span == nil || len(errs) == 0 || !span.IsRecording() {
+		return
+	}
+	recorded := false
+	for i, e := range errs {
+		if e == nil {
+			continue
+		}
+		if !recorded {
+			span.RecordError(e)
+			recorded = true
+		}
+		span.SetAttributes(attribute.String("error_"+strconv.Itoa(i), e.Error()))
+	}
+}
+
+func RecordAttributes(span trace.Span, attributeKey string, attrs []string) {
+	if span == nil || len(attrs) == 0 || !span.IsRecording() {
+		return
+	}
+	for i, attr := range attrs {
+		span.SetAttributes(attribute.String(attributeKey+strconv.Itoa(i), attr))
+	}
+}
+
+func SetSpanAttribute(span trace.Span, attrs ...attribute.KeyValue) {
+	if span == nil || !span.IsRecording() {
+		return
+	}
+	span.SetAttributes(attrs...)
+}
+
+func RecordSingleSpan(tracer trace.Tracer, ctx context.Context, spanName string, attrs ...attribute.KeyValue) (context.Context, trace.Span) {
+	return recordSingleSpanImpl(tracer, ctx, spanName, attrs...)
+}
+
+// CloseSpan ends the span; it is safe to call with a nil span.
+func CloseSpan(span trace.Span) {
+	if span == nil {
+		return
+	}
+	span.End()
+}
+
+func recordSingleSpanImpl(tracer trace.Tracer, ctx context.Context, spanName string, attrs ...attribute.KeyValue) (context.Context, trace.Span) {
+	// tracing is not initialized
+	if tracer == nil || ctx == nil {
+		return ctx, nil
+	}
+	if GlobalTraceConfig == nil || !GlobalTraceConfig.Enabled {
+		return ctx, nil
+	}
+	// tracing is explicitly disabled for this request (e.g. poller traffic)
+	enableTrace, ok := ctx.Value(EnableTraceKey).(bool)
+	if ok && !enableTrace {
+		return ctx, nil
+	}
+	// tracing is enabled
+	spanCtx, span := tracer.Start(ctx, spanName)
+	if span.IsRecording() {
+		span.SetAttributes(attrs...)
+		remoteAddr, ok := ctx.Value(RemoteAddrKey).(string)
+		if ok && remoteAddr != "" {
+			span.SetAttributes(attribute.String(RemoteAddrKey, remoteAddr))
+		}
+		localAddr, ok := ctx.Value(LocalAddrKey).(string)
+		if ok && localAddr != "" {
+			span.SetAttributes(attribute.String(LocalAddrKey, localAddr))
+		}
+	}
+	return spanCtx, span
+}
+
+// getLocalIP returns the first non-loopback IPv4 address of the host.
+func getLocalIP() string {
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		log.Warn("Failed to get local IP addresses; returning empty IP address", "err", err)
+		return ""
+	}
+	for _, addr := range addrs {
+		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+			if ipnet.IP.To4() != nil {
+				return ipnet.IP.String()
+			}
+		}
+	}
+	log.Warn("No IPv4 address found; returning empty IP address")
+	return ""
+}
diff --git a/proxyd/metrics/trace/trace_protocol.go b/proxyd/metrics/trace/trace_protocol.go
new file mode 100644
index 00000000..225b34c9
--- /dev/null
+++ b/proxyd/metrics/trace/trace_protocol.go
@@ -0,0 +1,8 @@
+package trace
+
+// keys stored in the request context and mirrored as span attributes
+const (
+	RemoteAddrKey = "remote"
+	LocalAddrKey  = "local"
+
+	// EnableTraceKey set to false in a context disables span creation for that request
+	EnableTraceKey = "trace"
+)
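Taken together, a minimal wiring of the package looks like the sketch below (the endpoint, version, and span name are placeholders, not values from this change):

```go
func main() {
	cfg := trace.NewTraceConfig("v1.0.0-example")
	cfg.Enabled = true
	cfg.OTELEndpoint = "http://localhost:4318" // placeholder OTLP/HTTP collector
	trace.GlobalTraceConfig = &cfg

	tp, err := trace.InitTracer(&cfg)
	if err != nil {
		log.Crit("failed to init tracer", "err", err)
	}
	defer trace.Shutdown(tp)

	// Every helper tolerates a nil span, so call sites stay unconditional.
	ctx, span := trace.RecordSingleSpan(trace.GlobalTracer, context.Background(), "example.op",
		attribute.String("example.key", "value"))
	defer trace.CloseSpan(span)
	_ = ctx // pass ctx down so child spans nest under "example.op"
}
```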
diff --git a/proxyd/server.go b/proxyd/server.go
index 5f9f9470..547d0fd3 100644
--- a/proxyd/server.go
+++ b/proxyd/server.go
@@ -7,6 +7,11 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	metrics_tracer "github.com/ethereum-optimism/infra/proxyd/metrics/trace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/propagation"
+	"go.opentelemetry.io/otel/trace"
 	"io"
 	"math"
 	"math/big"
@@ -86,6 +91,7 @@ type Server struct {
 	rateLimitHeader         string
 	interopValidatingConfig InteropValidationConfig
 	interopStrategy         InteropStrategy
+	tracer                  trace.Tracer
 }
 
 type limiterFunc func(method string) bool
@@ -212,6 +218,7 @@ func NewServer(
 		rateLimitHeader:         rateLimitHeader,
 		interopValidatingConfig: interopValidatingConfig,
 		interopStrategy:         interopStrategy,
+		tracer:                  otel.Tracer("Server"),
 	}, nil
 }
@@ -280,6 +287,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 	ctx, cancel = context.WithTimeout(ctx, s.timeout)
 	defer cancel()
 
+	ctx, span := metrics_tracer.RecordSingleSpan(s.tracer, ctx, "HandleRPC")
+	defer metrics_tracer.CloseSpan(span)
+
 	origin := r.Header.Get("Origin")
 	userAgent := r.Header.Get("User-Agent")
 	// Use XFF in context since it will automatically be replaced by the remote IP
@@ -348,6 +358,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 		)
 	}
 
+	// the batch request path
 	if IsBatch(body) {
 		reqs, err := ParseBatchRPCReq(body)
 		if err != nil {
@@ -370,7 +381,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 
-		batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(ctx, reqs, isLimited, true)
+		batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(ctx, span, reqs, isLimited, true)
 		if err == context.DeadlineExceeded {
 			writeRPCError(ctx, w, nil, ErrGatewayTimeout)
 			return
@@ -392,8 +403,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	// the single request path
 	rawBody := json.RawMessage(body)
-	backendRes, cached, servedBy, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false)
+	backendRes, cached, servedBy, err := s.handleBatchRPC(ctx, span, []json.RawMessage{rawBody}, isLimited, false)
 	if err != nil {
 		if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
 			errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
@@ -483,7 +495,8 @@ func (s *Server) validateInteropSendRpcRequest(ctx context.Context, tx *types.Tr
 	return finalErr
 }
 
-func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
+func (s *Server) handleBatchRPC(ctx context.Context, span trace.Span, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
+	metrics_tracer.SetSpanAttribute(span, attribute.Int("request.size", len(reqs)))
 	// A request set is transformed into groups of batches.
 	// Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints)
 	// A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's
@@ -499,14 +512,18 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 	batches := make(map[batchGroup][]batchElem)
 	ids := make(map[string]int, len(reqs))
 
+	methodList := make([]string, len(reqs))
+	errs := make([]error, len(reqs))
+
 	for i := range reqs {
 		parsedReq, err := ParseRPCReq(reqs[i])
 		if err != nil {
 			log.Info("error parsing RPC call", "source", "rpc", "err", err)
 			responses[i] = NewRPCErrorRes(nil, err)
+			errs[i] = err
 			continue
 		}
-
 		// Simple health check
 		if len(reqs) == 1 && parsedReq.Method == proxydHealthzMethod {
 			res := &RPCRes{
@@ -517,9 +534,11 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 			return []*RPCRes{res}, false, "", nil
 		}
 
+		methodList[i] = parsedReq.Method
 		if err := ValidateRPCReq(parsedReq); err != nil {
 			RecordRPCError(ctx, BackendProxyd, MethodUnknown, err)
 			responses[i] = NewRPCErrorRes(nil, err)
+			errs[i] = err
 			continue
 		}
@@ -541,6 +560,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 			)
 			RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrMethodNotWhitelisted)
 			responses[i] = NewRPCErrorRes(parsedReq.ID, ErrMethodNotWhitelisted)
+			errs[i] = ErrMethodNotWhitelisted
 			continue
 		}
@@ -554,6 +574,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 			)
 			RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit)
 			responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit)
+			errs[i] = ErrOverRateLimit
 			continue
 		}
@@ -567,6 +588,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 			)
 			RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit)
 			responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit)
+			errs[i] = ErrOverRateLimit
 			continue
 		}
@@ -578,16 +600,19 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 			if err != nil {
 				RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err)
 				responses[i] = NewRPCErrorRes(parsedReq.ID, err)
+				errs[i] = err
 				continue
 			}
 			if err := s.rateLimitSender(ctx, tx); err != nil {
 				RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err)
 				responses[i] = NewRPCErrorRes(parsedReq.ID, err)
+				errs[i] = err
 				continue
 			}
 			if err := s.validateInteropSendRpcRequest(ctx, tx); err != nil {
 				RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err)
 				responses[i] = NewRPCErrorRes(parsedReq.ID, err)
+				errs[i] = err
 				continue
 			}
@@ -600,6 +625,8 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 		batchGroup := batchGroup{groupID: batchGroupID, backendGroup: group}
 		batches[batchGroup] = append(batches[batchGroup], batchElem{parsedReq, i})
 	}
+	metrics_tracer.RecordErrors(span, errs)
+	metrics_tracer.RecordAttributes(span, "method", methodList)
 
 	servedBy := make(map[string]bool, 0)
 	var cached bool
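Given the helpers' indexing scheme, a hypothetical three-request batch in which only the last request is rate-limited would leave the span with `method0..method2` attributes and a single `error_2` attribute:

```go
methodList := []string{"eth_chainId", "eth_blockNumber", "eth_call"} // hypothetical batch
errs := []error{nil, nil, ErrOverRateLimit}
metrics_tracer.RecordAttributes(span, "method", methodList) // -> method0, method1, method2
metrics_tracer.RecordErrors(span, errs)                     // -> records ErrOverRateLimit as error_2
```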
@@ -626,6 +653,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 				"batch_index", i,
 			)
 			batchRPCShortCircuitsTotal.Inc()
+			metrics_tracer.RecordError(span, context.DeadlineExceeded)
 			return nil, false, "", context.DeadlineExceeded
 		}
@@ -746,6 +774,13 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
 		ctx = context.WithValue(ctx, ContextKeyAuth, s.authenticatedPaths[authorization]) // nolint:staticcheck
 	}
 
+	// populate the trace context
+	if metrics_tracer.GlobalTraceConfig != nil && metrics_tracer.GlobalTraceConfig.Enabled {
+		ctx = context.WithValue(ctx, metrics_tracer.RemoteAddrKey, r.RemoteAddr) // nolint:staticcheck
+		ctx = context.WithValue(ctx, metrics_tracer.LocalAddrKey, r.Host)        // nolint:staticcheck
+		ctx = otel.GetTextMapPropagator().Extract(ctx,
+			propagation.HeaderCarrier(r.Header))
+	}
 	return context.WithValue(
 		ctx,
 		ContextKeyReqID, // nolint:staticcheck
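On the receiving side, a backend node that also runs OpenTelemetry only needs the mirror image of `populateContext`'s `Extract` and `doForward`'s `Inject` for its spans to join the proxyd trace; a hypothetical handler sketch:

```go
func rpcHandler(w http.ResponseWriter, r *http.Request) {
	// pick up the "traceparent" header injected by proxyd's doForward
	ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
	ctx, span := otel.Tracer("backend-node").Start(ctx, "rpc.handle")
	defer span.End()
	_ = ctx // serve the request with ctx so nested work is traced
}
```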