diff --git a/client.go b/client.go index fd6cba163..a713a6d0f 100644 --- a/client.go +++ b/client.go @@ -19,6 +19,7 @@ import ( "github.com/getsentry/sentry-go/internal/protocol" "github.com/getsentry/sentry-go/internal/ratelimit" "github.com/getsentry/sentry-go/internal/telemetry" + "github.com/getsentry/sentry-go/report" ) // The identifier of the SDK. @@ -248,6 +249,8 @@ type ClientOptions struct { EnableLogs bool // DisableMetrics controls when metrics should be emitted. DisableMetrics bool + // DisableClientReports controls when client reports should be emitted. + DisableClientReports bool // TraceIgnoreStatusCodes is a list of HTTP status codes that should not be traced. // Each element can be either: // - A single-element slice [code] for a specific status code @@ -286,6 +289,7 @@ type Client struct { batchLogger *logBatchProcessor batchMeter *metricBatchProcessor telemetryProcessor *telemetry.Processor + reporter *report.Aggregator } // NewClient creates and returns an instance of Client configured using @@ -389,6 +393,12 @@ func NewClient(options ClientOptions) (*Client, error) { sdkVersion: SDKVersion, } + if !options.DisableClientReports { + // Use the global registry to get or create a reporter for this DSN. + // This ensures all components using the same DSN share the same reporter. + client.reporter = report.GetOrCreateAggregator(options.Dsn) + } + client.setupTransport() // noop Telemetry Buffers and Processor fow now @@ -462,11 +472,11 @@ func (client *Client) setupTelemetryProcessor() { // nolint: unused client.Transport = &internalAsyncTransportAdapter{transport: transport} buffers := map[ratelimit.Category]telemetry.Buffer[protocol.TelemetryItem]{ - ratelimit.CategoryError: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 100, telemetry.OverflowPolicyDropOldest, 1, 0), - ratelimit.CategoryTransaction: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryTransaction, 1000, telemetry.OverflowPolicyDropOldest, 1, 0), - ratelimit.CategoryLog: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryLog, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second), - ratelimit.CategoryMonitor: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryMonitor, 100, telemetry.OverflowPolicyDropOldest, 1, 0), - ratelimit.CategoryTraceMetric: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryTraceMetric, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second), + ratelimit.CategoryError: telemetry.NewRingBuffer[protocol.TelemetryItem](client.options.Dsn, ratelimit.CategoryError, 100, telemetry.OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryTransaction: telemetry.NewRingBuffer[protocol.TelemetryItem](client.options.Dsn, ratelimit.CategoryTransaction, 1000, telemetry.OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryLog: telemetry.NewRingBuffer[protocol.TelemetryItem](client.options.Dsn, ratelimit.CategoryLog, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second), + ratelimit.CategoryMonitor: telemetry.NewRingBuffer[protocol.TelemetryItem](client.options.Dsn, ratelimit.CategoryMonitor, 100, telemetry.OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryTraceMetric: telemetry.NewRingBuffer[protocol.TelemetryItem](client.options.Dsn, ratelimit.CategoryTraceMetric, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second), } sdkInfo := &protocol.SdkInfo{ @@ -560,9 +570,12 @@ func (client *Client) captureLog(log *Log, _ *Scope) bool { } if client.options.BeforeSendLog != nil { + 
approxSize := log.ApproximateSize() log = client.options.BeforeSendLog(log) if log == nil { debuglog.Println("Log dropped due to BeforeSendLog callback.") + client.reporter.RecordOne(report.ReasonBeforeSend, ratelimit.CategoryLog) + client.reporter.Record(report.ReasonBeforeSend, ratelimit.CategoryLogByte, int64(approxSize)) return false } } @@ -570,11 +583,14 @@ func (client *Client) captureLog(log *Log, _ *Scope) bool { if client.telemetryProcessor != nil { if !client.telemetryProcessor.Add(log) { debuglog.Print("Dropping log: telemetry buffer full or category missing") + // Note: processor tracks client report return false } } else if client.batchLogger != nil { if !client.batchLogger.Send(log) { debuglog.Printf("Dropping log [%s]: buffer full", log.Level) + client.reporter.RecordOne(report.ReasonBufferOverflow, ratelimit.CategoryLog) + client.reporter.Record(report.ReasonBufferOverflow, ratelimit.CategoryLogByte, int64(log.ApproximateSize())) return false } } @@ -591,6 +607,7 @@ func (client *Client) captureMetric(metric *Metric, _ *Scope) bool { metric = client.options.BeforeSendMetric(metric) if metric == nil { debuglog.Println("Metric dropped due to BeforeSendMetric callback.") + client.reporter.RecordOne(report.ReasonBeforeSend, ratelimit.CategoryTraceMetric) return false } } @@ -598,11 +615,13 @@ func (client *Client) captureMetric(metric *Metric, _ *Scope) bool { if client.telemetryProcessor != nil { if !client.telemetryProcessor.Add(metric) { debuglog.Printf("Dropping metric: telemetry buffer full or category missing") + // Note: processor tracks client report return false } } else if client.batchMeter != nil { if !client.batchMeter.Send(metric) { debuglog.Printf("Dropping metric %q: buffer full", metric.Name) + client.reporter.RecordOne(report.ReasonBufferOverflow, ratelimit.CategoryTraceMetric) return false } } @@ -720,6 +739,7 @@ func (client *Client) Close() { if client.batchMeter != nil { client.batchMeter.Shutdown() } + report.UnregisterAggregator(client.options.Dsn) client.Transport.Close() } @@ -811,6 +831,7 @@ func (client *Client) processEvent(event *Event, hint *EventHint, scope EventMod // (errors, messages) are sampled here. Does not apply to check-ins. 
if event.Type != transactionType && event.Type != checkInType && !sample(client.options.SampleRate) { debuglog.Println("Event dropped due to SampleRate hit.") + client.reporter.RecordOne(report.ReasonSampleRate, ratelimit.CategoryError) return nil } @@ -825,16 +846,25 @@ func (client *Client) processEvent(event *Event, hint *EventHint, scope EventMod switch event.Type { case transactionType: if client.options.BeforeSendTransaction != nil { - if event = client.options.BeforeSendTransaction(event, hint); event == nil { + spanCountBefore := event.GetSpanCount() + event = client.options.BeforeSendTransaction(event, hint) + if event == nil { debuglog.Println("Transaction dropped due to BeforeSendTransaction callback.") + client.reporter.RecordOne(report.ReasonBeforeSend, ratelimit.CategoryTransaction) + client.reporter.Record(report.ReasonBeforeSend, ratelimit.CategorySpan, int64(spanCountBefore)) return nil } + // Track spans removed by the callback + if droppedSpans := spanCountBefore - event.GetSpanCount(); droppedSpans > 0 { + client.reporter.Record(report.ReasonBeforeSend, ratelimit.CategorySpan, int64(droppedSpans)) + } } case checkInType: // not a default case, since we shouldn't apply BeforeSend on check-in events default: if client.options.BeforeSend != nil { if event = client.options.BeforeSend(event, hint); event == nil { debuglog.Println("Event dropped due to BeforeSend callback.") + client.reporter.RecordOne(report.ReasonBeforeSend, ratelimit.CategoryError) return nil } } @@ -905,20 +935,44 @@ func (client *Client) prepareEvent(event *Event, hint *EventHint, scope EventMod for _, processor := range client.eventProcessors { id := event.EventID + category := event.toCategory() + spanCountBefore := event.GetSpanCount() event = processor(event, hint) if event == nil { debuglog.Printf("Event dropped by one of the Client EventProcessors: %s\n", id) + client.reporter.RecordOne(report.ReasonEventProcessor, category) + if category == ratelimit.CategoryTransaction { + client.reporter.Record(report.ReasonEventProcessor, ratelimit.CategorySpan, int64(spanCountBefore)) + } return nil } + // Track spans removed by the processor + if category == ratelimit.CategoryTransaction { + if droppedSpans := spanCountBefore - event.GetSpanCount(); droppedSpans > 0 { + client.reporter.Record(report.ReasonEventProcessor, ratelimit.CategorySpan, int64(droppedSpans)) + } + } } for _, processor := range globalEventProcessors { id := event.EventID + category := event.toCategory() + spanCountBefore := event.GetSpanCount() event = processor(event, hint) if event == nil { debuglog.Printf("Event dropped by one of the Global EventProcessors: %s\n", id) + client.reporter.RecordOne(report.ReasonEventProcessor, category) + if category == ratelimit.CategoryTransaction { + client.reporter.Record(report.ReasonEventProcessor, ratelimit.CategorySpan, int64(spanCountBefore)) + } return nil } + // Track spans removed by the processor + if category == ratelimit.CategoryTransaction { + if droppedSpans := spanCountBefore - event.GetSpanCount(); droppedSpans > 0 { + client.reporter.Record(report.ReasonEventProcessor, ratelimit.CategorySpan, int64(droppedSpans)) + } + } } return event diff --git a/client_reports_test.go b/client_reports_test.go new file mode 100644 index 000000000..043c081b4 --- /dev/null +++ b/client_reports_test.go @@ -0,0 +1,105 @@ +package sentry + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/getsentry/sentry-go/internal/ratelimit" + 
"github.com/getsentry/sentry-go/internal/testutils" + "github.com/getsentry/sentry-go/report" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +// TestClientReports_Integration tests that client reports are properly generated +// and sent when events are dropped for various reasons. +func TestClientReports_Integration(t *testing.T) { + var receivedBodies [][]byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + receivedBodies = append(receivedBodies, body) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"id":"test-event-id"}`)) + })) + defer srv.Close() + + dsn := strings.Replace(srv.URL, "//", "//test@", 1) + "/1" + hub := CurrentHub().Clone() + c, err := NewClient(ClientOptions{ + Dsn: dsn, + DisableClientReports: false, + SampleRate: 1.0, + BeforeSend: func(event *Event, _ *EventHint) *Event { + if event.Message == "drop-me" { + return nil + } + return event + }, + }) + if err != nil { + t.Fatalf("Init failed: %v", err) + } + hub.BindClient(c) + defer hub.Flush(testutils.FlushTimeout()) + + // second client with disabled reports shouldn't affect the first + _, _ = NewClient(ClientOptions{ + Dsn: testDsn, + DisableClientReports: true, + }) + + // simulate dropped events for report outcomes + hub.CaptureMessage("drop-me") + scope := NewScope() + scope.AddEventProcessor(func(event *Event, _ *EventHint) *Event { + if event.Message == "processor-drop" { + return nil + } + return event + }) + hub.WithScope(func(s *Scope) { + s.eventProcessors = scope.eventProcessors + hub.CaptureMessage("processor-drop") + }) + + hub.CaptureMessage("hi") // send an event to capture the report along with it + if !hub.Flush(testutils.FlushTimeout()) { + t.Fatal("Flush timed out") + } + + var got report.ClientReport + found := false + for _, b := range receivedBodies { + for _, line := range bytes.Split(b, []byte("\n")) { + if json.Unmarshal(line, &got) == nil && len(got.DiscardedEvents) > 0 { + found = true + break + } + } + if found { + break + } + } + if !found { + t.Fatal("no client report found in envelope bodies") + } + + if got.Timestamp.IsZero() { + t.Error("client report missing timestamp") + } + + want := []report.DiscardedEvent{ + {Reason: report.ReasonBeforeSend, Category: ratelimit.CategoryError, Quantity: 1}, + {Reason: report.ReasonEventProcessor, Category: ratelimit.CategoryError, Quantity: 1}, + } + if diff := cmp.Diff(want, got.DiscardedEvents, cmpopts.SortSlices(func(a, b report.DiscardedEvent) bool { + return a.Reason < b.Reason + })); diff != "" { + t.Errorf("DiscardedEvents mismatch (-want +got):\n%s", diff) + } +} diff --git a/interfaces.go b/interfaces.go index 5dd9a6472..453b7e8d1 100644 --- a/interfaces.go +++ b/interfaces.go @@ -513,7 +513,7 @@ func (e *Event) ToEnvelopeItem() (*protocol.EnvelopeItem, error) { var item *protocol.EnvelopeItem switch e.Type { case transactionType: - item = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeTransaction, eventBody) + item = protocol.NewTransactionItem(e.GetSpanCount(), eventBody) case checkInType: item = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeCheckIn, eventBody) case logEvent.Type: @@ -553,6 +553,15 @@ func (e *Event) GetDynamicSamplingContext() map[string]string { return trace } +// GetSpanCount returns the number of spans in the transaction including the transaction itself. It is used for client +// reports. Returns 0 for non-transaction events. 
+func (e *Event) GetSpanCount() int { + if e.Type != transactionType { + return 0 + } + return len(e.Spans) + 1 +} + // TODO: Event.Contexts map[string]interface{} => map[string]EventContext, // to prevent accidentally storing T when we mean *T. // For example, the TraceContext must be stored as *TraceContext to pick up the @@ -755,6 +764,28 @@ type Log struct { Severity int `json:"severity_number,omitempty"` Body string `json:"body"` Attributes map[string]Attribute `json:"attributes,omitempty"` + + // approximateSize is the pre-computed approximate size in bytes. + approximateSize int +} + +// ApproximateSize returns the pre-computed approximate serialized size in bytes. +func (l *Log) ApproximateSize() int { + return l.approximateSize +} + +// computeLogSize estimates the serialized JSON size of a log entry. +func computeLogSize(l *Log) int { + // Base overhead: timestamp, trace_id, level, severity, JSON structure + size := len(l.Body) + 60 + for k, v := range l.Attributes { + // Key + type/value JSON overhead + size += len(k) + 20 + if s, ok := v.Value.(string); ok { + size += len(s) + } + } + return size } // GetCategory returns the rate limit category for logs. diff --git a/internal/http/transport.go b/internal/http/transport.go index 51bd68778..6279d14cd 100644 --- a/internal/http/transport.go +++ b/internal/http/transport.go @@ -11,13 +11,13 @@ import ( "net/http" "net/url" "sync" - "sync/atomic" "time" "github.com/getsentry/sentry-go/internal/debuglog" "github.com/getsentry/sentry-go/internal/protocol" "github.com/getsentry/sentry-go/internal/ratelimit" "github.com/getsentry/sentry-go/internal/util" + "github.com/getsentry/sentry-go/report" ) const ( @@ -147,6 +147,7 @@ type SyncTransport struct { dsn *protocol.Dsn client *http.Client transport http.RoundTripper + reporter *report.Aggregator mu sync.Mutex limits ratelimit.Map @@ -161,10 +162,15 @@ func NewSyncTransport(options TransportOptions) protocol.TelemetryTransport { return NewNoopTransport() } + // Fetch reporter from global registry (created by Client). + // Transports should not create reporters, only use existing ones. 
+ reporter := report.GetAggregator(options.Dsn) + transport := &SyncTransport{ - Timeout: defaultTimeout, - limits: make(ratelimit.Map), - dsn: dsn, + Timeout: defaultTimeout, + limits: make(ratelimit.Map), + dsn: dsn, + reporter: reporter, } if options.HTTPTransport != nil { @@ -207,12 +213,16 @@ func (t *SyncTransport) SendEnvelopeWithContext(ctx context.Context, envelope *p category := categoryFromEnvelope(envelope) if t.disabled(category) { + t.reporter.RecordForEnvelope(report.ReasonRateLimitBackoff, envelope) return nil } + // the sync transport needs to attach client reports when available + t.reporter.AttachToEnvelope(envelope) request, err := getSentryRequestFromEnvelope(ctx, t.dsn, envelope) if err != nil { debuglog.Printf("There was an issue creating the request: %v", err) + t.reporter.RecordForEnvelope(report.ReasonInternalError, envelope) return err } identifier := util.EnvelopeIdentifier(envelope) @@ -226,6 +236,7 @@ func (t *SyncTransport) SendEnvelopeWithContext(ctx context.Context, envelope *p response, err := t.client.Do(request) if err != nil { debuglog.Printf("There was an issue with sending an event: %v", err) + t.reporter.RecordForEnvelope(report.ReasonNetworkError, envelope) return err } util.HandleHTTPResponse(response, identifier) @@ -268,6 +279,7 @@ type AsyncTransport struct { dsn *protocol.Dsn client *http.Client transport http.RoundTripper + reporter *report.Aggregator queue chan *protocol.Envelope @@ -279,10 +291,6 @@ type AsyncTransport struct { flushRequest chan chan struct{} - sentCount int64 - droppedCount int64 - errorCount int64 - QueueSize int Timeout time.Duration @@ -297,12 +305,17 @@ func NewAsyncTransport(options TransportOptions) protocol.TelemetryTransport { return NewNoopTransport() } + // Fetch reporter from global registry (created by Client). + // Transports should not create reporters, only use existing ones. 
+ reporter := report.GetAggregator(options.Dsn) + transport := &AsyncTransport{ QueueSize: defaultQueueSize, Timeout: defaultTimeout, done: make(chan struct{}), limits: make(ratelimit.Map), dsn: dsn, + reporter: reporter, } transport.queue = make(chan *protocol.Envelope, transport.QueueSize) @@ -363,6 +376,7 @@ func (t *AsyncTransport) SendEnvelope(envelope *protocol.Envelope) error { category := categoryFromEnvelope(envelope) if t.isRateLimited(category) { + t.reporter.RecordForEnvelope(report.ReasonRateLimitBackoff, envelope) return nil } @@ -377,7 +391,7 @@ func (t *AsyncTransport) SendEnvelope(envelope *protocol.Envelope) error { ) return nil default: - atomic.AddInt64(&t.droppedCount, 1) + t.reporter.RecordForEnvelope(report.ReasonQueueOverflow, envelope) return ErrTransportQueueFull } } @@ -430,7 +444,7 @@ func (t *AsyncTransport) worker() { if !open { return } - t.processEnvelope(envelope) + t.sendEnvelopeHTTP(envelope) case flushResponse, open := <-t.flushRequest: if !open { return @@ -448,26 +462,21 @@ func (t *AsyncTransport) drainQueue() { if !open { return } - t.processEnvelope(envelope) + t.sendEnvelopeHTTP(envelope) default: return } } } -func (t *AsyncTransport) processEnvelope(envelope *protocol.Envelope) { - if t.sendEnvelopeHTTP(envelope) { - atomic.AddInt64(&t.sentCount, 1) - } else { - atomic.AddInt64(&t.errorCount, 1) - } -} - -func (t *AsyncTransport) sendEnvelopeHTTP(envelope *protocol.Envelope) bool { +func (t *AsyncTransport) sendEnvelopeHTTP(envelope *protocol.Envelope) bool { //nolint: unparam category := categoryFromEnvelope(envelope) if t.isRateLimited(category) { + t.reporter.RecordForEnvelope(report.ReasonRateLimitBackoff, envelope) return false } + // attach to envelope after rate-limit check + t.reporter.AttachToEnvelope(envelope) ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() @@ -475,18 +484,23 @@ func (t *AsyncTransport) sendEnvelopeHTTP(envelope *protocol.Envelope) bool { request, err := getSentryRequestFromEnvelope(ctx, t.dsn, envelope) if err != nil { debuglog.Printf("Failed to create request from envelope: %v", err) + t.reporter.RecordForEnvelope(report.ReasonInternalError, envelope) return false } response, err := t.client.Do(request) if err != nil { debuglog.Printf("HTTP request failed: %v", err) + t.reporter.RecordForEnvelope(report.ReasonNetworkError, envelope) return false } defer response.Body.Close() identifier := util.EnvelopeIdentifier(envelope) success := util.HandleHTTPResponse(response, identifier) + if !success && response.StatusCode != http.StatusTooManyRequests { + t.reporter.RecordForEnvelope(report.ReasonSendError, envelope) + } t.mu.Lock() if t.limits == nil { diff --git a/internal/http/transport_test.go b/internal/http/transport_test.go index 6cd400275..febdab219 100644 --- a/internal/http/transport_test.go +++ b/internal/http/transport_test.go @@ -83,7 +83,9 @@ func TestAsyncTransport_SendEnvelope(t *testing.T) { {"attachment", protocol.EnvelopeItemTypeAttachment}, } + var count int64 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt64(&count, 1) w.WriteHeader(http.StatusOK) })) defer server.Close() @@ -108,14 +110,17 @@ func TestAsyncTransport_SendEnvelope(t *testing.T) { } expectedCount := int64(len(tests)) - if sent := atomic.LoadInt64(&transport.sentCount); sent != expectedCount { + if sent := atomic.LoadInt64(&count); sent != expectedCount { t.Errorf("expected %d sent, got %d", expectedCount, sent) } }) t.Run("server error", 
func(t *testing.T) { + var requestCount int64 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusInternalServerError) + atomic.AddInt64(&requestCount, 1) + status := http.StatusInternalServerError + w.WriteHeader(status) })) defer server.Close() @@ -136,11 +141,8 @@ func TestAsyncTransport_SendEnvelope(t *testing.T) { t.Fatal("Flush timed out") } - if sent := atomic.LoadInt64(&transport.sentCount); sent != 0 { - t.Errorf("expected 0 sent, got %d", sent) - } - if errors := atomic.LoadInt64(&transport.errorCount); errors != 1 { - t.Errorf("expected 1 error, got %d", errors) + if sent := atomic.LoadInt64(&requestCount); sent != 1 { + t.Errorf("expected 1 request, got %d", sent) } }) diff --git a/internal/protocol/envelope.go b/internal/protocol/envelope.go index 0ec278426..b3f4fb093 100644 --- a/internal/protocol/envelope.go +++ b/internal/protocol/envelope.go @@ -41,12 +41,13 @@ type EnvelopeItemType string // Constants for envelope item types as defined in the Sentry documentation. const ( - EnvelopeItemTypeEvent EnvelopeItemType = "event" - EnvelopeItemTypeTransaction EnvelopeItemType = "transaction" - EnvelopeItemTypeCheckIn EnvelopeItemType = "check_in" - EnvelopeItemTypeAttachment EnvelopeItemType = "attachment" - EnvelopeItemTypeLog EnvelopeItemType = "log" - EnvelopeItemTypeTraceMetric EnvelopeItemType = "trace_metric" + EnvelopeItemTypeEvent EnvelopeItemType = "event" + EnvelopeItemTypeTransaction EnvelopeItemType = "transaction" + EnvelopeItemTypeCheckIn EnvelopeItemType = "check_in" + EnvelopeItemTypeAttachment EnvelopeItemType = "attachment" + EnvelopeItemTypeLog EnvelopeItemType = "log" + EnvelopeItemTypeTraceMetric EnvelopeItemType = "trace_metric" + EnvelopeItemTypeClientReport EnvelopeItemType = "client_report" ) // EnvelopeItemHeader represents the header of an envelope item. @@ -68,6 +69,9 @@ type EnvelopeItemHeader struct { // ItemCount is the number of items in a batch (used for logs) ItemCount *int `json:"item_count,omitempty"` + + // SpanCount is the number of spans in a transaction (used for client reports) + SpanCount int `json:"-"` } // EnvelopeItem represents a single item or batch within an envelope. @@ -187,6 +191,19 @@ func NewEnvelopeItem(itemType EnvelopeItemType, payload []byte) *EnvelopeItem { } } +// NewTransactionItem creates a new envelope item including the span count of the transaction. +func NewTransactionItem(spanCount int, payload []byte) *EnvelopeItem { + length := len(payload) + return &EnvelopeItem{ + Header: &EnvelopeItemHeader{ + Type: EnvelopeItemTypeTransaction, + Length: &length, + SpanCount: spanCount, + }, + Payload: payload, + } +} + // NewAttachmentItem creates a new envelope item for an attachment. // Parameters: filename, contentType, payload. func NewAttachmentItem(filename, contentType string, payload []byte) *EnvelopeItem { @@ -229,3 +246,15 @@ func NewTraceMetricItem(itemCount int, payload []byte) *EnvelopeItem { Payload: payload, } } + +// NewClientReportItem creates a new envelope item for client reports. 
+func NewClientReportItem(payload []byte) *EnvelopeItem { + length := len(payload) + return &EnvelopeItem{ + Header: &EnvelopeItemHeader{ + Type: EnvelopeItemTypeClientReport, + Length: &length, + }, + Payload: payload, + } +} diff --git a/internal/ratelimit/category.go b/internal/ratelimit/category.go index aec8bb8d0..60dbfeccc 100644 --- a/internal/ratelimit/category.go +++ b/internal/ratelimit/category.go @@ -20,7 +20,9 @@ const ( CategoryAll Category = "" // Special category for empty categories (applies to all) CategoryError Category = "error" CategoryTransaction Category = "transaction" + CategorySpan Category = "span" CategoryLog Category = "log_item" + CategoryLogByte Category = "log_byte" CategoryMonitor Category = "monitor" CategoryTraceMetric Category = "trace_metric" ) @@ -45,8 +47,12 @@ func (c Category) String() string { return "CategoryError" case CategoryTransaction: return "CategoryTransaction" + case CategorySpan: + return "CategorySpan" case CategoryLog: return "CategoryLog" + case CategoryLogByte: + return "CategoryLogByte" case CategoryMonitor: return "CategoryMonitor" case CategoryTraceMetric: diff --git a/internal/telemetry/bucketed_buffer.go b/internal/telemetry/bucketed_buffer.go index 75e621e55..064471134 100644 --- a/internal/telemetry/bucketed_buffer.go +++ b/internal/telemetry/bucketed_buffer.go @@ -5,7 +5,9 @@ import ( "sync/atomic" "time" + "github.com/getsentry/sentry-go/internal/protocol" "github.com/getsentry/sentry-go/internal/ratelimit" + "github.com/getsentry/sentry-go/report" ) const ( @@ -39,6 +41,7 @@ type BucketedBuffer[T any] struct { category ratelimit.Category priority ratelimit.Priority overflowPolicy OverflowPolicy + reporter *report.Aggregator batchSize int timeout time.Duration lastFlushTime time.Time @@ -49,6 +52,7 @@ type BucketedBuffer[T any] struct { } func NewBucketedBuffer[T any]( + dsn string, category ratelimit.Category, capacity int, overflowPolicy OverflowPolicy, @@ -78,6 +82,7 @@ func NewBucketedBuffer[T any]( category: category, priority: category.GetPriority(), overflowPolicy: overflowPolicy, + reporter: report.GetAggregator(dsn), batchSize: batchSize, timeout: timeout, lastFlushTime: time.Now(), @@ -142,6 +147,7 @@ func (b *BucketedBuffer[T]) handleOverflow(item T, traceID string) bool { case OverflowPolicyDropOldest: oldestBucket := b.buckets[b.head] if oldestBucket == nil { + b.recordDroppedItem(item) atomic.AddInt64(&b.dropped, 1) if b.onDropped != nil { b.onDropped(item, "buffer_full_invalid_state") @@ -155,6 +161,7 @@ func (b *BucketedBuffer[T]) handleOverflow(item T, traceID string) bool { atomic.AddInt64(&b.dropped, int64(droppedCount)) if b.onDropped != nil { for _, di := range oldestBucket.items { + b.recordDroppedItem(di) b.onDropped(di, "buffer_full_drop_oldest_bucket") } } @@ -173,12 +180,14 @@ func (b *BucketedBuffer[T]) handleOverflow(item T, traceID string) bool { return true case OverflowPolicyDropNewest: atomic.AddInt64(&b.dropped, 1) + b.recordDroppedItem(item) if b.onDropped != nil { b.onDropped(item, "buffer_full_drop_newest") } return false default: atomic.AddInt64(&b.dropped, 1) + b.recordDroppedItem(item) if b.onDropped != nil { b.onDropped(item, "unknown_overflow_policy") } @@ -396,3 +405,11 @@ func (b *BucketedBuffer[T]) MarkFlushed() { defer b.mu.Unlock() b.lastFlushTime = time.Now() } + +func (b *BucketedBuffer[T]) recordDroppedItem(item T) { + if ti, ok := any(item).(protocol.TelemetryItem); ok { + b.reporter.RecordItem(report.ReasonBufferOverflow, ti) + } else { + 
b.reporter.RecordOne(report.ReasonBufferOverflow, b.category) + } +} diff --git a/internal/telemetry/bucketed_buffer_test.go b/internal/telemetry/bucketed_buffer_test.go index 4e3cc004f..0724b055b 100644 --- a/internal/telemetry/bucketed_buffer_test.go +++ b/internal/telemetry/bucketed_buffer_test.go @@ -22,7 +22,7 @@ func (i tbItem) GetTraceID() (string, bool) { } func TestBucketedBufferPollOperation(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 3, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 3, 0) if !b.Offer(tbItem{id: 1}) || !b.Offer(tbItem{id: 2}) { t.Fatal("offer failed") } @@ -41,7 +41,7 @@ func TestBucketedBufferPollOperation(t *testing.T) { } func TestBucketedBufferOverflowDropOldest(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) dropped := 0 b.SetDroppedCallback(func(_ tbItem, _ string) { dropped++ }) b.Offer(tbItem{id: 1, trace: "a"}) @@ -59,7 +59,7 @@ func TestBucketedBufferOverflowDropOldest(t *testing.T) { } func TestBucketedBufferPollIfReady_BatchSize(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryLog, 10, OverflowPolicyDropOldest, 3, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryLog, 10, OverflowPolicyDropOldest, 3, 0) for i := 1; i <= 3; i++ { b.Offer(tbItem{id: i, trace: "t"}) } @@ -73,7 +73,7 @@ func TestBucketedBufferPollIfReady_BatchSize(t *testing.T) { } func TestBucketedBufferPollIfReady_Timeout(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryLog, 10, OverflowPolicyDropOldest, 100, 1*time.Millisecond) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryLog, 10, OverflowPolicyDropOldest, 100, 1*time.Millisecond) b.Offer(tbItem{id: 1, trace: "t"}) time.Sleep(3 * time.Millisecond) items := b.PollIfReady() @@ -83,7 +83,7 @@ func TestBucketedBufferPollIfReady_Timeout(t *testing.T) { } func TestNewBucketedBuffer(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryLog, 0, OverflowPolicyDropOldest, 0, -1) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryLog, 0, OverflowPolicyDropOldest, 0, -1) if b.Capacity() != 100 { t.Fatalf("default capacity want 100 got %d", b.Capacity()) } @@ -96,7 +96,7 @@ func TestNewBucketedBuffer(t *testing.T) { } func TestBucketedBufferBasicOperations(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) if !b.IsEmpty() || b.IsFull() || b.Size() != 0 { t.Fatalf("unexpected initial state: empty=%v full=%v size=%d", b.IsEmpty(), b.IsFull(), b.Size()) } @@ -115,7 +115,7 @@ func TestBucketedBufferBasicOperations(t *testing.T) { } func TestBucketedBufferPollBatchAcrossBuckets(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 10, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 10, 0) // Two buckets with different traces b.Offer(tbItem{id: 1, trace: "a"}) b.Offer(tbItem{id: 2, trace: "a"}) @@ -129,7 +129,7 @@ func TestBucketedBufferPollBatchAcrossBuckets(t *testing.T) { } func TestBucketedBufferDrain(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 
10, OverflowPolicyDropOldest, 1, 0) for i := 1; i <= 5; i++ { b.Offer(tbItem{id: i, trace: "t"}) } @@ -143,7 +143,7 @@ func TestBucketedBufferDrain(t *testing.T) { } func TestBucketedBufferMetrics(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropNewest, 1, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropNewest, 1, 0) if b.OfferedCount() != 0 || b.DroppedCount() != 0 { t.Fatalf("initial metrics not zero") } @@ -162,7 +162,7 @@ func TestBucketedBufferMetrics(t *testing.T) { } func TestBucketedBufferOverflowDropNewest(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 2, OverflowPolicyDropNewest, 1, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 2, OverflowPolicyDropNewest, 1, 0) b.Offer(tbItem{id: 1}) b.Offer(tbItem{id: 2}) if ok := b.Offer(tbItem{id: 3}); ok { @@ -171,7 +171,7 @@ func TestBucketedBufferOverflowDropNewest(t *testing.T) { } func TestBucketedBufferDroppedCallback(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) calls := 0 b.SetDroppedCallback(func(_ tbItem, reason string) { calls++ @@ -191,7 +191,7 @@ func TestBucketedBufferDroppedCallback(t *testing.T) { } func TestBucketedBufferClear(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) b.Offer(tbItem{id: 1}) b.Offer(tbItem{id: 2}) b.Clear() @@ -219,7 +219,7 @@ func TestBucketedBufferIsReadyToFlush(t *testing.T) { if tt.category == ratelimit.CategoryError { batch = 1 } - b := NewBucketedBuffer[tbItem](tt.category, 10, OverflowPolicyDropOldest, batch, tt.timeout) + b := NewBucketedBuffer[tbItem]("", tt.category, 10, OverflowPolicyDropOldest, batch, tt.timeout) for i := 0; i < tt.items; i++ { b.Offer(tbItem{id: i, trace: "t"}) } @@ -235,7 +235,7 @@ func TestBucketedBufferIsReadyToFlush(t *testing.T) { } func TestBucketedBufferConcurrency(t *testing.T) { - b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 200, OverflowPolicyDropOldest, 1, 0) + b := NewBucketedBuffer[tbItem]("", ratelimit.CategoryError, 200, OverflowPolicyDropOldest, 1, 0) const producers = 5 const per = 50 var wg sync.WaitGroup diff --git a/internal/telemetry/processor_test.go b/internal/telemetry/processor_test.go index 243e09ad0..97c2080ca 100644 --- a/internal/telemetry/processor_test.go +++ b/internal/telemetry/processor_test.go @@ -38,7 +38,7 @@ func TestBuffer_AddAndFlush_Sends(t *testing.T) { dsn := &protocol.Dsn{} sdk := &protocol.SdkInfo{Name: "s", Version: "v"} storage := map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ - ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), } b := NewProcessor(storage, transport, dsn, sdk) if !b.Add(bwItem{id: "1"}) { diff --git a/internal/telemetry/ring_buffer.go b/internal/telemetry/ring_buffer.go index 7305d1fc8..2f948efb9 100644 --- a/internal/telemetry/ring_buffer.go +++ b/internal/telemetry/ring_buffer.go @@ -5,7 +5,9 @@ import ( "sync/atomic" "time" + "github.com/getsentry/sentry-go/internal/protocol" "github.com/getsentry/sentry-go/internal/ratelimit" + 
"github.com/getsentry/sentry-go/report" ) const defaultCapacity = 100 @@ -22,6 +24,7 @@ type RingBuffer[T any] struct { category ratelimit.Category priority ratelimit.Priority overflowPolicy OverflowPolicy + reporter *report.Aggregator batchSize int timeout time.Duration @@ -32,7 +35,7 @@ type RingBuffer[T any] struct { onDropped func(item T, reason string) } -func NewRingBuffer[T any](category ratelimit.Category, capacity int, overflowPolicy OverflowPolicy, batchSize int, timeout time.Duration) *RingBuffer[T] { +func NewRingBuffer[T any](dsn string, category ratelimit.Category, capacity int, overflowPolicy OverflowPolicy, batchSize int, timeout time.Duration) *RingBuffer[T] { if capacity <= 0 { capacity = defaultCapacity } @@ -51,6 +54,7 @@ func NewRingBuffer[T any](category ratelimit.Category, capacity int, overflowPol category: category, priority: category.GetPriority(), overflowPolicy: overflowPolicy, + reporter: report.GetAggregator(dsn), batchSize: batchSize, timeout: timeout, lastFlushTime: time.Now(), @@ -84,6 +88,7 @@ func (b *RingBuffer[T]) Offer(item T) bool { b.tail = (b.tail + 1) % b.capacity atomic.AddInt64(&b.dropped, 1) + b.recordDroppedItem(oldItem) if b.onDropped != nil { b.onDropped(oldItem, "buffer_full_drop_oldest") } @@ -91,6 +96,7 @@ func (b *RingBuffer[T]) Offer(item T) bool { case OverflowPolicyDropNewest: atomic.AddInt64(&b.dropped, 1) + b.recordDroppedItem(item) if b.onDropped != nil { b.onDropped(item, "buffer_full_drop_newest") } @@ -98,6 +104,7 @@ func (b *RingBuffer[T]) Offer(item T) bool { default: atomic.AddInt64(&b.dropped, 1) + b.recordDroppedItem(item) if b.onDropped != nil { b.onDropped(item, "unknown_overflow_policy") } @@ -345,6 +352,14 @@ func (b *RingBuffer[T]) PollIfReady() []T { return result } +func (b *RingBuffer[T]) recordDroppedItem(item T) { + if ti, ok := any(item).(protocol.TelemetryItem); ok { + b.reporter.RecordItem(report.ReasonBufferOverflow, ti) + } else { + b.reporter.RecordOne(report.ReasonBufferOverflow, b.category) + } +} + type BufferMetrics struct { Category ratelimit.Category `json:"category"` Priority ratelimit.Priority `json:"priority"` diff --git a/internal/telemetry/ring_buffer_test.go b/internal/telemetry/ring_buffer_test.go index c8b9af2d8..b7bbfe459 100644 --- a/internal/telemetry/ring_buffer_test.go +++ b/internal/telemetry/ring_buffer_test.go @@ -16,7 +16,7 @@ type testItem struct { func TestNewRingBuffer(t *testing.T) { t.Run("with valid capacity", func(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 50, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 50, OverflowPolicyDropOldest, 1, 0) if buffer.Capacity() != 50 { t.Errorf("Expected capacity 50, got %d", buffer.Capacity()) } @@ -29,14 +29,14 @@ func TestNewRingBuffer(t *testing.T) { }) t.Run("with zero capacity", func(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryLog, 0, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryLog, 0, OverflowPolicyDropOldest, 1, 0) if buffer.Capacity() != 100 { t.Errorf("Expected default capacity 100, got %d", buffer.Capacity()) } }) t.Run("with negative capacity", func(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryLog, -10, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryLog, -10, OverflowPolicyDropOldest, 1, 0) if buffer.Capacity() != 100 { t.Errorf("Expected default capacity 100, got %d", buffer.Capacity()) } @@ -44,7 +44,7 @@ 
func TestNewRingBuffer(t *testing.T) { } func TestBufferBasicOperations(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) // Test empty buffer if !buffer.IsEmpty() { @@ -83,7 +83,7 @@ func TestBufferBasicOperations(t *testing.T) { } func TestBufferPollOperation(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) // Test polling from empty buffer item, ok := buffer.Poll() @@ -126,7 +126,7 @@ func TestBufferPollOperation(t *testing.T) { } func TestBufferOverflow(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) // Fill buffer to capacity item1 := &testItem{id: 1, data: "first"} @@ -170,7 +170,7 @@ func TestBufferOverflow(t *testing.T) { } func TestBufferDrain(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) // Drain empty buffer items := buffer.Drain() @@ -206,7 +206,7 @@ func TestBufferDrain(t *testing.T) { } func TestBufferMetrics(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) // Initial metrics if buffer.OfferedCount() != 0 { @@ -230,7 +230,7 @@ func TestBufferMetrics(t *testing.T) { } func TestBufferConcurrency(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 100, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 100, OverflowPolicyDropOldest, 1, 0) const numGoroutines = 10 const itemsPerGoroutine = 50 @@ -301,7 +301,7 @@ func TestBufferDifferentCategories(t *testing.T) { for _, tc := range testCases { t.Run(string(tc.category), func(t *testing.T) { - buffer := NewRingBuffer[*testItem](tc.category, 10, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", tc.category, 10, OverflowPolicyDropOldest, 1, 0) if buffer.Category() != tc.category { t.Errorf("Expected category %s, got %s", tc.category, buffer.Category()) } @@ -317,7 +317,7 @@ func TestBufferStressTest(t *testing.T) { t.Skip("Skipping stress test in short mode") } - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 1000, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 1000, OverflowPolicyDropOldest, 1, 0) const duration = 100 * time.Millisecond const numProducers = 5 @@ -394,7 +394,7 @@ func TestBufferStressTest(t *testing.T) { } func TestOverflowPolicyDropOldest(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) // Fill buffer to capacity item1 := &testItem{id: 1, data: "first"} @@ -434,7 +434,7 @@ func TestOverflowPolicyDropOldest(t *testing.T) { } func TestOverflowPolicyDropNewest(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropNewest, 1, 0) + buffer := 
NewRingBuffer[*testItem]("", ratelimit.CategoryError, 2, OverflowPolicyDropNewest, 1, 0) // Fill buffer to capacity item1 := &testItem{id: 1, data: "first"} @@ -474,7 +474,7 @@ func TestOverflowPolicyDropNewest(t *testing.T) { } func TestBufferDroppedCallback(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) var droppedItems []*testItem var dropReasons []string @@ -512,7 +512,7 @@ func TestBufferDroppedCallback(t *testing.T) { } func TestBufferPollBatch(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) // Add some items for i := 1; i <= 5; i++ { @@ -540,7 +540,7 @@ func TestBufferPollBatch(t *testing.T) { } func TestBufferPeek(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) // Test peek on empty buffer _, ok := buffer.Peek() @@ -567,7 +567,7 @@ func TestBufferPeek(t *testing.T) { } func TestBufferAdvancedMetrics(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) // Test initial metrics metrics := buffer.GetMetrics() @@ -611,7 +611,7 @@ func TestBufferAdvancedMetrics(t *testing.T) { } func TestBufferClear(t *testing.T) { - buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem]("", ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) // Add some items buffer.Offer(&testItem{id: 1, data: "test"}) @@ -700,7 +700,7 @@ func TestBufferIsReadyToFlush(t *testing.T) { batchSize = 100 timeout = 5 * time.Second } - buffer := NewRingBuffer[*testItem](tt.category, 200, OverflowPolicyDropOldest, batchSize, timeout) + buffer := NewRingBuffer[*testItem]("", tt.category, 200, OverflowPolicyDropOldest, batchSize, timeout) for i := 0; i < tt.itemsToAdd; i++ { buffer.Offer(&testItem{id: i, data: "test"}) @@ -771,7 +771,7 @@ func TestBufferPollIfReady(t *testing.T) { batchSize = 100 timeout = 5 * time.Second } - buffer := NewRingBuffer[*testItem](tt.category, 200, OverflowPolicyDropOldest, batchSize, timeout) + buffer := NewRingBuffer[*testItem]("", tt.category, 200, OverflowPolicyDropOldest, batchSize, timeout) for i := 0; i < tt.itemsToAdd; i++ { buffer.Offer(&testItem{id: i, data: "test"}) diff --git a/internal/telemetry/scheduler.go b/internal/telemetry/scheduler.go index 5bf206a54..8e7a03070 100644 --- a/internal/telemetry/scheduler.go +++ b/internal/telemetry/scheduler.go @@ -8,6 +8,11 @@ import ( "github.com/getsentry/sentry-go/internal/debuglog" "github.com/getsentry/sentry-go/internal/protocol" "github.com/getsentry/sentry-go/internal/ratelimit" + "github.com/getsentry/sentry-go/report" +) + +const ( + defaultClientReportsTick = time.Second * 30 ) // Scheduler implements a weighted round-robin scheduler for processing buffered events. 
@@ -16,6 +21,7 @@ type Scheduler struct { transport protocol.TelemetryTransport dsn *protocol.Dsn sdkInfo *protocol.SdkInfo + reporter *report.Aggregator currentCycle []ratelimit.Priority cyclePos int @@ -68,6 +74,7 @@ func NewScheduler( transport: transport, dsn: dsn, sdkInfo: sdkInfo, + reporter: report.GetAggregator(dsn.String()), currentCycle: currentCycle, ctx: ctx, cancel: cancel, @@ -141,10 +148,31 @@ func (s *Scheduler) run() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() + clientReportsTicker := time.NewTicker(defaultClientReportsTick) + defer clientReportsTicker.Stop() for { select { case <-ticker.C: s.cond.Broadcast() + case <-clientReportsTicker.C: + r := s.reporter.TakeReport() + if r != nil { + header := &protocol.EnvelopeHeader{EventID: protocol.GenerateEventID(), SentAt: time.Now(), Sdk: s.sdkInfo} + if s.dsn != nil { + header.Dsn = s.dsn.String() + } + envelope := protocol.NewEnvelope(header) + item, err := r.ToEnvelopeItem() + if err != nil { + debuglog.Printf("error sending client report: %v", err) + continue + } + envelope.AddItem(item) + if err := s.transport.SendEnvelope(envelope); err != nil { + debuglog.Printf("error sending envelope: %v", err) + continue + } + } case <-s.ctx.Done(): return } @@ -209,8 +237,20 @@ func (s *Scheduler) processItems(buffer Buffer[protocol.TelemetryItem], category items = buffer.PollIfReady() } - // drop the current batch if rate-limited or if transport is full - if len(items) == 0 || s.isRateLimited(category) || !s.transport.HasCapacity() { + if len(items) == 0 { + return + } + + if s.isRateLimited(category) { + for _, item := range items { + s.reporter.RecordItem(report.ReasonRateLimitBackoff, item) + } + return + } + if !s.transport.HasCapacity() { + for _, item := range items { + s.reporter.RecordItem(report.ReasonQueueOverflow, item) + } return } @@ -228,6 +268,7 @@ func (s *Scheduler) processItems(buffer Buffer[protocol.TelemetryItem], category return } envelope.AddItem(item) + s.reporter.AttachToEnvelope(envelope) if err := s.transport.SendEnvelope(envelope); err != nil { debuglog.Printf("error sending envelope: %v", err) } @@ -245,6 +286,7 @@ func (s *Scheduler) processItems(buffer Buffer[protocol.TelemetryItem], category return } envelope.AddItem(item) + s.reporter.AttachToEnvelope(envelope) if err := s.transport.SendEnvelope(envelope); err != nil { debuglog.Printf("error sending envelope: %v", err) } @@ -283,6 +325,7 @@ func (s *Scheduler) sendItem(item protocol.EnvelopeItemConvertible) { return } envelope.AddItem(envItem) + s.reporter.AttachToEnvelope(envelope) if err := s.transport.SendEnvelope(envelope); err != nil { debuglog.Printf("error sending envelope: %v", err) } diff --git a/internal/telemetry/scheduler_test.go b/internal/telemetry/scheduler_test.go index e392e97fe..e66718811 100644 --- a/internal/telemetry/scheduler_test.go +++ b/internal/telemetry/scheduler_test.go @@ -52,7 +52,7 @@ func TestNewTelemetryScheduler(t *testing.T) { dsn := &protocol.Dsn{} buffers := map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ - ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), } sdkInfo := &protocol.SdkInfo{ @@ -105,7 +105,7 @@ func TestTelemetrySchedulerFlush(t *testing.T) { name: "single category with multiple items", setupBuffers: func() map[ratelimit.Category]Buffer[protocol.TelemetryItem] { 
return map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ - ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), } }, addItems: func(buffers map[ratelimit.Category]Buffer[protocol.TelemetryItem]) { @@ -119,7 +119,7 @@ func TestTelemetrySchedulerFlush(t *testing.T) { name: "empty buffers", setupBuffers: func() map[ratelimit.Category]Buffer[protocol.TelemetryItem] { return map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ - ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), } }, addItems: func(_ map[ratelimit.Category]Buffer[protocol.TelemetryItem]) { @@ -130,9 +130,9 @@ func TestTelemetrySchedulerFlush(t *testing.T) { name: "multiple categories", setupBuffers: func() map[ratelimit.Category]Buffer[protocol.TelemetryItem] { return map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ - ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), - ratelimit.CategoryTransaction: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryTransaction, 10, OverflowPolicyDropOldest, 1, 0), - ratelimit.CategoryMonitor: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryMonitor, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryTransaction: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryTransaction, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryMonitor: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryMonitor, 10, OverflowPolicyDropOldest, 1, 0), } }, addItems: func(buffers map[ratelimit.Category]Buffer[protocol.TelemetryItem]) { @@ -148,8 +148,8 @@ func TestTelemetrySchedulerFlush(t *testing.T) { name: "priority ordering - error and log", setupBuffers: func() map[ratelimit.Category]Buffer[protocol.TelemetryItem] { return map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ - ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), - ratelimit.CategoryLog: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryLog, 10, OverflowPolicyDropOldest, 100, 5*time.Second), + ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryLog: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryLog, 10, OverflowPolicyDropOldest, 100, 5*time.Second), } }, addItems: func(buffers map[ratelimit.Category]Buffer[protocol.TelemetryItem]) { @@ -163,8 +163,8 @@ func TestTelemetrySchedulerFlush(t *testing.T) { name: "priority ordering - error and metric", setupBuffers: func() map[ratelimit.Category]Buffer[protocol.TelemetryItem] { return map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ - ratelimit.CategoryError: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), - ratelimit.CategoryTraceMetric: NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryTraceMetric, 10, OverflowPolicyDropOldest, 100, 5*time.Second), + ratelimit.CategoryError: 
NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryTraceMetric: NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryTraceMetric, 10, OverflowPolicyDropOldest, 100, 5*time.Second), } }, addItems: func(buffers map[ratelimit.Category]Buffer[protocol.TelemetryItem]) { @@ -206,7 +206,7 @@ func TestTelemetrySchedulerRateLimiting(t *testing.T) { transport := &testutils.MockTelemetryTransport{} dsn := &protocol.Dsn{} - buffer := NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) buffers := map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ ratelimit.CategoryError: buffer, } @@ -239,7 +239,7 @@ func TestTelemetrySchedulerStartStop(t *testing.T) { transport := &testutils.MockTelemetryTransport{} dsn := &protocol.Dsn{} - buffer := NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) buffers := map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ ratelimit.CategoryError: buffer, } @@ -267,7 +267,7 @@ func TestTelemetrySchedulerContextCancellation(t *testing.T) { transport := &testutils.MockTelemetryTransport{} dsn := &protocol.Dsn{} - buffer := NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[protocol.TelemetryItem]("", ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) buffers := map[ratelimit.Category]Buffer[protocol.TelemetryItem]{ ratelimit.CategoryError: buffer, } diff --git a/log.go b/log.go index 4cd6174cc..73e7a97a9 100644 --- a/log.go +++ b/log.go @@ -165,6 +165,7 @@ func (l *sentryLogger) log(ctx context.Context, level LogLevel, severity int, me Body: fmt.Sprintf(message, args...), Attributes: attrs, } + log.approximateSize = computeLogSize(log) client.captureLog(log, scope) if client.options.Debug { diff --git a/log_test.go b/log_test.go index 2713ce441..8866a834d 100644 --- a/log_test.go +++ b/log_test.go @@ -194,6 +194,7 @@ func Test_sentryLogger_MethodsWithFormat(t *testing.T) { opts := cmp.Options{ cmpopts.IgnoreFields(Log{}, "Timestamp"), + cmpopts.IgnoreFields(Log{}, "approximateSize"), } gotEvents := mockTransport.Events() @@ -338,6 +339,7 @@ func Test_sentryLogger_MethodsWithoutFormat(t *testing.T) { opts := cmp.Options{ cmpopts.IgnoreFields(Log{}, "Timestamp"), + cmpopts.IgnoreFields(Log{}, "approximateSize"), } gotEvents := mockTransport.Events() @@ -424,6 +426,7 @@ func Test_sentryLogger_Write(t *testing.T) { opts := cmp.Options{ cmpopts.IgnoreFields(Log{}, "Timestamp"), + cmpopts.IgnoreFields(Log{}, "approximateSize"), } if diff := cmp.Diff(wantLogs, event.Logs, opts); diff != "" { t.Errorf("Logs mismatch (-want +got):\n%s", diff) diff --git a/report/aggregator.go b/report/aggregator.go new file mode 100644 index 000000000..16f18d443 --- /dev/null +++ b/report/aggregator.go @@ -0,0 +1,169 @@ +package report + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/getsentry/sentry-go/internal/debuglog" + "github.com/getsentry/sentry-go/internal/protocol" + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +// Aggregator collects discarded event outcomes for client reports. +// Uses atomic operations to be safe for concurrent use. 
+type Aggregator struct { + mu sync.Mutex + outcomes map[OutcomeKey]*atomic.Int64 +} + +// NewAggregator creates a new client report Aggregator. +func NewAggregator() *Aggregator { + a := &Aggregator{ + outcomes: make(map[OutcomeKey]*atomic.Int64), + } + return a +} + +// Record records a discarded event outcome. +func (a *Aggregator) Record(reason DiscardReason, category ratelimit.Category, quantity int64) { + if a == nil || quantity <= 0 { + return + } + + key := OutcomeKey{Reason: reason, Category: category} + + a.mu.Lock() + defer a.mu.Unlock() + counter, exists := a.outcomes[key] + if !exists { + counter = &atomic.Int64{} + a.outcomes[key] = counter + } + counter.Add(quantity) +} + +// RecordOne is a helper method to record one discarded event outcome. +func (a *Aggregator) RecordOne(reason DiscardReason, category ratelimit.Category) { + a.Record(reason, category, 1) +} + +// TakeReport atomically takes all accumulated outcomes and returns a ClientReport. +func (a *Aggregator) TakeReport() *ClientReport { + if a == nil { + return nil + } + a.mu.Lock() + defer a.mu.Unlock() + + if len(a.outcomes) == 0 { + return nil + } + + var events []DiscardedEvent + for key, counter := range a.outcomes { + quantity := counter.Swap(0) + if quantity > 0 { + events = append(events, DiscardedEvent{ + Reason: key.Reason, + Category: key.Category, + Quantity: quantity, + }) + } + } + + // Clear empty counters to prevent unbounded growth + for key, counter := range a.outcomes { + if counter.Load() == 0 { + delete(a.outcomes, key) + } + } + + if len(events) == 0 { + return nil + } + + return &ClientReport{ + Timestamp: time.Now(), + DiscardedEvents: events, + } +} + +// RecordForEnvelope records client report outcomes for all items in the envelope. +// It inspects envelope item headers to derive categories, span counts, and log byte sizes. +func (a *Aggregator) RecordForEnvelope(reason DiscardReason, envelope *protocol.Envelope) { + if a == nil { + return + } + + for _, item := range envelope.Items { + if item == nil || item.Header == nil { + continue + } + switch item.Header.Type { + case protocol.EnvelopeItemTypeEvent: + a.RecordOne(reason, ratelimit.CategoryError) + case protocol.EnvelopeItemTypeTransaction: + a.RecordOne(reason, ratelimit.CategoryTransaction) + spanCount := int64(item.Header.SpanCount) + a.Record(reason, ratelimit.CategorySpan, spanCount) + case protocol.EnvelopeItemTypeLog: + if item.Header.ItemCount != nil { + a.Record(reason, ratelimit.CategoryLog, int64(*item.Header.ItemCount)) + } + if item.Header.Length != nil { + a.Record(reason, ratelimit.CategoryLogByte, int64(*item.Header.Length)) + } + case protocol.EnvelopeItemTypeTraceMetric: + a.RecordOne(reason, ratelimit.CategoryTraceMetric) + case protocol.EnvelopeItemTypeCheckIn: + a.RecordOne(reason, ratelimit.CategoryMonitor) + case protocol.EnvelopeItemTypeAttachment, protocol.EnvelopeItemTypeClientReport: + // Skip — not reportable categories + } + } +} + +// RecordItem records outcomes for a telemetry item, including supplementary +// categories (span outcomes for transactions, byte size for logs). 
+func (a *Aggregator) RecordItem(reason DiscardReason, item protocol.TelemetryItem) { + category := item.GetCategory() + a.RecordOne(reason, category) + + // Span outcomes for transactions + if category == ratelimit.CategoryTransaction { + type spanCounter interface{ GetSpanCount() int } + if sc, ok := item.(spanCounter); ok { + if count := sc.GetSpanCount(); count > 0 { + a.Record(reason, ratelimit.CategorySpan, int64(count)) + } + } + } + + // Byte size outcomes for logs + if category == ratelimit.CategoryLog { + type sizer interface{ ApproximateSize() int } + if s, ok := item.(sizer); ok { + if size := s.ApproximateSize(); size > 0 { + a.Record(reason, ratelimit.CategoryLogByte, int64(size)) + } + } + } +} + +// AttachToEnvelope adds a client report to the envelope if the Aggregator has outcomes available. +func (a *Aggregator) AttachToEnvelope(envelope *protocol.Envelope) { + if a == nil { + return + } + + r := a.TakeReport() + if r != nil { + rItem, err := r.ToEnvelopeItem() + if err == nil { + envelope.AddItem(rItem) + } else { + debuglog.Printf("failed to serialize client report: %v, with err: %v", r, err) + } + } +} diff --git a/report/outcome.go b/report/outcome.go new file mode 100644 index 000000000..63d6eaf26 --- /dev/null +++ b/report/outcome.go @@ -0,0 +1,18 @@ +package report + +import ( + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +// OutcomeKey uniquely identifies an outcome bucket for aggregation. +type OutcomeKey struct { + Reason DiscardReason + Category ratelimit.Category +} + +// DiscardedEvent represents a single discard event outcome for the OutcomeKey. +type DiscardedEvent struct { + Reason DiscardReason `json:"reason"` + Category ratelimit.Category `json:"category"` + Quantity int64 `json:"quantity"` +} diff --git a/report/reason.go b/report/reason.go new file mode 100644 index 000000000..5a1d21835 --- /dev/null +++ b/report/reason.go @@ -0,0 +1,38 @@ +package report + +// DiscardReason represents why an item was discarded. +type DiscardReason string + +const ( + // ReasonQueueOverflow indicates the transport queue was full. + ReasonQueueOverflow DiscardReason = "queue_overflow" + + // ReasonBufferOverflow indicates that an internal buffer was full. + ReasonBufferOverflow DiscardReason = "buffer_overflow" + + // ReasonRateLimitBackoff indicates the item was dropped due to rate limiting. + ReasonRateLimitBackoff DiscardReason = "ratelimit_backoff" + + // ReasonBeforeSend indicates the item was dropped due to a BeforeSend callback. + ReasonBeforeSend DiscardReason = "before_send" + + // ReasonEventProcessor indicates the item was dropped due to an event processor callback. + ReasonEventProcessor DiscardReason = "event_processor" + + // ReasonSampleRate indicates the item was dropped due to sampling. + ReasonSampleRate DiscardReason = "sample_rate" + + // ReasonNetworkError indicates an HTTP request failed (connection error). + ReasonNetworkError DiscardReason = "network_error" + + // ReasonSendError indicates HTTP returned an error status (4xx, 5xx). + // + // The party that drops an envelope is responsible for counting the event (we skip outcomes that the server records + // `http.StatusTooManyRequests` 429). However, relay does not always record an outcome for oversized envelopes, so + // we accept the trade-off of double counting `http.StatusRequestEntityTooLarge` (413) codes, to record all events. 
+	// For more details, see https://develop.sentry.dev/sdk/expected-features/#dealing-with-network-failures
+	ReasonSendError DiscardReason = "send_error"
+
+	// ReasonInternalError indicates an internal SDK error.
+	ReasonInternalError DiscardReason = "internal_sdk_error"
+)
diff --git a/report/registry.go b/report/registry.go
new file mode 100644
index 000000000..6ab85d7fd
--- /dev/null
+++ b/report/registry.go
@@ -0,0 +1,76 @@
+package report
+
+import (
+	"sync"
+)
+
+// registry is a global map from DSN string to its associated Aggregator.
+// The Client should be the only component creating aggregators. Other components
+// (transports, telemetry buffers) should only fetch existing aggregators.
+var registry struct {
+	mu          sync.RWMutex
+	aggregators map[string]*Aggregator
+}
+
+// nolint:gochecknoinits
+func init() {
+	registry.aggregators = make(map[string]*Aggregator)
+}
+
+// GetAggregator returns the existing Aggregator for a DSN, or nil if none exists.
+func GetAggregator(dsn string) *Aggregator {
+	if dsn == "" {
+		return nil
+	}
+
+	registry.mu.RLock()
+	defer registry.mu.RUnlock()
+	return registry.aggregators[dsn]
+}
+
+// GetOrCreateAggregator returns the existing Aggregator for a DSN, or creates a new one if none exists.
+//
+// Since the client is the source of truth for client reports, it should be the only caller of this method.
+// Other components should use GetAggregator and must not register new aggregators themselves.
+func GetOrCreateAggregator(dsn string) *Aggregator {
+	if dsn == "" {
+		return nil
+	}
+
+	registry.mu.RLock()
+	if agg, exists := registry.aggregators[dsn]; exists {
+		registry.mu.RUnlock()
+		return agg
+	}
+	registry.mu.RUnlock()
+
+	registry.mu.Lock()
+	defer registry.mu.Unlock()
+
+	// Double-check after acquiring the write lock
+	if agg, exists := registry.aggregators[dsn]; exists {
+		return agg
+	}
+
+	agg := NewAggregator()
+	registry.aggregators[dsn] = agg
+	return agg
+}
+
+// UnregisterAggregator removes the Aggregator for a DSN from the registry.
+func UnregisterAggregator(dsn string) {
+	if dsn == "" {
+		return
+	}
+
+	registry.mu.Lock()
+	defer registry.mu.Unlock()
+	delete(registry.aggregators, dsn)
+}
+
+// ClearRegistry removes all registered aggregators.
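+// It is primarily useful in tests that need to reset the global registry between cases.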
+func ClearRegistry() { + registry.mu.Lock() + defer registry.mu.Unlock() + registry.aggregators = make(map[string]*Aggregator) +} diff --git a/report/registry_test.go b/report/registry_test.go new file mode 100644 index 000000000..bdfc89b39 --- /dev/null +++ b/report/registry_test.go @@ -0,0 +1,82 @@ +package report + +import ( + "testing" +) + +func TestRegistry_SharedAcrossComponents(t *testing.T) { + ClearRegistry() + defer ClearRegistry() + + dsn := "https://public@example.com/1" + + clientAgg := GetOrCreateAggregator(dsn) + transportAgg := GetOrCreateAggregator(dsn) + telemetryAgg := GetOrCreateAggregator(dsn) + + if clientAgg != transportAgg { + t.Errorf("client and transport should share aggregator") + } + if clientAgg != telemetryAgg { + t.Errorf("client and telemetry should share aggregator") + } + + clientAgg.RecordOne(ReasonQueueOverflow, "error") + transportAgg.RecordOne(ReasonRateLimitBackoff, "transaction") + + report := telemetryAgg.TakeReport() + if report == nil { + t.Fatal("expected report from shared aggregator") + } + + if len(report.DiscardedEvents) != 2 { + t.Errorf("expected 2 discarded events, got %d", len(report.DiscardedEvents)) + } +} + +func TestUnregisterAggregator(t *testing.T) { + ClearRegistry() + defer ClearRegistry() + + dsn := "https://public@example.com/1" + agg1 := GetOrCreateAggregator(dsn) + if agg1 == nil { + t.Fatal("expected aggregator, got nil") + } + + UnregisterAggregator(dsn) + + agg2 := GetOrCreateAggregator(dsn) + if agg2 == nil { + t.Fatal("expected new aggregator after unregister, got nil") + } + if agg1 == agg2 { + t.Errorf("expected different aggregator instance after unregister") + } + + UnregisterAggregator("") +} + +func TestClearRegistry(t *testing.T) { + ClearRegistry() + defer ClearRegistry() + + dsn1 := "https://public@example.com/1" + dsn2 := "https://public@example.com/2" + + agg1 := GetOrCreateAggregator(dsn1) + agg2 := GetOrCreateAggregator(dsn2) + + ClearRegistry() + + // After clear, should create new instances + newAgg1 := GetOrCreateAggregator(dsn1) + newAgg2 := GetOrCreateAggregator(dsn2) + + if agg1 == newAgg1 { + t.Errorf("expected different aggregator for dsn1 after clear") + } + if agg2 == newAgg2 { + t.Errorf("expected different aggregator for dsn2 after clear") + } +} diff --git a/report/report.go b/report/report.go new file mode 100644 index 000000000..9d1e720c6 --- /dev/null +++ b/report/report.go @@ -0,0 +1,23 @@ +package report + +import ( + "encoding/json" + "time" + + "github.com/getsentry/sentry-go/internal/protocol" +) + +// ClientReport is the payload sent to Sentry for tracking discarded events. +type ClientReport struct { + Timestamp time.Time `json:"timestamp"` + DiscardedEvents []DiscardedEvent `json:"discarded_events"` +} + +// ToEnvelopeItem converts the ClientReport to an envelope item. +func (r *ClientReport) ToEnvelopeItem() (*protocol.EnvelopeItem, error) { + payload, err := json.Marshal(r) + if err != nil { + return nil, err + } + return protocol.NewClientReportItem(payload), nil +} diff --git a/scope.go b/scope.go index 72a413365..244703d94 100644 --- a/scope.go +++ b/scope.go @@ -9,6 +9,8 @@ import ( "time" "github.com/getsentry/sentry-go/internal/debuglog" + "github.com/getsentry/sentry-go/internal/ratelimit" + "github.com/getsentry/sentry-go/report" ) // Scope holds contextual data for the current scope. @@ -366,7 +368,7 @@ func (scope *Scope) AddEventProcessor(processor EventProcessor) { } // ApplyToEvent takes the data from the current scope and attaches it to the event. 
-func (scope *Scope) ApplyToEvent(event *Event, hint *EventHint, client *Client) *Event { +func (scope *Scope) ApplyToEvent(event *Event, hint *EventHint, client *Client) *Event { //nolint:gocyclo scope.mu.RLock() defer scope.mu.RUnlock() @@ -473,11 +475,24 @@ func (scope *Scope) ApplyToEvent(event *Event, hint *EventHint, client *Client) for _, processor := range scope.eventProcessors { id := event.EventID + category := event.toCategory() + spanCountBefore := event.GetSpanCount() event = processor(event, hint) if event == nil { debuglog.Printf("Event dropped by one of the Scope EventProcessors: %s\n", id) + if client != nil { + client.reporter.RecordOne(report.ReasonEventProcessor, category) + if category == ratelimit.CategoryTransaction { + client.reporter.Record(report.ReasonEventProcessor, ratelimit.CategorySpan, int64(spanCountBefore)) + } + } return nil } + if droppedSpans := spanCountBefore - event.GetSpanCount(); droppedSpans > 0 { + if client != nil { + client.reporter.Record(report.ReasonEventProcessor, ratelimit.CategorySpan, int64(droppedSpans)) + } + } } return event diff --git a/tracing.go b/tracing.go index 9c9a24a43..12166336e 100644 --- a/tracing.go +++ b/tracing.go @@ -14,6 +14,8 @@ import ( "time" "github.com/getsentry/sentry-go/internal/debuglog" + "github.com/getsentry/sentry-go/internal/ratelimit" + "github.com/getsentry/sentry-go/report" ) const ( @@ -429,6 +431,17 @@ func (s *Span) doFinish() { } if !s.Sampled.Bool() { + c := hub.Client() + if c != nil { + if !s.IsTransaction() { + // we count the sampled spans from the transaction root. it is guaranteed that the whole transaction + // would be sampled + return + } + children := s.recorder.children() + c.reporter.RecordOne(report.ReasonSampleRate, ratelimit.CategoryTransaction) + c.reporter.Record(report.ReasonSampleRate, ratelimit.CategorySpan, int64(len(children)+1)) + } return } event := s.toEvent() diff --git a/transport.go b/transport.go index 1b5657502..9d91ba3a5 100644 --- a/transport.go +++ b/transport.go @@ -18,11 +18,13 @@ import ( "github.com/getsentry/sentry-go/internal/protocol" "github.com/getsentry/sentry-go/internal/ratelimit" "github.com/getsentry/sentry-go/internal/util" + "github.com/getsentry/sentry-go/report" ) const ( - defaultBufferSize = 1000 - defaultTimeout = time.Second * 30 + defaultBufferSize = 1000 + defaultTimeout = time.Second * 30 + defaultClientReportsTick = time.Second * 30 ) // Transport is used by the Client to deliver events to remote server. @@ -124,6 +126,19 @@ func encodeAttachment(enc *json.Encoder, b io.Writer, attachment *Attachment) er return nil } +func encodeClientReport(enc *json.Encoder, cr *report.ClientReport) error { + payload, err := json.Marshal(cr) + if err != nil { + return err + } + err = encodeEnvelopeItem(enc, string(protocol.EnvelopeItemTypeClientReport), payload) + if err != nil { + return err + } + + return nil +} + func encodeEnvelopeItem(enc *json.Encoder, itemType string, body json.RawMessage) error { // Item header err := enc.Encode(struct { @@ -174,7 +189,26 @@ func encodeEnvelopeMetrics(enc *json.Encoder, count int, body json.RawMessage) e return err } -func envelopeFromBody(event *Event, dsn *Dsn, sentAt time.Time, body json.RawMessage) (*bytes.Buffer, error) { +func recordSpanOutcome(reporter *report.Aggregator, reason report.DiscardReason, event *Event) { + if event.Type == transactionType { + reporter.Record(reason, ratelimit.CategorySpan, int64(event.GetSpanCount())) + } +} + +// envelopeHeader represents the header of a Sentry envelope. 
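+// Optional fields carry omitempty tags, so the same struct serves both regular
+// event envelopes and the standalone client-report envelopes built by the transport worker.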
+type envelopeHeader struct { + EventID EventID `json:"event_id,omitempty"` + SentAt time.Time `json:"sent_at"` + Dsn string `json:"dsn,omitempty"` + Sdk map[string]string `json:"sdk,omitempty"` + Trace map[string]string `json:"trace,omitempty"` +} + +func encodeEnvelopeHeader(enc *json.Encoder, header *envelopeHeader) error { + return enc.Encode(header) +} + +func envelopeFromBody(event *Event, dsn *Dsn, sentAt time.Time, body json.RawMessage, reporter *report.Aggregator) (*bytes.Buffer, error) { var b bytes.Buffer enc := json.NewEncoder(&b) @@ -187,13 +221,7 @@ func envelopeFromBody(event *Event, dsn *Dsn, sentAt time.Time, body json.RawMes } // Envelope header - err := enc.Encode(struct { - EventID EventID `json:"event_id"` - SentAt time.Time `json:"sent_at"` - Dsn string `json:"dsn"` - Sdk map[string]string `json:"sdk"` - Trace map[string]string `json:"trace,omitempty"` - }{ + err := encodeEnvelopeHeader(enc, &envelopeHeader{ EventID: event.EventID, SentAt: sentAt, Trace: trace, @@ -229,49 +257,63 @@ func envelopeFromBody(event *Event, dsn *Dsn, sentAt time.Time, body json.RawMes } } + // attach client report if exists + r := reporter.TakeReport() + if r != nil { + if err := encodeClientReport(enc, r); err != nil { + return nil, err + } + } return &b, nil } -func getRequestFromEvent(ctx context.Context, event *Event, dsn *Dsn) (r *http.Request, err error) { - defer func() { - if r != nil { - r.Header.Set("User-Agent", fmt.Sprintf("%s/%s", event.Sdk.Name, event.Sdk.Version)) - r.Header.Set("Content-Type", "application/x-sentry-envelope") +// getRequestFromEnvelope creates an HTTP request from a pre-built envelope. +// sdkName and sdkVersion are used for User-Agent and authentication headers. +func getRequestFromEnvelope(ctx context.Context, dsn *Dsn, envelope *bytes.Buffer, sdkName, sdkVersion string) (*http.Request, error) { + if ctx == nil { + ctx = context.Background() + } - auth := fmt.Sprintf("Sentry sentry_version=%s, "+ - "sentry_client=%s/%s, sentry_key=%s", apiVersion, event.Sdk.Name, event.Sdk.Version, dsn.GetPublicKey()) + request, err := http.NewRequestWithContext( + ctx, + http.MethodPost, + dsn.GetAPIURL().String(), + envelope, + ) + if err != nil { + return nil, err + } - // The key sentry_secret is effectively deprecated and no longer needs to be set. - // However, since it was required in older self-hosted versions, - // it should still passed through to Sentry if set. - if dsn.GetSecretKey() != "" { - auth = fmt.Sprintf("%s, sentry_secret=%s", auth, dsn.GetSecretKey()) - } + request.Header.Set("User-Agent", fmt.Sprintf("%s/%s", sdkName, sdkVersion)) + request.Header.Set("Content-Type", "application/x-sentry-envelope") - r.Header.Set("X-Sentry-Auth", auth) - } - }() + auth := fmt.Sprintf("Sentry sentry_version=%s, "+ + "sentry_client=%s/%s, sentry_key=%s", apiVersion, sdkName, sdkVersion, dsn.GetPublicKey()) + // The key sentry_secret is effectively deprecated and no longer needs to be set. + // However, since it was required in older self-hosted versions, + // it should still be passed through to Sentry if set. 
+ if dsn.GetSecretKey() != "" { + auth = fmt.Sprintf("%s, sentry_secret=%s", auth, dsn.GetSecretKey()) + } + + request.Header.Set("X-Sentry-Auth", auth) + + return request, nil +} + +func getRequestFromEvent(ctx context.Context, event *Event, dsn *Dsn, reporter *report.Aggregator) (*http.Request, error) { body := getRequestBodyFromEvent(event) if body == nil { return nil, errors.New("event could not be marshaled") } - envelope, err := envelopeFromBody(event, dsn, time.Now(), body) + envelope, err := envelopeFromBody(event, dsn, time.Now(), body, reporter) if err != nil { return nil, err } - if ctx == nil { - ctx = context.Background() - } - - return http.NewRequestWithContext( - ctx, - http.MethodPost, - dsn.GetAPIURL().String(), - envelope, - ) + return getRequestFromEnvelope(ctx, dsn, envelope, event.Sdk.Name, event.Sdk.Version) } // ================================ @@ -300,6 +342,7 @@ type HTTPTransport struct { dsn *Dsn client *http.Client transport http.RoundTripper + reporter *report.Aggregator // buffer is a channel of batches. Calling Flush terminates work on the // current in-flight items and starts a new batch for subsequent events. @@ -330,7 +373,7 @@ func NewHTTPTransport() *HTTPTransport { return &transport } -// Configure is called by the Client itself, providing it it's own ClientOptions. +// Configure is called by the Client itself, providing its own ClientOptions. func (t *HTTPTransport) Configure(options ClientOptions) { dsn, err := NewDsn(options.Dsn) if err != nil { @@ -338,6 +381,9 @@ func (t *HTTPTransport) Configure(options ClientOptions) { return } t.dsn = dsn + if !options.DisableClientReports { + t.reporter = report.GetAggregator(options.Dsn) + } // A buffered channel with capacity 1 works like a mutex, ensuring only one // goroutine can access the current batch at a given time. Access is @@ -386,11 +432,15 @@ func (t *HTTPTransport) SendEventWithContext(ctx context.Context, event *Event) category := event.toCategory() if t.disabled(category) { + t.reporter.RecordOne(report.ReasonRateLimitBackoff, category) + recordSpanOutcome(t.reporter, report.ReasonRateLimitBackoff, event) return } - request, err := getRequestFromEvent(ctx, event, t.dsn) + request, err := getRequestFromEvent(ctx, event, t.dsn, t.reporter) if err != nil { + t.reporter.RecordOne(report.ReasonInternalError, category) + recordSpanOutcome(t.reporter, report.ReasonInternalError, event) return } @@ -423,6 +473,8 @@ func (t *HTTPTransport) SendEventWithContext(ctx context.Context, event *Event) ) default: debuglog.Println("Event dropped due to transport buffer being full.") + t.reporter.RecordOne(report.ReasonQueueOverflow, category) + recordSpanOutcome(t.reporter, report.ReasonQueueOverflow, event) } t.buffer <- b @@ -509,6 +561,8 @@ func (t *HTTPTransport) Close() { } func (t *HTTPTransport) worker() { + crTicker := time.NewTicker(defaultClientReportsTick) + defer crTicker.Stop() for b := range t.buffer { // Signal that processing of the current batch has started. 
close(b.started) @@ -523,6 +577,46 @@ func (t *HTTPTransport) worker() { select { case <-t.done: return + case <-crTicker.C: + r := t.reporter.TakeReport() + if r != nil { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + if err := encodeEnvelopeHeader(enc, &envelopeHeader{ + SentAt: time.Now(), + Dsn: t.dsn.String(), + Sdk: map[string]string{ + "name": sdkIdentifier, + "version": SDKVersion, + }, + }); err != nil { + continue + } + if err := encodeClientReport(enc, r); err != nil { + continue + } + req, err := getRequestFromEnvelope(context.Background(), t.dsn, &buf, sdkIdentifier, SDKVersion) + if err != nil { + debuglog.Printf("There was an issue when creating the request: %v", err) + continue + } + response, err := t.client.Do(req) + if err != nil { + debuglog.Printf("There was an issue with sending an event: %v", err) + continue + } + t.mu.Lock() + if t.limits == nil { + t.limits = make(ratelimit.Map) + } + t.limits.Merge(ratelimit.FromResponse(response)) + t.mu.Unlock() + + // Drain body up to a limit and close it, allowing the + // transport to reuse TCP connections. + _, _ = io.CopyN(io.Discard, response.Body, util.MaxDrainResponseBytes) + response.Body.Close() + } case item, open := <-b.items: if !open { break loop @@ -586,6 +680,7 @@ type HTTPSyncTransport struct { dsn *Dsn client *http.Client transport http.RoundTripper + reporter *report.Aggregator mu sync.Mutex limits ratelimit.Map @@ -604,7 +699,7 @@ func NewHTTPSyncTransport() *HTTPSyncTransport { return &transport } -// Configure is called by the Client itself, providing it it's own ClientOptions. +// Configure is called by the Client itself, providing its own ClientOptions. func (t *HTTPSyncTransport) Configure(options ClientOptions) { dsn, err := NewDsn(options.Dsn) if err != nil { @@ -612,6 +707,9 @@ func (t *HTTPSyncTransport) Configure(options ClientOptions) { return } t.dsn = dsn + if !options.DisableClientReports { + t.reporter = report.GetAggregator(options.Dsn) + } if options.HTTPTransport != nil { t.transport = options.HTTPTransport @@ -645,12 +743,17 @@ func (t *HTTPSyncTransport) SendEventWithContext(ctx context.Context, event *Eve return } - if t.disabled(event.toCategory()) { + category := event.toCategory() + if t.disabled(category) { + t.reporter.RecordOne(report.ReasonRateLimitBackoff, category) + recordSpanOutcome(t.reporter, report.ReasonRateLimitBackoff, event) return } - request, err := getRequestFromEvent(ctx, event, t.dsn) + request, err := getRequestFromEvent(ctx, event, t.dsn, t.reporter) if err != nil { + t.reporter.RecordOne(report.ReasonInternalError, category) + recordSpanOutcome(t.reporter, report.ReasonInternalError, event) return } @@ -665,9 +768,15 @@ func (t *HTTPSyncTransport) SendEventWithContext(ctx context.Context, event *Eve response, err := t.client.Do(request) if err != nil { debuglog.Printf("There was an issue with sending an event: %v", err) + t.reporter.RecordOne(report.ReasonNetworkError, category) + recordSpanOutcome(t.reporter, report.ReasonNetworkError, event) return } - util.HandleHTTPResponse(response, identifier) + success := util.HandleHTTPResponse(response, identifier) + if !success && response.StatusCode != http.StatusTooManyRequests { + t.reporter.RecordOne(report.ReasonSendError, category) + recordSpanOutcome(t.reporter, report.ReasonSendError, event) + } t.mu.Lock() if t.limits == nil { diff --git a/transport_test.go b/transport_test.go index 866f6d9ba..adeae65e6 100644 --- a/transport_test.go +++ b/transport_test.go @@ -162,7 +162,7 @@ func 
TestEnvelopeFromErrorBody(t *testing.T) { body := json.RawMessage(`{"type":"event","fields":"omitted"}`) - b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body) + b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body, nil) if err != nil { t.Fatal(err) } @@ -182,7 +182,7 @@ func TestEnvelopeFromTransactionBody(t *testing.T) { body := json.RawMessage(`{"type":"transaction","fields":"omitted"}`) - b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body) + b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body, nil) if err != nil { t.Fatal(err) } @@ -214,7 +214,7 @@ func TestEnvelopeFromEventWithAttachments(t *testing.T) { body := json.RawMessage(`{"type":"event","fields":"omitted"}`) - b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body) + b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body, nil) if err != nil { t.Fatal(err) } @@ -248,7 +248,7 @@ func TestEnvelopeFromCheckInEvent(t *testing.T) { sentAt := time.Unix(0, 0).UTC() body := getRequestBodyFromEvent(event) - b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body) + b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body, nil) if err != nil { t.Fatal(err) } @@ -286,7 +286,7 @@ func TestEnvelopeFromLogEvent(t *testing.T) { sentAt := time.Unix(0, 0).UTC() body := getRequestBodyFromEvent(event) - b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body) + b, err := envelopeFromBody(event, newTestDSN(t), sentAt, body, nil) if err != nil { t.Fatal(err) } @@ -328,7 +328,7 @@ func TestGetRequestFromEvent(t *testing.T) { } t.Run(test.testName, func(t *testing.T) { - req, err := getRequestFromEvent(context.TODO(), test.event, dsn) + req, err := getRequestFromEvent(context.TODO(), test.event, dsn, nil) if err != nil { t.Fatal(err) }
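Taken together, the report package added in this change is used roughly as follows. The sketch below is illustrative only and not part of the diff: the DSN and the recorded quantities are made up, and categories are passed as string literals the same way report/registry_test.go does.

package main

import (
	"fmt"

	"github.com/getsentry/sentry-go/report"
)

func main() {
	dsn := "https://public@example.com/1" // assumed example DSN

	// The Client creates (or reuses) the shared aggregator for its DSN.
	agg := report.GetOrCreateAggregator(dsn)

	// Transports and telemetry buffers only look up the existing instance.
	if r := report.GetAggregator(dsn); r != nil {
		// Record one dropped error and three rate-limited transactions.
		r.RecordOne(report.ReasonQueueOverflow, "error")
		r.Record(report.ReasonRateLimitBackoff, "transaction", 3)
	}

	// A transport periodically drains the outcomes into a ClientReport
	// (or attaches them to an outgoing envelope via AttachToEnvelope).
	if cr := agg.TakeReport(); cr != nil {
		for _, e := range cr.DiscardedEvents {
			fmt.Printf("%s/%s: %d\n", e.Reason, e.Category, e.Quantity)
		}
	}

	// On Client.Close the aggregator is removed from the registry.
	report.UnregisterAggregator(dsn)
}

Because every Aggregator method tolerates a nil receiver, components can make these calls unconditionally; they become no-ops when client reports are disabled and no aggregator was registered for the DSN.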