fix: improve otel span map cleanup performance #1200
base: master
Changes from all commits: 5bd7a99, d3c9cf4, d4f8f67, affa2ab, 0e5fed5, 981cbcc, 7a9f24c, 373e58c
New file, package util (imported below as github.com/getsentry/sentry-go/internal/util):

@@ -0,0 +1,43 @@
package util

import "sync"

// SyncMap is a thin generic wrapper around sync.Map that hides the
// type assertions behind a typed API.
type SyncMap[K comparable, V any] struct {
	m sync.Map
}

// Store sets the value for a key.
func (s *SyncMap[K, V]) Store(key K, value V) {
	s.m.Store(key, value)
}

// CompareAndDelete deletes the entry for key only if its current value equals value.
func (s *SyncMap[K, V]) CompareAndDelete(key K, value V) {
	s.m.CompareAndDelete(key, value)
}

// Load returns the value stored for key, or the zero value of V if the key is absent.
func (s *SyncMap[K, V]) Load(key K) (V, bool) {
	v, ok := s.m.Load(key)
	if !ok {
		var zero V
		return zero, false
	}
	return v.(V), true
}

// Delete removes the entry for key.
func (s *SyncMap[K, V]) Delete(key K) {
	s.m.Delete(key)
}

// LoadOrStore returns the existing value for key if present; otherwise it
// stores and returns the given value. The second result is true if the
// value was already present.
func (s *SyncMap[K, V]) LoadOrStore(key K, value V) (V, bool) {
	actual, loaded := s.m.LoadOrStore(key, value)
	return actual.(V), loaded
}

// Clear removes all entries.
func (s *SyncMap[K, V]) Clear() {
	s.m.Clear()
}

// Range calls f sequentially for each key/value pair; iteration stops if f returns false.
func (s *SyncMap[K, V]) Range(f func(key K, value V) bool) {
	s.m.Range(func(key, value any) bool {
		return f(key.(K), value.(V))
	})
}
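For context, a minimal sketch of how the wrapper is used. The string/int key and value types are illustrative, not the span types from this PR, and since the package is internal it only compiles from within the sentry-go module:

package main

import (
	"fmt"

	"github.com/getsentry/sentry-go/internal/util" // internal: only importable inside the sentry-go module
)

func main() {
	var m util.SyncMap[string, int] // the zero value is ready to use, like sync.Map

	m.Store("a", 1)

	// LoadOrStore keeps the existing value and reports loaded=true.
	actual, loaded := m.LoadOrStore("a", 2)
	fmt.Println(actual, loaded) // 1 true

	// CompareAndDelete removes the entry only if the stored value matches.
	m.CompareAndDelete("a", 2) // no-op: the stored value is 1
	m.CompareAndDelete("a", 1) // deletes the entry

	if _, ok := m.Load("a"); !ok {
		fmt.Println("deleted")
	}

	// Range iterates until the callback returns false.
	m.Store("b", 2)
	m.Range(func(k string, v int) bool {
		fmt.Println(k, v)
		return true
	})
}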
Modified file, package sentryotel (the span map):

@@ -1,123 +1,105 @@

Before (a single mutex-guarded map with recursive subtree cleanup):

package sentryotel

import (
	"sync"

	"github.com/getsentry/sentry-go"
	otelTrace "go.opentelemetry.io/otel/trace"
)

type spanInfo struct {
	span     *sentry.Span
	finished bool
	children map[otelTrace.SpanID]struct{}
	parentID otelTrace.SpanID
}

// SentrySpanMap is a mapping between OpenTelemetry spans and Sentry spans.
// It helps Sentry span processor and propagator to keep track of unfinished
// Sentry spans and to establish parent-child links between spans.
type SentrySpanMap struct {
	spanMap map[otelTrace.SpanID]*spanInfo
	mu      sync.RWMutex
}

func (ssm *SentrySpanMap) Get(otelSpandID otelTrace.SpanID) (*sentry.Span, bool) {
	ssm.mu.RLock()
	defer ssm.mu.RUnlock()
	info, ok := ssm.spanMap[otelSpandID]
	if !ok {
		return nil, false
	}
	return info.span, true
}

func (ssm *SentrySpanMap) Set(otelSpandID otelTrace.SpanID, sentrySpan *sentry.Span, parentID otelTrace.SpanID) {
	ssm.mu.Lock()
	defer ssm.mu.Unlock()

	info := &spanInfo{
		span:     sentrySpan,
		finished: false,
		children: make(map[otelTrace.SpanID]struct{}),
		parentID: parentID,
	}
	ssm.spanMap[otelSpandID] = info

	if parentID != (otelTrace.SpanID{}) {
		if parentInfo, ok := ssm.spanMap[parentID]; ok {
			parentInfo.children[otelSpandID] = struct{}{}
		}
	}
}

func (ssm *SentrySpanMap) MarkFinished(otelSpandID otelTrace.SpanID) {
	ssm.mu.Lock()
	defer ssm.mu.Unlock()

	info, ok := ssm.spanMap[otelSpandID]
	if !ok {
		return
	}

	info.finished = true
	ssm.tryCleanupSpan(otelSpandID)
}

// tryCleanupSpan deletes a parent and all children only if the whole subtree is marked finished.
// Must be called with lock held.
func (ssm *SentrySpanMap) tryCleanupSpan(spanID otelTrace.SpanID) {
	info, ok := ssm.spanMap[spanID]
	if !ok || !info.finished {
		return
	}

	if !info.span.IsTransaction() {
		parentID := info.parentID
		if parentID != (otelTrace.SpanID{}) {
			if parentInfo, parentExists := ssm.spanMap[parentID]; parentExists && !parentInfo.finished {
				return
			}
		}
	}

	// We need to have a lookup first to see if every child is marked as finished to actually cleanup everything.
	// There probably is a better way to do this
	for childID := range info.children {
		if childInfo, exists := ssm.spanMap[childID]; exists && !childInfo.finished {
			return
		}
	}

	parentID := info.parentID
	if parentID != (otelTrace.SpanID{}) {
		if parentInfo, ok := ssm.spanMap[parentID]; ok {
			delete(parentInfo.children, spanID)
		}
	}

	for childID := range info.children {
		if childInfo, exists := ssm.spanMap[childID]; exists && childInfo.finished {
			ssm.tryCleanupSpan(childID)
		}
	}

	delete(ssm.spanMap, spanID)
	if parentID != (otelTrace.SpanID{}) {
		ssm.tryCleanupSpan(parentID)
	}
}

// Clear removes all spans stored on the map.
func (ssm *SentrySpanMap) Clear() {
	ssm.mu.Lock()
	defer ssm.mu.Unlock()
	ssm.spanMap = make(map[otelTrace.SpanID]*spanInfo)
}

// Len returns the number of spans on the map.
func (ssm *SentrySpanMap) Len() int {
	ssm.mu.RLock()
	defer ssm.mu.RUnlock()
	return len(ssm.spanMap)
}

var sentrySpanMap = SentrySpanMap{spanMap: make(map[otelTrace.SpanID]*spanInfo)}

After (per-transaction entries with atomic active-span counters):

package sentryotel

import (
	"sync/atomic"

	"github.com/getsentry/sentry-go"
	"github.com/getsentry/sentry-go/internal/util"
	otelTrace "go.opentelemetry.io/otel/trace"
)

// TransactionEntry holds a reference to the root transaction span and
// tracks the number of active spans belonging to this trace.
type TransactionEntry struct {
	root        *sentry.Span
	activeCount atomic.Int64
	// spans holds active (not yet finished) spans for Get lookups.
	spans util.SyncMap[otelTrace.SpanID, *sentry.Span]
	// knownSpanIDs tracks all span IDs ever added to this transaction.
	knownSpanIDs util.SyncMap[otelTrace.SpanID, struct{}]
}

// HasSpan returns true if the given spanID was ever part of this transaction.
func (te *TransactionEntry) HasSpan(spanID otelTrace.SpanID) bool {
	_, ok := te.knownSpanIDs.Load(spanID)
	return ok
}

// SentrySpanMap is a mapping between OpenTelemetry spans and Sentry spans.
// It stores spans per transaction for lookup by the propagator and event processor,
// and manages transaction entries for creating child spans via the shared spanRecorder.
type SentrySpanMap struct {
	transactions util.SyncMap[otelTrace.TraceID, *TransactionEntry]
}

// Get returns the current sentry.Span associated with the given OTel traceID and spanID.
func (ssm *SentrySpanMap) Get(traceID otelTrace.TraceID, spanID otelTrace.SpanID) (*sentry.Span, bool) {
	entry, ok := ssm.transactions.Load(traceID)
	if !ok {
		return nil, false
	}
	return entry.spans.Load(spanID)
}

// GetTransaction returns the transaction information for the given OTel traceID.
func (ssm *SentrySpanMap) GetTransaction(traceID otelTrace.TraceID) (*TransactionEntry, bool) {
	return ssm.transactions.Load(traceID)
}

// Set stores the span and transaction information on the map. It handles both root and child spans automatically.
//
// If there is a cache miss on the given traceID, a transaction entry is created. Subsequent calls for the same traceID
// just increment the active span count and store the span in the entry.
func (ssm *SentrySpanMap) Set(spanID otelTrace.SpanID, span *sentry.Span, traceID otelTrace.TraceID) {
	t := &TransactionEntry{root: span}
	t.activeCount.Store(1)
	t.spans.Store(spanID, span)
	t.knownSpanIDs.Store(spanID, struct{}{})

	if existing, loaded := ssm.transactions.LoadOrStore(traceID, t); loaded {
		existing.activeCount.Add(1)
		existing.spans.Store(spanID, span)
		existing.knownSpanIDs.Store(spanID, struct{}{})
	}
}

// MarkFinished removes a span from the active set and decrements the transaction's active count.
// When the count reaches zero, the transaction entry is removed.
// The span ID is kept in knownSpanIDs so that HasSpan continues to work for child span creation.
func (ssm *SentrySpanMap) MarkFinished(spanID otelTrace.SpanID, traceID otelTrace.TraceID) {
	entry, ok := ssm.transactions.Load(traceID)
	if !ok {
		return
	}

	entry.spans.Delete(spanID)

	if entry.activeCount.Add(-1) <= 0 {
		// CompareAndSwap (CAS) is used to prevent a race between Set and MarkFinished.
		// The race has two windows:
		// 1. MarkFinished decremented activeCount to 0 but hasn't CAS'd yet -> Set adds 1
		//    and the CAS fails, keeping the transaction, since a new span was just added.
		// 2. MarkFinished already CAS'd -> Set will store on the transaction marked for
		//    deletion (better than creating a new orphaned span).
		if entry.activeCount.CompareAndSwap(0, -1) {
			ssm.transactions.CompareAndDelete(traceID, entry)
		}
	}
}

// Clear removes all spans stored on the map.
func (ssm *SentrySpanMap) Clear() {
	ssm.transactions.Clear()
}

// Len returns the number of spans on the map.
func (ssm *SentrySpanMap) Len() int {
	count := 0
	ssm.transactions.Range(func(_ otelTrace.TraceID, entry *TransactionEntry) bool {
		count += int(entry.activeCount.Load())
		return true
	})
	return count
}

var sentrySpanMap = SentrySpanMap{}

Review comments:

- On lines +53 to +63 (Set), sentry[bot], marked as resolved: Bug: a race condition between Set and MarkFinished; the CompareAndSwap in MarkFinished addresses it (see the explanation in that function's comment).
- On lines +77 to +86 (the CAS block in MarkFinished), PR author: @sl0thentr0py @Litarnus, I would appreciate it if you could verify my logic here. I added the fix due to the comment above, and I think it makes sense.
  Reply: It looks good to me; your comments describe it accurately, imo.
- On Len, reviewer: Maybe we should add a comment that this is very expensive and used for testing, to discourage anyone from using it by accident.
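A standalone sketch of the counter-plus-CAS pattern described in the MarkFinished comment above (toy types and stdlib only, not the SDK's actual code): the final decrement deletes the entry only if it can swap the counter from 0 to -1, so a Set that increments in between keeps the transaction alive.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// entry mimics TransactionEntry's active-span counter (illustrative type only).
type entry struct {
	active atomic.Int64
}

func main() {
	var m sync.Map // traceID -> *entry
	e := &entry{}
	e.active.Store(1) // one active span
	m.Store("trace-1", e)

	var wg sync.WaitGroup
	wg.Add(2)

	// Finisher (the MarkFinished role): decrement, then delete only if we can swap 0 -> -1.
	go func() {
		defer wg.Done()
		if e.active.Add(-1) <= 0 {
			// Window 1: if the other goroutine increments here, the counter is
			// back above zero, the CAS fails, and the entry survives.
			if e.active.CompareAndSwap(0, -1) {
				m.CompareAndDelete("trace-1", e)
			}
		}
	}()

	// Racing Set: a new span for the same trace increments the counter.
	go func() {
		defer wg.Done()
		e.active.Add(1)
	}()

	wg.Wait()
	_, alive := m.Load("trace-1")
	// The outcome depends on interleaving (window 2 is the case where the entry
	// was already removed); the CAS only guarantees that an entry which just
	// gained a span is never deleted out from under it.
	fmt.Println("entry still present:", alive)
}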
New file, package sentryotel (benchmark):

@@ -0,0 +1,89 @@
package sentryotel

import (
	"context"
	"fmt"
	"sync"
	"testing"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/trace"
)

type noopExporter struct{}

func (e *noopExporter) ExportSpans(_ context.Context, _ []sdktrace.ReadOnlySpan) error {
	return nil
}
func (e *noopExporter) Shutdown(_ context.Context) error { return nil }

func setupTracerProvider(useSentry bool) (*sdktrace.TracerProvider, func()) {
	res, _ := resource.New(context.Background())

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithResource(res),
		sdktrace.WithSampler(sdktrace.AlwaysSample()),
	)

	if useSentry {
		sentryProcessor := NewSentrySpanProcessor()
		tp.RegisterSpanProcessor(sentryProcessor)
	}

	tp.RegisterSpanProcessor(sdktrace.NewBatchSpanProcessor(&noopExporter{}))

	otel.SetTracerProvider(tp)
	return tp, func() { _ = tp.Shutdown(context.Background()) }
}

func simulateWorkflowBatch(ctx context.Context, tracer trace.Tracer, numInstances, dbSpansPerInstance int) {
	ctx, rootSpan := tracer.Start(ctx, "job.workflow_runner")

	var wg sync.WaitGroup
	for i := 0; i < numInstances; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			wfCtx, wfSpan := tracer.Start(ctx, fmt.Sprintf("workflow.debt_reminder_%d", idx))

			for j := 0; j < dbSpansPerInstance; j++ {
				_, dbSpan := tracer.Start(wfCtx, fmt.Sprintf("postgres.query_%d", j))
				dbSpan.End()
			}

			wfSpan.End()
		}(i)
	}
	wg.Wait()
	rootSpan.End()
}

// BenchmarkSpanMapContention measures how much the Sentry span processor slows down unrelated handler
// spans when a large workflow transaction is being created and cleaned up concurrently.
func BenchmarkSpanMapContention(b *testing.B) {
	_, cleanup := setupTracerProvider(true)
	defer cleanup()
	tracer := otel.Tracer("bench")

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		iter := 0
		for pb.Next() {
			if iter%500 == 0 {
				// Every 500th iteration, simulate a large workflow batch.
				// This creates 100×33 = 3300 child spans under a single root,
				// then ends them all, exercising the cleanup path under contention.
				simulateWorkflowBatch(context.Background(), tracer, 100, 33)
			} else {
				// This is the hot path that gets blocked by span cleanup.
				ctx, span := tracer.Start(context.Background(), "GET /api/ping")
				_, child := tracer.Start(ctx, "db.query")
				child.End()
				span.End()
			}
			iter++
		}
	})
}