From bb335f3c4e829d4842e2bd7698b82bf9c5579275 Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 4 Aug 2025 09:29:56 +0100 Subject: [PATCH 1/7] adding support for getting the TraceParent context for trace propagation, based on: https://github.com/microsoft/durabletask-go/pull/55 Signed-off-by: salaboy --- backend/executor.go | 1 + client/worker_grpc.go | 10 +++- .../distributedtracing/distributedtracing.go | 12 ++++ tests/grpc/grpc_test.go | 55 ++++++++++++++++++- 4 files changed, 74 insertions(+), 4 deletions(-) diff --git a/backend/executor.go b/backend/executor.go index 18dfa515..2932ded5 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -177,6 +177,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)}, TaskId: e.EventId, TaskExecutionId: task.TaskExecutionId, + ParentTraceContext: task.ParentTraceContext, } workItem := &protos.WorkItem{ Request: &protos.WorkItem_ActivityRequest{ diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 3b99f329..39446abf 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -15,6 +15,7 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" "github.com/dapr/durabletask-go/api" + "github.com/dapr/durabletask-go/api/helpers" "github.com/dapr/durabletask-go/api/protos" "github.com/dapr/durabletask-go/backend" "github.com/dapr/durabletask-go/task" @@ -174,7 +175,12 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( executor backend.Executor, req *protos.ActivityRequest, ) { - var tc *protos.TraceContext = nil // TODO: How to populate trace context? + var ptc *protos.TraceContext = req.ParentTraceContext + ctx, err := helpers.ContextFromTraceContext(ctx, ptc) + if err != nil { + fmt.Printf("%v: failed to parse trace context: %v", req.Name, err) + } + event := &protos.HistoryEvent{ EventId: req.TaskId, Timestamp: timestamppb.New(time.Now()), @@ -184,7 +190,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( Version: req.Version, Input: req.Input, TaskExecutionId: req.TaskExecutionId, - ParentTraceContext: tc, + ParentTraceContext: ptc, }, }, } diff --git a/samples/distributedtracing/distributedtracing.go b/samples/distributedtracing/distributedtracing.go index 04873286..1701dc43 100644 --- a/samples/distributedtracing/distributedtracing.go +++ b/samples/distributedtracing/distributedtracing.go @@ -18,6 +18,8 @@ import ( "github.com/dapr/durabletask-go/task" ) +var tracer = otel.Tracer("distributedtracing-example") + func main() { // Tracing can be configured independently of the orchestration code. tp, err := ConfigureZipkinTracing() @@ -136,6 +138,16 @@ func DoWorkActivity(ctx task.ActivityContext) (any, error) { return "", err } + _, childSpan := tracer.Start(ctx.Context(), "activity-subwork") + // Simulate doing some sub work + select { + case <-time.After(2 * time.Second): + // Ok + case <-ctx.Context().Done(): + return nil, ctx.Context().Err() + } + childSpan.End() + // Simulate doing work select { case <-time.After(duration): diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index fb8b1fd5..033a4163 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -21,11 +21,14 @@ import ( "github.com/dapr/durabletask-go/backend/sqlite" "github.com/dapr/durabletask-go/client" "github.com/dapr/durabletask-go/task" + "github.com/dapr/durabletask-go/tests/utils" + "go.opentelemetry.io/otel" ) var ( grpcClient *client.TaskHubGrpcClient ctx = context.Background() + tracer = otel.Tracer("grpc-test") ) // TestMain is the entry point for the test suite. We use this to set up a gRPC server and client instance @@ -418,9 +421,9 @@ func Test_Grpc_ReuseInstanceIDError(t *testing.T) { defer cancelListener() instanceID := api.InstanceID("THROW_IF_RUNNING_OR_COMPLETED") - id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) + _, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) require.NoError(t, err) - id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id)) + _, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id)) if assert.Error(t, err) { assert.Contains(t, err.Error(), "orchestration instance already exists") } @@ -489,3 +492,51 @@ func Test_Grpc_SubOrchestratorRetries(t *testing.T) { // With 3 max attempts there will be two retries with 10 millis delay before each require.GreaterOrEqual(t, metadata.LastUpdatedAt.AsTime(), metadata.CreatedAt.AsTime().Add(2*10*time.Millisecond)) } + +func Test_SingleActivity_TaskSpan(t *testing.T) { + // Registration + r := task.NewTaskRegistry() + r.AddOrchestratorN("SingleActivity_TestSpan", func(ctx *task.OrchestrationContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) + return output, err + }) + r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + var name string + if err := ctx.GetInput(&name); err != nil { + return nil, err + } + _, childSpan := tracer.Start(ctx.Context(), "activityChild_TestSpan") + childSpan.End() + return fmt.Sprintf("Hello, %s!", name), nil + }) + + exporter := utils.InitTracing() + cancelListener := startGrpcListener(t, r) + defer cancelListener() + + // Run the orchestration + id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity_TestSpan", api.WithInput("世界")) + if assert.NoError(t, err) { + metadata, err := grpcClient.WaitForOrchestrationCompletion(ctx, id) + if assert.NoError(t, err) { + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) + assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput) + } + } + + // Validate the exported OTel traces + spans := exporter.GetSpans().Snapshots() + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleActivity_TestSpan", id), + utils.AssertSpan("activityChild_TestSpan"), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity_TestSpan", id, "COMPLETED"), + ) + // assert child-parent relationship + assert.Equal(t, spans[1].Parent().SpanID(), spans[2].SpanContext().SpanID()) +} From 8c1823caf7ebb2de42f7505f9efeef5807f15ddc Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 4 Aug 2025 09:32:57 +0100 Subject: [PATCH 2/7] removing old reference Signed-off-by: salaboy --- tests/grpc/grpc_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index 033a4163..97b6eb47 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -21,7 +21,6 @@ import ( "github.com/dapr/durabletask-go/backend/sqlite" "github.com/dapr/durabletask-go/client" "github.com/dapr/durabletask-go/task" - "github.com/dapr/durabletask-go/tests/utils" "go.opentelemetry.io/otel" ) From 569e5957c210eb1a10f0f3fec2a3174668332337 Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 4 Aug 2025 09:42:51 +0100 Subject: [PATCH 3/7] adding tracing utils Signed-off-by: salaboy --- tests/grpc/grpc_test.go | 7 +- tests/utils/tracing.go | 232 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 236 insertions(+), 3 deletions(-) create mode 100644 tests/utils/tracing.go diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index 97b6eb47..0331afb9 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -21,6 +21,7 @@ import ( "github.com/dapr/durabletask-go/backend/sqlite" "github.com/dapr/durabletask-go/client" "github.com/dapr/durabletask-go/task" + "github.com/dapr/durabletask-go/tests/utils" "go.opentelemetry.io/otel" ) @@ -420,9 +421,9 @@ func Test_Grpc_ReuseInstanceIDError(t *testing.T) { defer cancelListener() instanceID := api.InstanceID("THROW_IF_RUNNING_OR_COMPLETED") - _, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) + id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) require.NoError(t, err) - _, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id)) + id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id)) if assert.Error(t, err) { assert.Contains(t, err.Error(), "orchestration instance already exists") } @@ -524,7 +525,7 @@ func Test_SingleActivity_TaskSpan(t *testing.T) { metadata, err := grpcClient.WaitForOrchestrationCompletion(ctx, id) if assert.NoError(t, err) { assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) - assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput) + assert.Equal(t, `"Hello, 世界!"`, metadata.Output.Value) } } diff --git a/tests/utils/tracing.go b/tests/utils/tracing.go new file mode 100644 index 00000000..4f825ccc --- /dev/null +++ b/tests/utils/tracing.go @@ -0,0 +1,232 @@ +package utils + +import ( + "fmt" + "sync" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/dapr/durabletask-go/api" +) + +type ( + spanValidator func(t assert.TestingT, spans []trace.ReadOnlySpan, index int) + spanAttributeValidator func(t assert.TestingT, span trace.ReadOnlySpan) bool + spanEventValidator func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool +) + +var ( + initTracingOnce sync.Once + sharedTraceExporter = tracetest.NewInMemoryExporter() +) + +func AssertSpanSequence(t assert.TestingT, spans []trace.ReadOnlySpan, spanAsserts ...spanValidator) { + for i, f := range spanAsserts { + f(t, spans, i) + } +} + +// assertOrchestratorCreated validates a create_orchestration span +func AssertOrchestratorCreated(name string, id api.InstanceID, optionalAsserts ...spanAttributeValidator) spanValidator { + spanName := fmt.Sprintf("create_orchestration||%s", name) + opts := []spanAttributeValidator{ + assertTaskType("orchestration"), + assertTaskName(name), + assertInstanceID(id), + } + opts = append(opts, optionalAsserts...) + return AssertSpan(spanName, opts...) +} + +// assertOrchestratorCreated validates an orchestration span +func AssertOrchestratorExecuted(name string, id api.InstanceID, status string, optionalAsserts ...spanAttributeValidator) spanValidator { + spanName := fmt.Sprintf("orchestration||%s", name) + opts := []spanAttributeValidator{ + assertTaskType("orchestration"), + assertTaskName(name), + assertInstanceID(id), + assertStatus(status), + } + opts = append(opts, optionalAsserts...) + return AssertSpan(spanName, opts...) +} + +func AssertActivity(name string, id api.InstanceID, taskID int64, optionalAsserts ...spanAttributeValidator) spanValidator { + spanName := fmt.Sprintf("activity||%s", name) + opts := []spanAttributeValidator{ + assertTaskType("activity"), + assertTaskName(name), + assertInstanceID(id), + assertTaskID(taskID), + } + opts = append(opts, optionalAsserts...) + return AssertSpan(spanName, opts...) +} + +func AssertTimer(id api.InstanceID) spanValidator { + return AssertSpan("timer", assertInstanceID(id), assertTimerFired()) +} + +func AssertSpanEvents(eventAsserts ...spanEventValidator) spanAttributeValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan) bool { + if assert.Equal(t, len(eventAsserts), len(span.Events()), "unexpected number of span events") { + for i, f := range eventAsserts { + if !f(t, span, i) { + return false + } + } + } + return true + } +} + +func AssertExternalEvent(eventName string, payloadSize int) spanEventValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool { + event := span.Events()[eventIndex] + hasMessage := assert.Equal(t, "Received external event", event.Name) + hasNameAttribute := assert.Contains(t, event.Attributes, attribute.KeyValue{ + Key: "name", + Value: attribute.StringValue(eventName), + }) + hasSizeAttribute := assert.Contains(t, event.Attributes, attribute.KeyValue{ + Key: "size", + Value: attribute.IntValue(payloadSize), + }) + return hasMessage && hasNameAttribute && hasSizeAttribute + } +} + +func AssertSuspendedEvent() spanEventValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool { + event := span.Events()[eventIndex] + return assert.Equal(t, "Execution suspended", event.Name) + } +} + +func AssertResumedEvent() spanEventValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool { + event := span.Events()[eventIndex] + return assert.Equal(t, "Execution resumed", event.Name) + } +} + +func AssertSpan(name string, optionalAsserts ...spanAttributeValidator) spanValidator { + return func(t assert.TestingT, spans []trace.ReadOnlySpan, index int) { + if !doAssertSpan(t, spans, index, name, optionalAsserts...) { + fmt.Printf("span assertion for %s (index=%d) failed\n", name, index) + } + } +} + +func doAssertSpan(t assert.TestingT, spans []trace.ReadOnlySpan, index int, name string, optionalAsserts ...spanAttributeValidator) bool { + // array bounds check + if !assert.Lessf(t, index, len(spans), "%d spans were exported, but more were expected by the test", len(spans)) { + return false + } + + span := spans[index] + + // All spans have a name that we must validate + success := assert.Equal(t, name, span.Name()) + + // Optional validations that are span-specific + for _, optionalAssert := range optionalAsserts { + if !optionalAssert(t, span) { + success = false + } + } + + return success +} + +func assertTaskType(expectedTaskType string) spanAttributeValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan) bool { + return assert.Contains(t, span.Attributes(), attribute.KeyValue{ + Key: "durabletask.type", + Value: attribute.StringValue(expectedTaskType), + }) + } +} + +func assertTaskName(expectedTaskName string) spanAttributeValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan) bool { + return assert.Contains(t, span.Attributes(), attribute.KeyValue{ + Key: "durabletask.task.name", + Value: attribute.StringValue(expectedTaskName), + }) + } +} + +func assertTaskID(expectedTaskID int64) spanAttributeValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan) bool { + return assert.Contains(t, span.Attributes(), attribute.KeyValue{ + Key: "durabletask.task.task_id", + Value: attribute.Int64Value(expectedTaskID), + }) + } +} + +func assertInstanceID(expectedID api.InstanceID) spanAttributeValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan) bool { + return assert.Contains(t, span.Attributes(), attribute.KeyValue{ + Key: "durabletask.task.instance_id", + Value: attribute.StringValue(string(expectedID)), + }) + } +} + +func assertStatus(expectedStatus string) spanAttributeValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan) bool { + return assert.Contains(t, span.Attributes(), attribute.KeyValue{ + Key: "durabletask.runtime_status", + Value: attribute.StringValue(expectedStatus), + }) + } +} + +func assertTimerFired() spanAttributeValidator { + return func(t assert.TestingT, span trace.ReadOnlySpan) bool { + var firedAtStr string + for _, a := range span.Attributes() { + if a.Key == "durabletask.fire_at" { + firedAtStr = a.Value.AsString() + break + } + } + + if assert.NotEmptyf(t, firedAtStr, "couldn't find the durabletask.fire_at attribute") { + // Ensure we can parse the value and that the value fits into a general range. + // Note that we're not attempting to validate a specific time. + firedAt, err := time.Parse(time.RFC3339, firedAtStr) + now := time.Now().UTC() + return assert.NoError(t, err) && + assert.Less(t, firedAt, now) && + assert.Greater(t, firedAt, now.Add(-1*time.Hour)) + } + + return false + } +} + +// initTracing configures in-memory OTel tracing and returns an exporter which can be used +// to examine the exported traces. We only want to look at exported traces because we do +// tricks to mark certain spans as non-exported (i.e. orchestration replays), and want +// to ensure that those spans are never actually exported. +func InitTracing() *tracetest.InMemoryExporter { + // The global tracer provider can only be initialized once. + // Subsequent initializations will silently fail. + initTracingOnce.Do(func() { + processor := trace.NewSimpleSpanProcessor(sharedTraceExporter) + provider := trace.NewTracerProvider(trace.WithSpanProcessor(processor)) + otel.SetTracerProvider(provider) + }) + + // Reset the shared exporter so that new tests don't see traces from previous tests. + sharedTraceExporter.Reset() + return sharedTraceExporter +} From b741cc79d0e840f1a24663d245627cbafb52747b Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 4 Aug 2025 10:27:54 +0100 Subject: [PATCH 4/7] updating tracing utils to be used in orchestrations tests Signed-off-by: salaboy --- tests/orchestrations_test.go | 261 ++++++++++++++++++----------------- tests/tracing_test.go | 237 ------------------------------- tests/utils/tracing.go | 13 +- 3 files changed, 140 insertions(+), 371 deletions(-) delete mode 100644 tests/tracing_test.go diff --git a/tests/orchestrations_test.go b/tests/orchestrations_test.go index 3f3eb87c..26185a66 100644 --- a/tests/orchestrations_test.go +++ b/tests/orchestrations_test.go @@ -19,6 +19,7 @@ import ( "github.com/dapr/durabletask-go/backend" "github.com/dapr/durabletask-go/backend/sqlite" "github.com/dapr/durabletask-go/task" + "github.com/dapr/durabletask-go/tests/utils" ) func Test_EmptyOrchestration(t *testing.T) { @@ -30,7 +31,7 @@ func Test_EmptyOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -43,9 +44,9 @@ func Test_EmptyOrchestration(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("EmptyOrchestrator", id), - assertOrchestratorExecuted("EmptyOrchestrator", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("EmptyOrchestrator", id), + utils.AssertOrchestratorExecuted("EmptyOrchestrator", id, "COMPLETED"), ) } @@ -59,7 +60,7 @@ func Test_SingleTimer(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -75,10 +76,10 @@ func Test_SingleTimer(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("SingleTimer", id), - assertTimer(id), - assertOrchestratorExecuted("SingleTimer", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleTimer", id), + utils.AssertTimer(id), + utils.AssertOrchestratorExecuted("SingleTimer", id, "COMPLETED"), ) } @@ -101,7 +102,7 @@ func Test_ConcurrentTimers(t *testing.T) { // Initialization ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -117,12 +118,12 @@ func Test_ConcurrentTimers(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("TimerFanOut", id), - assertTimer(id), - assertTimer(id), - assertTimer(id), - assertOrchestratorExecuted("TimerFanOut", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("TimerFanOut", id), + utils.AssertTimer(id), + utils.AssertTimer(id), + utils.AssertTimer(id), + utils.AssertOrchestratorExecuted("TimerFanOut", id, "COMPLETED"), ) } @@ -140,7 +141,7 @@ func Test_IsReplaying(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -156,11 +157,11 @@ func Test_IsReplaying(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("IsReplayingOrch", id), - assertTimer(id), - assertTimer(id), - assertOrchestratorExecuted("IsReplayingOrch", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("IsReplayingOrch", id), + utils.AssertTimer(id), + utils.AssertTimer(id), + utils.AssertOrchestratorExecuted("IsReplayingOrch", id, "COMPLETED"), ) } @@ -186,7 +187,7 @@ func Test_SingleActivity(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -202,10 +203,10 @@ func Test_SingleActivity(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("SingleActivity", id), - assertActivity("SayHello", id, 0), - assertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleActivity", id), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), ) } @@ -231,7 +232,7 @@ func Test_ActivityChain(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -247,12 +248,12 @@ func Test_ActivityChain(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ActivityChain", id), - assertActivity("PlusOne", id, 0), assertActivity("PlusOne", id, 1), assertActivity("PlusOne", id, 2), - assertActivity("PlusOne", id, 3), assertActivity("PlusOne", id, 4), assertActivity("PlusOne", id, 5), - assertActivity("PlusOne", id, 6), assertActivity("PlusOne", id, 7), assertActivity("PlusOne", id, 8), - assertActivity("PlusOne", id, 9), assertOrchestratorExecuted("ActivityChain", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ActivityChain", id), + utils.AssertActivity("PlusOne", id, 0), utils.AssertActivity("PlusOne", id, 1), utils.AssertActivity("PlusOne", id, 2), + utils.AssertActivity("PlusOne", id, 3), utils.AssertActivity("PlusOne", id, 4), utils.AssertActivity("PlusOne", id, 5), + utils.AssertActivity("PlusOne", id, 6), utils.AssertActivity("PlusOne", id, 7), utils.AssertActivity("PlusOne", id, 8), + utils.AssertActivity("PlusOne", id, 9), utils.AssertOrchestratorExecuted("ActivityChain", id, "COMPLETED"), ) } @@ -274,7 +275,7 @@ func Test_ActivityRetries(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -291,14 +292,14 @@ func Test_ActivityRetries(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ActivityRetries", id), - assertActivity("FailActivity", id, 0), - assertTimer(id, assertTaskID(1)), - assertActivity("FailActivity", id, 2), - assertTimer(id, assertTaskID(3)), - assertActivity("FailActivity", id, 4), - assertOrchestratorExecuted("ActivityRetries", id, "FAILED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ActivityRetries", id), + utils.AssertActivity("FailActivity", id, 0), + utils.AssertTimer(id, utils.AssertTaskID(1)), + utils.AssertActivity("FailActivity", id, 2), + utils.AssertTimer(id, utils.AssertTaskID(3)), + utils.AssertActivity("FailActivity", id, 4), + utils.AssertOrchestratorExecuted("ActivityRetries", id, "FAILED"), ) } @@ -332,7 +333,7 @@ func Test_ActivityFanOut(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r, backend.WithMaxParallelism(10)) defer worker.Shutdown(ctx) @@ -351,8 +352,8 @@ func Test_ActivityFanOut(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ActivityFanOut", id), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ActivityFanOut", id), // TODO: Find a way to assert an unordered sequence of traces since the order of activity traces is non-deterministic. ) } @@ -380,7 +381,7 @@ func Test_SingleSubOrchestrator_Completed(t *testing.T) { }) ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -392,10 +393,10 @@ func Test_SingleSubOrchestrator_Completed(t *testing.T) { assert.Equal(t, `"Hello, world!"`, metadata.Output.Value) spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("Parent", id), - assertOrchestratorExecuted("Child", id+"_child", "COMPLETED"), - assertOrchestratorExecuted("Parent", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("Parent", id), + utils.AssertOrchestratorExecuted("Child", id+"_child", "COMPLETED"), + utils.AssertOrchestratorExecuted("Parent", id, "COMPLETED"), ) } @@ -412,7 +413,7 @@ func Test_SingleSubOrchestrator_Failed(t *testing.T) { }) ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -426,10 +427,10 @@ func Test_SingleSubOrchestrator_Failed(t *testing.T) { } spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("Parent", id), - assertOrchestratorExecuted("Child", id+"_child", "FAILED"), - assertOrchestratorExecuted("Parent", id, "FAILED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("Parent", id), + utils.AssertOrchestratorExecuted("Child", id+"_child", "FAILED"), + utils.AssertOrchestratorExecuted("Parent", id, "FAILED"), ) } @@ -451,7 +452,7 @@ func Test_SingleSubOrchestrator_Failed_Retries(t *testing.T) { }) ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -465,14 +466,14 @@ func Test_SingleSubOrchestrator_Failed_Retries(t *testing.T) { } spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("Parent", id), - assertOrchestratorExecuted("Child", id+"_child", "FAILED"), - assertTimer(id, assertTaskID(1)), - assertOrchestratorExecuted("Child", id+"_child", "FAILED"), - assertTimer(id, assertTaskID(3)), - assertOrchestratorExecuted("Child", id+"_child", "FAILED"), - assertOrchestratorExecuted("Parent", id, "FAILED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("Parent", id), + utils.AssertOrchestratorExecuted("Child", id+"_child", "FAILED"), + utils.AssertTimer(id, utils.AssertTaskID(1)), + utils.AssertOrchestratorExecuted("Child", id+"_child", "FAILED"), + utils.AssertTimer(id, utils.AssertTaskID(3)), + utils.AssertOrchestratorExecuted("Child", id+"_child", "FAILED"), + utils.AssertOrchestratorExecuted("Parent", id, "FAILED"), ) } @@ -496,7 +497,7 @@ func Test_ContinueAsNew(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -512,19 +513,19 @@ func Test_ContinueAsNew(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ContinueAsNewTest", id), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertOrchestratorExecuted("ContinueAsNewTest", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ContinueAsNewTest", id), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "COMPLETED"), ) } @@ -632,7 +633,7 @@ func Test_ExternalEventOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -656,19 +657,19 @@ func Test_ExternalEventOrchestration(t *testing.T) { // Validate the exported OTel traces eventSizeInBytes := 1 spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ExternalEventOrchestration", id), - assertOrchestratorExecuted("ExternalEventOrchestration", id, "COMPLETED", assertSpanEvents( - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ExternalEventOrchestration", id), + utils.AssertOrchestratorExecuted("ExternalEventOrchestration", id, "COMPLETED", utils.AssertSpanEvents( + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), )), ) } @@ -685,7 +686,7 @@ func Test_ExternalEventTimeout(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -711,10 +712,10 @@ func Test_ExternalEventTimeout(t *testing.T) { assert.True(t, api.OrchestrationMetadataIsComplete(metadata)) assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) - assertSpanSequence(t, spans, - assertOrchestratorCreated("ExternalEventOrchestrationWithTimeout", id), - assertOrchestratorExecuted("ExternalEventOrchestrationWithTimeout", id, "COMPLETED", assertSpanEvents( - assertExternalEvent("MyEvent", 0), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ExternalEventOrchestrationWithTimeout", id), + utils.AssertOrchestratorExecuted("ExternalEventOrchestrationWithTimeout", id, "COMPLETED", utils.AssertSpanEvents( + utils.AssertExternalEvent("MyEvent", 0), )), ) } else { @@ -727,11 +728,11 @@ func Test_ExternalEventTimeout(t *testing.T) { assert.Equal(t, "the task was canceled", metadata.FailureDetails.ErrorMessage) } - assertSpanSequence(t, spans, - assertOrchestratorCreated("ExternalEventOrchestrationWithTimeout", id), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ExternalEventOrchestrationWithTimeout", id), // A timer is used to implement the event timeout - assertTimer(id), - assertOrchestratorExecuted("ExternalEventOrchestrationWithTimeout", id, "FAILED", assertSpanEvents()), + utils.AssertTimer(id), + utils.AssertOrchestratorExecuted("ExternalEventOrchestrationWithTimeout", id, "FAILED", utils.AssertSpanEvents()), ) } }) @@ -757,7 +758,7 @@ func Test_SuspendResumeOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -801,21 +802,21 @@ func Test_SuspendResumeOrchestration(t *testing.T) { // Validate the exported OTel traces eventSizeInBytes := 1 spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("SuspendResumeOrchestration", id), - assertOrchestratorExecuted("SuspendResumeOrchestration", id, "COMPLETED", assertSpanEvents( - assertSuspendedEvent(), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertResumedEvent(), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SuspendResumeOrchestration", id), + utils.AssertOrchestratorExecuted("SuspendResumeOrchestration", id, "COMPLETED", utils.AssertSpanEvents( + utils.AssertSuspendedEvent(), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertResumedEvent(), )), ) } @@ -830,7 +831,7 @@ func Test_TerminateOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -850,9 +851,9 @@ func Test_TerminateOrchestration(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("MyOrchestrator", id), - assertOrchestratorExecuted("MyOrchestrator", id, "TERMINATED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("MyOrchestrator", id), + utils.AssertOrchestratorExecuted("MyOrchestrator", id, "TERMINATED"), ) } @@ -1170,7 +1171,7 @@ func Test_RecreateCompletedOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -1194,13 +1195,13 @@ func Test_RecreateCompletedOrchestration(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("SingleActivity", id), - assertActivity("SayHello", id, 0), - assertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), - assertOrchestratorCreated("SingleActivity", id), - assertActivity("SayHello", id, 0), - assertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleActivity", id), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), + utils.AssertOrchestratorCreated("SingleActivity", id), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), ) } diff --git a/tests/tracing_test.go b/tests/tracing_test.go deleted file mode 100644 index ec598631..00000000 --- a/tests/tracing_test.go +++ /dev/null @@ -1,237 +0,0 @@ -package tests - -import ( - "fmt" - "sync" - "time" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" - - "github.com/dapr/durabletask-go/api" -) - -type ( - spanValidator func(t assert.TestingT, spans []trace.ReadOnlySpan, index int) - spanAttributeValidator func(t assert.TestingT, span trace.ReadOnlySpan) bool - spanEventValidator func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool -) - -var ( - initTracingOnce sync.Once - sharedTraceExporter = tracetest.NewInMemoryExporter() -) - -func assertSpanSequence(t assert.TestingT, spans []trace.ReadOnlySpan, spanAsserts ...spanValidator) { - for i, f := range spanAsserts { - f(t, spans, i) - } -} - -// assertOrchestratorCreated validates a create_orchestration span -func assertOrchestratorCreated(name string, id api.InstanceID, optionalAsserts ...spanAttributeValidator) spanValidator { - spanName := fmt.Sprintf("create_orchestration||%s", name) - opts := []spanAttributeValidator{ - assertTaskType("orchestration"), - assertTaskName(name), - assertInstanceID(id), - } - opts = append(opts, optionalAsserts...) - return assertSpan(spanName, opts...) -} - -// assertOrchestratorCreated validates an orchestration span -func assertOrchestratorExecuted(name string, id api.InstanceID, status string, optionalAsserts ...spanAttributeValidator) spanValidator { - spanName := fmt.Sprintf("orchestration||%s", name) - opts := []spanAttributeValidator{ - assertTaskType("orchestration"), - assertTaskName(name), - assertInstanceID(id), - assertStatus(status), - } - opts = append(opts, optionalAsserts...) - return assertSpan(spanName, opts...) -} - -func assertActivity(name string, id api.InstanceID, taskID int64, optionalAsserts ...spanAttributeValidator) spanValidator { - spanName := fmt.Sprintf("activity||%s", name) - opts := []spanAttributeValidator{ - assertTaskType("activity"), - assertTaskName(name), - assertInstanceID(id), - assertTaskID(taskID), - } - opts = append(opts, optionalAsserts...) - return assertSpan(spanName, opts...) -} - -func assertTimer(id api.InstanceID, optionalAsserts ...spanAttributeValidator) spanValidator { - opts := []spanAttributeValidator{ - assertInstanceID(id), - assertTimerFired(), - } - opts = append(opts, optionalAsserts...) - return assertSpan("timer", opts...) -} - -func assertSpanEvents(eventAsserts ...spanEventValidator) spanAttributeValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan) bool { - if assert.Equal(t, len(eventAsserts), len(span.Events()), "unexpected number of span events") { - for i, f := range eventAsserts { - if !f(t, span, i) { - return false - } - } - } - return true - } -} - -func assertExternalEvent(eventName string, payloadSize int) spanEventValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool { - event := span.Events()[eventIndex] - hasMessage := assert.Equal(t, "Received external event", event.Name) - hasNameAttribute := assert.Contains(t, event.Attributes, attribute.KeyValue{ - Key: "name", - Value: attribute.StringValue(eventName), - }) - hasSizeAttribute := assert.Contains(t, event.Attributes, attribute.KeyValue{ - Key: "size", - Value: attribute.IntValue(payloadSize), - }) - return hasMessage && hasNameAttribute && hasSizeAttribute - } -} - -func assertSuspendedEvent() spanEventValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool { - event := span.Events()[eventIndex] - return assert.Equal(t, "Execution suspended", event.Name) - } -} - -func assertResumedEvent() spanEventValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool { - event := span.Events()[eventIndex] - return assert.Equal(t, "Execution resumed", event.Name) - } -} - -func assertSpan(name string, optionalAsserts ...spanAttributeValidator) spanValidator { - return func(t assert.TestingT, spans []trace.ReadOnlySpan, index int) { - if !doAssertSpan(t, spans, index, name, optionalAsserts...) { - fmt.Printf("span assertion for %s (index=%d) failed\n", name, index) - } - } -} - -func doAssertSpan(t assert.TestingT, spans []trace.ReadOnlySpan, index int, name string, optionalAsserts ...spanAttributeValidator) bool { - // array bounds check - if !assert.Lessf(t, index, len(spans), "%d spans were exported, but more were expected by the test", len(spans)) { - return false - } - - span := spans[index] - - // All spans have a name that we must validate - success := assert.Equal(t, name, span.Name()) - - // Optional validations that are span-specific - for _, optionalAssert := range optionalAsserts { - if !optionalAssert(t, span) { - success = false - } - } - - return success -} - -func assertTaskType(expectedTaskType string) spanAttributeValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan) bool { - return assert.Contains(t, span.Attributes(), attribute.KeyValue{ - Key: "durabletask.type", - Value: attribute.StringValue(expectedTaskType), - }) - } -} - -func assertTaskName(expectedTaskName string) spanAttributeValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan) bool { - return assert.Contains(t, span.Attributes(), attribute.KeyValue{ - Key: "durabletask.task.name", - Value: attribute.StringValue(expectedTaskName), - }) - } -} - -func assertTaskID(expectedTaskID int64) spanAttributeValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan) bool { - return assert.Contains(t, span.Attributes(), attribute.KeyValue{ - Key: "durabletask.task.task_id", - Value: attribute.Int64Value(expectedTaskID), - }) - } -} - -func assertInstanceID(expectedID api.InstanceID) spanAttributeValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan) bool { - return assert.Contains(t, span.Attributes(), attribute.KeyValue{ - Key: "durabletask.task.instance_id", - Value: attribute.StringValue(string(expectedID)), - }) - } -} - -func assertStatus(expectedStatus string) spanAttributeValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan) bool { - return assert.Contains(t, span.Attributes(), attribute.KeyValue{ - Key: "durabletask.runtime_status", - Value: attribute.StringValue(expectedStatus), - }) - } -} - -func assertTimerFired() spanAttributeValidator { - return func(t assert.TestingT, span trace.ReadOnlySpan) bool { - var firedAtStr string - for _, a := range span.Attributes() { - if a.Key == "durabletask.fire_at" { - firedAtStr = a.Value.AsString() - break - } - } - - if assert.NotEmptyf(t, firedAtStr, "couldn't find the durabletask.fire_at attribute") { - // Ensure we can parse the value and that the value fits into a general range. - // Note that we're not attempting to validate a specific time. - firedAt, err := time.Parse(time.RFC3339, firedAtStr) - now := time.Now().UTC() - return assert.NoError(t, err) && - assert.Less(t, firedAt, now) && - assert.Greater(t, firedAt, now.Add(-1*time.Hour)) - } - - return false - } -} - -// initTracing configures in-memory OTel tracing and returns an exporter which can be used -// to examine the exported traces. We only want to look at exported traces because we do -// tricks to mark certain spans as non-exported (i.e. orchestration replays), and want -// to ensure that those spans are never actually exported. -func initTracing() *tracetest.InMemoryExporter { - // The global tracer provider can only be initialized once. - // Subsequent initializations will silently fail. - initTracingOnce.Do(func() { - processor := trace.NewSimpleSpanProcessor(sharedTraceExporter) - provider := trace.NewTracerProvider(trace.WithSpanProcessor(processor)) - otel.SetTracerProvider(provider) - }) - - // Reset the shared exporter so that new tests don't see traces from previous tests. - sharedTraceExporter.Reset() - return sharedTraceExporter -} diff --git a/tests/utils/tracing.go b/tests/utils/tracing.go index 4f825ccc..4a40085b 100644 --- a/tests/utils/tracing.go +++ b/tests/utils/tracing.go @@ -62,14 +62,19 @@ func AssertActivity(name string, id api.InstanceID, taskID int64, optionalAssert assertTaskType("activity"), assertTaskName(name), assertInstanceID(id), - assertTaskID(taskID), + AssertTaskID(taskID), } opts = append(opts, optionalAsserts...) return AssertSpan(spanName, opts...) } -func AssertTimer(id api.InstanceID) spanValidator { - return AssertSpan("timer", assertInstanceID(id), assertTimerFired()) +func AssertTimer(id api.InstanceID, optionalAsserts ...spanAttributeValidator) spanValidator { + opts := []spanAttributeValidator{ + assertInstanceID(id), + assertTimerFired(), + } + opts = append(opts, optionalAsserts...) + return AssertSpan("timer", opts...) } func AssertSpanEvents(eventAsserts ...spanEventValidator) spanAttributeValidator { @@ -162,7 +167,7 @@ func assertTaskName(expectedTaskName string) spanAttributeValidator { } } -func assertTaskID(expectedTaskID int64) spanAttributeValidator { +func AssertTaskID(expectedTaskID int64) spanAttributeValidator { return func(t assert.TestingT, span trace.ReadOnlySpan) bool { return assert.Contains(t, span.Attributes(), attribute.KeyValue{ Key: "durabletask.task.task_id", From 267784bcff921dcf6dcaa03977571ebe31130c31 Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 4 Aug 2025 13:24:39 +0100 Subject: [PATCH 5/7] adding orchestration test Signed-off-by: salaboy --- backend/activity.go | 5 ++-- tests/orchestrations_test.go | 54 ++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/backend/activity.go b/backend/activity.go index 970169c0..096c681b 100644 --- a/backend/activity.go +++ b/backend/activity.go @@ -49,7 +49,6 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo if ts == nil { return fmt.Errorf("%v: invalid TaskScheduled event", awi.InstanceID) } - // Create span as child of spanContext found in TaskScheduledEvent ctx, err := helpers.ContextFromTraceContext(ctx, ts.ParentTraceContext) if err != nil { @@ -66,6 +65,9 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo }() } + // set the parent trace context to be the newly created activity span + ts.ParentTraceContext = helpers.TraceContextFromSpan(span) + // Execute the activity and get its result result, err := p.executor.ExecuteActivity(ctx, awi.InstanceID, awi.NewEvent) if err != nil { @@ -75,7 +77,6 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo } return err } - awi.Result = result return nil } diff --git a/tests/orchestrations_test.go b/tests/orchestrations_test.go index 26185a66..e56bb522 100644 --- a/tests/orchestrations_test.go +++ b/tests/orchestrations_test.go @@ -20,8 +20,11 @@ import ( "github.com/dapr/durabletask-go/backend/sqlite" "github.com/dapr/durabletask-go/task" "github.com/dapr/durabletask-go/tests/utils" + "go.opentelemetry.io/otel" ) +var tracer = otel.Tracer("orchestration-test") + func Test_EmptyOrchestration(t *testing.T) { // Registration r := task.NewTaskRegistry() @@ -210,6 +213,57 @@ func Test_SingleActivity(t *testing.T) { ) } +func Test_SingleActivity_TaskSpan(t *testing.T) { + // Registration + r := task.NewTaskRegistry() + r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) + return output, err + }) + r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + var name string + if err := ctx.GetInput(&name); err != nil { + return nil, err + } + _, childSpan := tracer.Start(ctx.Context(), "activityChild") + childSpan.End() + return fmt.Sprintf("Hello, %s!", name), nil + }) + + // Initialization + ctx := context.Background() + exporter := utils.InitTracing() + + client, worker := initTaskHubWorker(ctx, r) + defer worker.Shutdown(ctx) + + // Run the orchestration + id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界")) + if assert.NoError(t, err) { + metadata, err := client.WaitForOrchestrationCompletion(ctx, id) + if assert.NoError(t, err) { + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) + assert.Equal(t, `"Hello, 世界!"`, metadata.Output.Value) + } + } + + // Validate the exported OTel traces + spans := exporter.GetSpans().Snapshots() + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleActivity", id), + utils.AssertSpan("activityChild"), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), + ) + // assert child-parent relationship + assert.Equal(t, spans[1].Parent().SpanID(), spans[2].SpanContext().SpanID()) +} + func Test_ActivityChain(t *testing.T) { // Registration r := task.NewTaskRegistry() From 55150ae4f9abd196bdf75a59cab2de95b03a6751 Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 11 Aug 2025 12:03:17 +0100 Subject: [PATCH 6/7] Update worker_grpc.go Signed-off-by: salaboy --- client/worker_grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 39446abf..9352f29e 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -178,7 +178,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( var ptc *protos.TraceContext = req.ParentTraceContext ctx, err := helpers.ContextFromTraceContext(ctx, ptc) if err != nil { - fmt.Printf("%v: failed to parse trace context: %v", req.Name, err) + c.logger.Warn("%v: failed to parse trace context: %v", req.Name, err) } event := &protos.HistoryEvent{ From 3e318163243e83a7d6aa0ae788090a61d8c17afe Mon Sep 17 00:00:00 2001 From: salaboy Date: Thu, 21 Aug 2025 10:37:46 +0100 Subject: [PATCH 7/7] Update grpc_test.go Signed-off-by: salaboy --- tests/grpc/grpc_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index 0331afb9..0e7e7bfd 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -520,7 +520,7 @@ func Test_SingleActivity_TaskSpan(t *testing.T) { defer cancelListener() // Run the orchestration - id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity_TestSpan", api.WithInput("世界")) + id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity_TestSpan", api.WithInput("世界"), api.WithStartTime(time.Now())) if assert.NoError(t, err) { metadata, err := grpcClient.WaitForOrchestrationCompletion(ctx, id) if assert.NoError(t, err) {