73 changes: 68 additions & 5 deletions proxyd/backend.go
@@ -7,6 +7,11 @@ import (
"encoding/json"
"errors"
"fmt"
metrics_tracer "github.com/ethereum-optimism/infra/proxyd/metrics/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"io"
"math"
"math/rand"
@@ -323,6 +328,7 @@ type Backend struct {
intermittentErrorsSlidingWindow *sw.AvgSlidingWindow

weight int
tracer trace.Tracer
}

type BackendOpt func(b *Backend)
@@ -542,6 +548,7 @@ func NewBackend(
latencySlidingWindow: sw.NewSlidingWindow(),
networkRequestsSlidingWindow: sw.NewSlidingWindow(),
intermittentErrorsSlidingWindow: sw.NewSlidingWindow(),
tracer: otel.Tracer("backend"),
}

backend.Override(opts...)
@@ -560,6 +567,9 @@ func (b *Backend) Override(opts ...BackendOpt) {
}

func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
ctx, span := metrics_tracer.RecordSingleSpan(metrics_tracer.GlobalTracer, ctx, "Backend.Forward")
defer metrics_tracer.CloseSpan(span)

var lastError error
// <= to account for the first attempt not technically being
// a retry
@@ -668,8 +678,8 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
return NewWSProxier(b, clientConn, backendConn, methodWhitelist), nil
}

// ForwardRPC makes a call directly to a backend and populate the response into `res`
func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
// ForwardRPCForPoller makes a call directly to a backend and populates the response into `res`
func (b *Backend) ForwardRPCForPoller(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
jsonParams, err := json.Marshal(params)
if err != nil {
return err
@@ -681,7 +691,7 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
Params: jsonParams,
ID: []byte(id),
}

ctx = context.WithValue(ctx, metrics_tracer.EnableTraceKey, false)
slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false)
if err != nil {
return err
@@ -699,6 +709,19 @@
}

func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
tmpT := time.Now()
ctx, span := metrics_tracer.RecordSingleSpan(b.tracer, ctx,
"Backend.doForward", attribute.Int("batchSize", len(rpcReqs)))
defer metrics_tracer.CloseSpan(span)

methodList := make([]string, len(rpcReqs))
if span != nil && len(rpcReqs) > 0 {
for i, req := range rpcReqs {
methodList[i] = req.Method
}
metrics_tracer.RecordAttributes(span, "method", methodList)
}

// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
b.networkRequestsSlidingWindow.Incr()

@@ -708,6 +731,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
if isBatch {
for _, rpcReq := range rpcReqs {
if rpcReq.Method == ConsensusGetReceiptsMethod {
metrics_tracer.RecordError(span, ErrConsensusGetReceiptsCantBeBatched)
return nil, ErrConsensusGetReceiptsCantBeBatched
}
}
@@ -719,6 +743,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
var reqParams []rpc.BlockNumberOrHash
err := json.Unmarshal(rpcReq.Params, &reqParams)
if err != nil {
metrics_tracer.RecordError(span, err)
return nil, ErrInvalidRequest("invalid request")
}

@@ -770,8 +795,13 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
if err != nil {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
metrics_tracer.RecordError(span, err)
return nil, wrapErr(err, "error creating backend request")
}
if span != nil {
// inject the trace context
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
}

if b.authPassword != "" {
httpReq.SetBasicAuth(b.authUsername, b.authPassword)
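
The Inject call above writes W3C traceparent/tracestate headers onto the outgoing backend request. Purely as an illustration (not part of this PR), a service sitting behind proxyd could recover that context with the matching Extract call:

```go
// Receiver-side counterpart to the Inject call above — illustration only, not part of
// this PR. Package and function names here are hypothetical.
package middleware

import (
	"net/http"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

// WithTraceContext extracts the traceparent/tracestate headers written by proxyd and
// attaches the resulting span context to the incoming request's context.
func WithTraceContext(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}
```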
@@ -798,12 +828,14 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool

start := time.Now()
httpRes, err := b.client.DoLimited(httpReq)
metrics_tracer.SetSpanAttribute(span, attribute.Int64("prepareHttpReqT", time.Since(tmpT).Milliseconds()))
if err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, ErrTooManyRequests)) {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
}
if errors.Is(err, ErrContextCanceled) {
metrics_tracer.RecordError(span, err)
return nil, err
}
return nil, wrapErr(err, "error in backend request")
@@ -825,24 +857,28 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
metrics_tracer.RecordError(span, fmt.Errorf("response code %d", httpRes.StatusCode))
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
}

defer httpRes.Body.Close()
resB, err := io.ReadAll(LimitReader(httpRes.Body, b.maxResponseSize))
if errors.Is(err, ErrLimitReaderOverLimit) {
metrics_tracer.RecordError(span, ErrBackendResponseTooLarge)
return nil, ErrBackendResponseTooLarge
}
if err != nil {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
metrics_tracer.RecordError(span, err)
return nil, wrapErr(err, "error reading response body")
}

var rpcRes []*RPCRes
if isSingleElementBatch {
var singleRes RPCRes
if err := json.Unmarshal(resB, &singleRes); err != nil {
metrics_tracer.RecordError(span, ErrBackendBadResponse)
return nil, ErrBackendBadResponse
}
rpcRes = []*RPCRes{
@@ -854,17 +890,20 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
if responseIsNotBatched(resB) {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
metrics_tracer.RecordError(span, ErrBackendUnexpectedJSONRPC)
return nil, ErrBackendUnexpectedJSONRPC
}
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
metrics_tracer.RecordError(span, err)
return nil, ErrBackendBadResponse
}
}

if len(rpcReqs) != len(rpcRes) {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
metrics_tracer.RecordError(span, ErrBackendUnexpectedJSONRPC)
return nil, ErrBackendUnexpectedJSONRPC
}

@@ -890,8 +929,9 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
}
}
}

tmpT = time.Now()
sortBatchRPCResponse(rpcReqs, rpcRes)
metrics_tracer.SetSpanAttribute(span, attribute.Int64("sortBatchRPCResponseT", time.Since(tmpT).Milliseconds()))

return rpcRes, nil
}
@@ -956,6 +996,7 @@ type BackendGroup struct {
FallbackBackends map[string]bool
routingStrategy RoutingStrategy
multicallRPCErrorCheck bool
tracer trace.Tracer
}

func (bg *BackendGroup) GetRoutingStrategy() RoutingStrategy {
@@ -985,6 +1026,9 @@ func (bg *BackendGroup) Primaries() []*Backend {

// NOTE: BackendGroup Forward contains the logic for load balancing with consensus awareness
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
ctx, span := metrics_tracer.RecordSingleSpan(metrics_tracer.GlobalTracer, ctx, "BackendGroup.Forward")
defer metrics_tracer.CloseSpan(span)

if len(rpcReqs) == 0 {
return nil, "", nil
}
@@ -995,13 +1039,20 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
// When routing_strategy is set to `consensus_aware` the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group
// We also rewrite block tags to enforce compliance with consensus
tmpT := time.Now()
if bg.Consensus != nil {
rpcReqs, overriddenResponses = bg.OverwriteConsensusResponses(rpcReqs, overriddenResponses, rewrittenReqs)
}
OverwriteConsensusResponsesT := time.Since(tmpT)

tmpT = time.Now()
// Choose backends to forward the request to, after rewriting the requests
backends := bg.orderedBackendsForRequest()
orderedBackendsForRequestT := time.Since(tmpT)

metrics_tracer.SetSpanAttribute(span,
attribute.Int64("OverwriteConsensusResponsesT", OverwriteConsensusResponsesT.Milliseconds()),
attribute.Int64("orderedBackendsForRequestT", orderedBackendsForRequestT.Milliseconds()))
rpcRequestsTotal.Inc()

// When routing_strategy is set to 'multicall' the request will be forwarded to all backends
@@ -1033,7 +1084,11 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
)
tmpT = time.Now()
res := OverrideResponses(backendResp.RPCRes, overriddenResponses)
OverrideResponsesT := time.Since(tmpT)
metrics_tracer.SetSpanAttribute(span,
attribute.Int64("OverrideResponsesT", OverrideResponsesT.Milliseconds()))
return res, backendResp.ServedBy, backendResp.error
}

@@ -1540,10 +1595,14 @@ type LimitedHTTPClient struct {
}

func (c *LimitedHTTPClient) DoLimited(req *http.Request) (*http.Response, error) {
_, span := metrics_tracer.RecordSingleSpan(metrics_tracer.GlobalTracer, req.Context(),
"LimitedHTTPClient.DoLimited")
defer metrics_tracer.CloseSpan(span)

if c.sem == nil {
return c.Do(req)
}

startT := time.Now()
if err := c.sem.Acquire(req.Context(), 1); err != nil {
if errors.Is(err, context.Canceled) {
return nil, ErrContextCanceled
@@ -1552,6 +1611,7 @@ func (c *LimitedHTTPClient) DoLimited(req *http.Request) (*http.Response, error)
return nil, wrapErr(err, ErrTooManyRequests.Message)
}
defer c.sem.Release(1)
metrics_tracer.SetSpanAttribute(span, attribute.Int64("acquireLimitT", time.Since(startT).Milliseconds()))
return c.Do(req)
}

@@ -1618,6 +1678,9 @@ func (bg *BackendGroup) ForwardRequestToBackendGroup(
ctx context.Context,
isBatch bool,
) *BackendGroupRPCResponse {
ctx, span := metrics_tracer.RecordSingleSpan(metrics_tracer.GlobalTracer, ctx, "ForwardRequestToBackendGroup")
defer metrics_tracer.CloseSpan(span)

for _, back := range backends {
res := make([]*RPCRes, 0)
var err error
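The metrics_tracer helpers called throughout this file (RecordSingleSpan, CloseSpan, RecordError, SetSpanAttribute, RecordAttributes, plus GlobalTracer and EnableTraceKey) live in github.com/ethereum-optimism/infra/proxyd/metrics/trace, which is not included in this excerpt of the diff. A minimal sketch of the behavior the call sites above imply — signatures and the nil-span convention are inferred, not confirmed by the PR:

```go
// Hypothetical sketch of the proxyd/metrics/trace helpers; names and signatures are
// inferred from the call sites in backend.go, not taken from the actual package.
package trace

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	oteltrace "go.opentelemetry.io/otel/trace"
)

type contextKey string

// EnableTraceKey lets callers (e.g. the consensus poller) disable tracing per request.
const EnableTraceKey contextKey = "enable_trace"

// GlobalTracer is a process-wide default tracer for call sites without their own tracer.
var GlobalTracer = otel.Tracer("proxyd")

// RecordSingleSpan starts a span unless tracing was disabled via EnableTraceKey;
// callers treat a nil span as "tracing off".
func RecordSingleSpan(t oteltrace.Tracer, ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, oteltrace.Span) {
	if enabled, ok := ctx.Value(EnableTraceKey).(bool); ok && !enabled {
		return ctx, nil
	}
	return t.Start(ctx, name, oteltrace.WithAttributes(attrs...))
}

// CloseSpan ends the span if one was started.
func CloseSpan(span oteltrace.Span) {
	if span != nil {
		span.End()
	}
}

// RecordError marks the span as failed and records the error event.
func RecordError(span oteltrace.Span, err error) {
	if span != nil && err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}
}

// SetSpanAttribute attaches one or more attributes to a non-nil span.
func SetSpanAttribute(span oteltrace.Span, attrs ...attribute.KeyValue) {
	if span != nil {
		span.SetAttributes(attrs...)
	}
}

// RecordAttributes attaches a string-slice attribute (e.g. the batch's RPC methods).
func RecordAttributes(span oteltrace.Span, key string, values []string) {
	if span != nil {
		span.SetAttributes(attribute.StringSlice(key, values))
	}
}
```

Returning a nil span when tracing is disabled keeps the call sites cheap and would explain both the `if span != nil` guards in doForward and why ForwardRPCForPoller sets EnableTraceKey to false before forwarding.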
13 changes: 13 additions & 0 deletions proxyd/cmd/proxyd/main.go
@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/ethereum-optimism/infra/proxyd/metrics/trace"
"log/slog"
"net"
"net/http"
@@ -44,9 +45,13 @@ func main() {
}

config := new(proxyd.Config)
// create the default OpenTelemetry trace config
proxydVersion := GitCommit + "-" + GitDate
config.OpenTelemetryTrace = trace.NewTraceConfig(proxydVersion)
if _, err := toml.DecodeFile(os.Args[1], config); err != nil {
log.Crit("error reading config file", "err", err)
}
trace.GlobalTraceConfig = &config.OpenTelemetryTrace

// update log level from config
logLevel, err := LevelFromString(config.Server.LogLevel)
@@ -128,6 +133,14 @@ func main() {
proxyd.SetOTelClient(metricsClient)
}

// initialize the OpenTelemetry tracer
traceProvider, err := trace.InitTracer(&config.OpenTelemetryTrace)
if err != nil {
log.Error("Failed to initialize OpenTelemetry tracer", "err", err)
return
}
defer trace.Shutdown(traceProvider)

// non-blocking
_, shutdown, err := proxyd.Start(config)
if err != nil {
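trace.NewTraceConfig, trace.InitTracer, trace.Shutdown, and trace.GlobalTraceConfig also come from proxyd/metrics/trace, which this excerpt does not include. Below is a minimal sketch of what they might look like, assuming an OTLP-over-HTTP exporter (consistent with the otlptracehttp modules added in go.mod below) and TraceID-ratio sampling; the struct shape, defaults, exporter, sampler, and resource attributes are all inferred rather than taken from the PR:

```go
// Hypothetical sketch of the proxyd/metrics/trace configuration and initialization used
// in main.go above. Everything here is an assumption inferred from the call sites and
// the example config; the real package is not part of this diff.
package trace

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// TraceConfig mirrors the [opentelemetry_trace] section of the TOML config.
type TraceConfig struct {
	Enabled      bool    `toml:"enabled"`
	ServiceName  string  `toml:"service_name"`
	Environment  string  `toml:"environment"`
	OtelEndpoint string  `toml:"otel_endpoint"`
	SampleRate   float64 `toml:"sample_rate"`
	Version      string  `toml:"-"` // set from the binary version, not the config file
}

// GlobalTraceConfig holds the active config so other packages can consult it.
var GlobalTraceConfig *TraceConfig

// NewTraceConfig returns a disabled-by-default config tagged with the proxyd version.
func NewTraceConfig(version string) TraceConfig {
	return TraceConfig{
		Enabled:     false,
		ServiceName: "proxyd",
		SampleRate:  1.0,
		Version:     version,
	}
}

// InitTracer wires the global tracer provider from the config; it returns a nil
// provider when tracing is disabled.
func InitTracer(cfg *TraceConfig) (*sdktrace.TracerProvider, error) {
	if !cfg.Enabled {
		return nil, nil
	}
	exporter, err := otlptracehttp.New(context.Background(),
		otlptracehttp.WithEndpoint(cfg.OtelEndpoint))
	if err != nil {
		return nil, err
	}
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(cfg.SampleRate))),
		sdktrace.WithResource(resource.NewSchemaless(
			attribute.String("service.name", cfg.ServiceName),
			attribute.String("service.version", cfg.Version),
			attribute.String("deployment.environment", cfg.Environment),
		)),
	)
	otel.SetTracerProvider(tp)
	// propagate W3C trace context on outgoing requests (see the Inject call in backend.go)
	otel.SetTextMapPropagator(propagation.TraceContext{})
	return tp, nil
}

// Shutdown flushes and stops the provider; a nil provider (tracing disabled) is a no-op.
func Shutdown(tp *sdktrace.TracerProvider) {
	if tp == nil {
		return
	}
	_ = tp.Shutdown(context.Background())
}
```

Creating the default config before toml.DecodeFile, as main.go does above, lets values from the [opentelemetry_trace] section override these defaults.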
2 changes: 2 additions & 0 deletions proxyd/config.go
@@ -2,6 +2,7 @@ package proxyd

import (
"fmt"
"github.com/ethereum-optimism/infra/proxyd/metrics/trace"
"math/big"
"os"
"strings"
@@ -252,6 +253,7 @@ type Config struct {
WhitelistErrorMessage string `toml:"whitelist_error_message"`
SenderRateLimit SenderRateLimitConfig `toml:"sender_rate_limit"`
InteropValidationConfig InteropValidationConfig `toml:"interop_validation"`
OpenTelemetryTrace trace.TraceConfig `toml:"opentelemetry_trace"` // the OpenTelemetry trace config
}

type InteropValidationConfig struct {
6 changes: 3 additions & 3 deletions proxyd/consensus_poller.go
@@ -704,7 +704,7 @@ func (cp *ConsensusPoller) Reset() {
// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
err = be.ForwardRPCForPoller(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
if err != nil {
return 0, "", err
}
@@ -722,7 +722,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
// getPeerCount is a convenient wrapper to retrieve the current peer count from the backend
func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
err = be.ForwardRPCForPoller(ctx, &rpcRes, "67", "net_peerCount")
if err != nil {
return 0, err
}
@@ -740,7 +740,7 @@ func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count
// isInSync is a convenient wrapper to check if the backend is in sync from the network
func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_syncing")
err = be.ForwardRPCForPoller(ctx, &rpcRes, "67", "eth_syncing")
if err != nil {
return false, err
}
13 changes: 13 additions & 0 deletions proxyd/example.config.toml
@@ -149,3 +149,16 @@ consensus_aware = true
eth_call = "main"
eth_chainId = "main"
eth_blockNumber = "main"


[opentelemetry_trace]
# whether tracing is enabled
enabled = false
# the service name
service_name = "xlayer-proxyd"
# the environment
environment = "test"
# the trace report endpoint
otel_endpoint = ""
# the sample rate
sample_rate = 0.6
2 changes: 2 additions & 0 deletions proxyd/go.mod
@@ -148,6 +148,8 @@ require (
github.com/yuin/gopher-lua v1.1.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
4 changes: 4 additions & 0 deletions proxyd/go.sum
@@ -447,6 +447,10 @@ go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.38.0 h1:Oe2z/BCg5q7k4iXC3cqJxKYg0ieRiOqF0cecFYdPTwk=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.38.0/go.mod h1:ZQM5lAJpOsKnYagGg/zV2krVqTtaVdYdDkhMoX6Oalg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 h1:GqRJVj7UmLjCVyVJ3ZFLdPRmhDUp2zFmQe3RHIOsw24=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0/go.mod h1:ri3aaHSmCTVYu2AWv44YMauwAQc0aqI9gHKIcSbI1pU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 h1:aTL7F04bJHUlztTsNGJ2l+6he8c+y/b//eR0jjjemT4=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0/go.mod h1:kldtb7jDTeol0l3ewcmd8SDvx3EmIE7lyvqbasU3QC4=
go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E=