diff --git a/backend/activity.go b/backend/activity.go index 970169c0..096c681b 100644 --- a/backend/activity.go +++ b/backend/activity.go @@ -49,7 +49,6 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo if ts == nil { return fmt.Errorf("%v: invalid TaskScheduled event", awi.InstanceID) } - // Create span as child of spanContext found in TaskScheduledEvent ctx, err := helpers.ContextFromTraceContext(ctx, ts.ParentTraceContext) if err != nil { @@ -66,6 +65,9 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo }() } + // set the parent trace context to be the newly created activity span + ts.ParentTraceContext = helpers.TraceContextFromSpan(span) + // Execute the activity and get its result result, err := p.executor.ExecuteActivity(ctx, awi.InstanceID, awi.NewEvent) if err != nil { @@ -75,7 +77,6 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo } return err } - awi.Result = result return nil } diff --git a/backend/executor.go b/backend/executor.go index 18dfa515..2932ded5 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -177,6 +177,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)}, TaskId: e.EventId, TaskExecutionId: task.TaskExecutionId, + ParentTraceContext: task.ParentTraceContext, } workItem := &protos.WorkItem{ Request: &protos.WorkItem_ActivityRequest{ diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 3b99f329..9352f29e 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -15,6 +15,7 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" "github.com/dapr/durabletask-go/api" + "github.com/dapr/durabletask-go/api/helpers" "github.com/dapr/durabletask-go/api/protos" "github.com/dapr/durabletask-go/backend" "github.com/dapr/durabletask-go/task" @@ -174,7 +175,12 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( executor backend.Executor, req *protos.ActivityRequest, ) { - var tc *protos.TraceContext = nil // TODO: How to populate trace context? + var ptc *protos.TraceContext = req.ParentTraceContext + ctx, err := helpers.ContextFromTraceContext(ctx, ptc) + if err != nil { + c.logger.Warn("%v: failed to parse trace context: %v", req.Name, err) + } + event := &protos.HistoryEvent{ EventId: req.TaskId, Timestamp: timestamppb.New(time.Now()), @@ -184,7 +190,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( Version: req.Version, Input: req.Input, TaskExecutionId: req.TaskExecutionId, - ParentTraceContext: tc, + ParentTraceContext: ptc, }, }, } diff --git a/samples/distributedtracing/distributedtracing.go b/samples/distributedtracing/distributedtracing.go index 04873286..1701dc43 100644 --- a/samples/distributedtracing/distributedtracing.go +++ b/samples/distributedtracing/distributedtracing.go @@ -18,6 +18,8 @@ import ( "github.com/dapr/durabletask-go/task" ) +var tracer = otel.Tracer("distributedtracing-example") + func main() { // Tracing can be configured independently of the orchestration code. tp, err := ConfigureZipkinTracing() @@ -136,6 +138,16 @@ func DoWorkActivity(ctx task.ActivityContext) (any, error) { return "", err } + _, childSpan := tracer.Start(ctx.Context(), "activity-subwork") + // Simulate doing some sub work + select { + case <-time.After(2 * time.Second): + // Ok + case <-ctx.Context().Done(): + return nil, ctx.Context().Err() + } + childSpan.End() + // Simulate doing work select { case <-time.After(duration): diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index fb8b1fd5..0e7e7bfd 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -21,11 +21,14 @@ import ( "github.com/dapr/durabletask-go/backend/sqlite" "github.com/dapr/durabletask-go/client" "github.com/dapr/durabletask-go/task" + "github.com/dapr/durabletask-go/tests/utils" + "go.opentelemetry.io/otel" ) var ( grpcClient *client.TaskHubGrpcClient ctx = context.Background() + tracer = otel.Tracer("grpc-test") ) // TestMain is the entry point for the test suite. We use this to set up a gRPC server and client instance @@ -489,3 +492,51 @@ func Test_Grpc_SubOrchestratorRetries(t *testing.T) { // With 3 max attempts there will be two retries with 10 millis delay before each require.GreaterOrEqual(t, metadata.LastUpdatedAt.AsTime(), metadata.CreatedAt.AsTime().Add(2*10*time.Millisecond)) } + +func Test_SingleActivity_TaskSpan(t *testing.T) { + // Registration + r := task.NewTaskRegistry() + r.AddOrchestratorN("SingleActivity_TestSpan", func(ctx *task.OrchestrationContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) + return output, err + }) + r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + var name string + if err := ctx.GetInput(&name); err != nil { + return nil, err + } + _, childSpan := tracer.Start(ctx.Context(), "activityChild_TestSpan") + childSpan.End() + return fmt.Sprintf("Hello, %s!", name), nil + }) + + exporter := utils.InitTracing() + cancelListener := startGrpcListener(t, r) + defer cancelListener() + + // Run the orchestration + id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity_TestSpan", api.WithInput("世界"), api.WithStartTime(time.Now())) + if assert.NoError(t, err) { + metadata, err := grpcClient.WaitForOrchestrationCompletion(ctx, id) + if assert.NoError(t, err) { + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) + assert.Equal(t, `"Hello, 世界!"`, metadata.Output.Value) + } + } + + // Validate the exported OTel traces + spans := exporter.GetSpans().Snapshots() + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleActivity_TestSpan", id), + utils.AssertSpan("activityChild_TestSpan"), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity_TestSpan", id, "COMPLETED"), + ) + // assert child-parent relationship + assert.Equal(t, spans[1].Parent().SpanID(), spans[2].SpanContext().SpanID()) +} diff --git a/tests/orchestrations_test.go b/tests/orchestrations_test.go index 3f3eb87c..e56bb522 100644 --- a/tests/orchestrations_test.go +++ b/tests/orchestrations_test.go @@ -19,8 +19,12 @@ import ( "github.com/dapr/durabletask-go/backend" "github.com/dapr/durabletask-go/backend/sqlite" "github.com/dapr/durabletask-go/task" + "github.com/dapr/durabletask-go/tests/utils" + "go.opentelemetry.io/otel" ) +var tracer = otel.Tracer("orchestration-test") + func Test_EmptyOrchestration(t *testing.T) { // Registration r := task.NewTaskRegistry() @@ -30,7 +34,7 @@ func Test_EmptyOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -43,9 +47,9 @@ func Test_EmptyOrchestration(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("EmptyOrchestrator", id), - assertOrchestratorExecuted("EmptyOrchestrator", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("EmptyOrchestrator", id), + utils.AssertOrchestratorExecuted("EmptyOrchestrator", id, "COMPLETED"), ) } @@ -59,7 +63,7 @@ func Test_SingleTimer(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -75,10 +79,10 @@ func Test_SingleTimer(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("SingleTimer", id), - assertTimer(id), - assertOrchestratorExecuted("SingleTimer", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleTimer", id), + utils.AssertTimer(id), + utils.AssertOrchestratorExecuted("SingleTimer", id, "COMPLETED"), ) } @@ -101,7 +105,7 @@ func Test_ConcurrentTimers(t *testing.T) { // Initialization ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -117,12 +121,12 @@ func Test_ConcurrentTimers(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("TimerFanOut", id), - assertTimer(id), - assertTimer(id), - assertTimer(id), - assertOrchestratorExecuted("TimerFanOut", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("TimerFanOut", id), + utils.AssertTimer(id), + utils.AssertTimer(id), + utils.AssertTimer(id), + utils.AssertOrchestratorExecuted("TimerFanOut", id, "COMPLETED"), ) } @@ -140,7 +144,7 @@ func Test_IsReplaying(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -156,11 +160,11 @@ func Test_IsReplaying(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("IsReplayingOrch", id), - assertTimer(id), - assertTimer(id), - assertOrchestratorExecuted("IsReplayingOrch", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("IsReplayingOrch", id), + utils.AssertTimer(id), + utils.AssertTimer(id), + utils.AssertOrchestratorExecuted("IsReplayingOrch", id, "COMPLETED"), ) } @@ -186,7 +190,55 @@ func Test_SingleActivity(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() + client, worker := initTaskHubWorker(ctx, r) + defer worker.Shutdown(ctx) + + // Run the orchestration + id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界")) + if assert.NoError(t, err) { + metadata, err := client.WaitForOrchestrationCompletion(ctx, id) + if assert.NoError(t, err) { + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) + assert.Equal(t, `"Hello, 世界!"`, metadata.Output.Value) + } + } + + // Validate the exported OTel traces + spans := exporter.GetSpans().Snapshots() + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleActivity", id), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), + ) +} + +func Test_SingleActivity_TaskSpan(t *testing.T) { + // Registration + r := task.NewTaskRegistry() + r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) + return output, err + }) + r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + var name string + if err := ctx.GetInput(&name); err != nil { + return nil, err + } + _, childSpan := tracer.Start(ctx.Context(), "activityChild") + childSpan.End() + return fmt.Sprintf("Hello, %s!", name), nil + }) + + // Initialization + ctx := context.Background() + exporter := utils.InitTracing() + client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -202,11 +254,14 @@ func Test_SingleActivity(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("SingleActivity", id), - assertActivity("SayHello", id, 0), - assertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleActivity", id), + utils.AssertSpan("activityChild"), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), ) + // assert child-parent relationship + assert.Equal(t, spans[1].Parent().SpanID(), spans[2].SpanContext().SpanID()) } func Test_ActivityChain(t *testing.T) { @@ -231,7 +286,7 @@ func Test_ActivityChain(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -247,12 +302,12 @@ func Test_ActivityChain(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ActivityChain", id), - assertActivity("PlusOne", id, 0), assertActivity("PlusOne", id, 1), assertActivity("PlusOne", id, 2), - assertActivity("PlusOne", id, 3), assertActivity("PlusOne", id, 4), assertActivity("PlusOne", id, 5), - assertActivity("PlusOne", id, 6), assertActivity("PlusOne", id, 7), assertActivity("PlusOne", id, 8), - assertActivity("PlusOne", id, 9), assertOrchestratorExecuted("ActivityChain", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ActivityChain", id), + utils.AssertActivity("PlusOne", id, 0), utils.AssertActivity("PlusOne", id, 1), utils.AssertActivity("PlusOne", id, 2), + utils.AssertActivity("PlusOne", id, 3), utils.AssertActivity("PlusOne", id, 4), utils.AssertActivity("PlusOne", id, 5), + utils.AssertActivity("PlusOne", id, 6), utils.AssertActivity("PlusOne", id, 7), utils.AssertActivity("PlusOne", id, 8), + utils.AssertActivity("PlusOne", id, 9), utils.AssertOrchestratorExecuted("ActivityChain", id, "COMPLETED"), ) } @@ -274,7 +329,7 @@ func Test_ActivityRetries(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -291,14 +346,14 @@ func Test_ActivityRetries(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ActivityRetries", id), - assertActivity("FailActivity", id, 0), - assertTimer(id, assertTaskID(1)), - assertActivity("FailActivity", id, 2), - assertTimer(id, assertTaskID(3)), - assertActivity("FailActivity", id, 4), - assertOrchestratorExecuted("ActivityRetries", id, "FAILED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ActivityRetries", id), + utils.AssertActivity("FailActivity", id, 0), + utils.AssertTimer(id, utils.AssertTaskID(1)), + utils.AssertActivity("FailActivity", id, 2), + utils.AssertTimer(id, utils.AssertTaskID(3)), + utils.AssertActivity("FailActivity", id, 4), + utils.AssertOrchestratorExecuted("ActivityRetries", id, "FAILED"), ) } @@ -332,7 +387,7 @@ func Test_ActivityFanOut(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r, backend.WithMaxParallelism(10)) defer worker.Shutdown(ctx) @@ -351,8 +406,8 @@ func Test_ActivityFanOut(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ActivityFanOut", id), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ActivityFanOut", id), // TODO: Find a way to assert an unordered sequence of traces since the order of activity traces is non-deterministic. ) } @@ -380,7 +435,7 @@ func Test_SingleSubOrchestrator_Completed(t *testing.T) { }) ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -392,10 +447,10 @@ func Test_SingleSubOrchestrator_Completed(t *testing.T) { assert.Equal(t, `"Hello, world!"`, metadata.Output.Value) spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("Parent", id), - assertOrchestratorExecuted("Child", id+"_child", "COMPLETED"), - assertOrchestratorExecuted("Parent", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("Parent", id), + utils.AssertOrchestratorExecuted("Child", id+"_child", "COMPLETED"), + utils.AssertOrchestratorExecuted("Parent", id, "COMPLETED"), ) } @@ -412,7 +467,7 @@ func Test_SingleSubOrchestrator_Failed(t *testing.T) { }) ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -426,10 +481,10 @@ func Test_SingleSubOrchestrator_Failed(t *testing.T) { } spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("Parent", id), - assertOrchestratorExecuted("Child", id+"_child", "FAILED"), - assertOrchestratorExecuted("Parent", id, "FAILED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("Parent", id), + utils.AssertOrchestratorExecuted("Child", id+"_child", "FAILED"), + utils.AssertOrchestratorExecuted("Parent", id, "FAILED"), ) } @@ -451,7 +506,7 @@ func Test_SingleSubOrchestrator_Failed_Retries(t *testing.T) { }) ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -465,14 +520,14 @@ func Test_SingleSubOrchestrator_Failed_Retries(t *testing.T) { } spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("Parent", id), - assertOrchestratorExecuted("Child", id+"_child", "FAILED"), - assertTimer(id, assertTaskID(1)), - assertOrchestratorExecuted("Child", id+"_child", "FAILED"), - assertTimer(id, assertTaskID(3)), - assertOrchestratorExecuted("Child", id+"_child", "FAILED"), - assertOrchestratorExecuted("Parent", id, "FAILED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("Parent", id), + utils.AssertOrchestratorExecuted("Child", id+"_child", "FAILED"), + utils.AssertTimer(id, utils.AssertTaskID(1)), + utils.AssertOrchestratorExecuted("Child", id+"_child", "FAILED"), + utils.AssertTimer(id, utils.AssertTaskID(3)), + utils.AssertOrchestratorExecuted("Child", id+"_child", "FAILED"), + utils.AssertOrchestratorExecuted("Parent", id, "FAILED"), ) } @@ -496,7 +551,7 @@ func Test_ContinueAsNew(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -512,19 +567,19 @@ func Test_ContinueAsNew(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ContinueAsNewTest", id), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertTimer(id), assertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), - assertOrchestratorExecuted("ContinueAsNewTest", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ContinueAsNewTest", id), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertTimer(id), utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "CONTINUED_AS_NEW"), + utils.AssertOrchestratorExecuted("ContinueAsNewTest", id, "COMPLETED"), ) } @@ -632,7 +687,7 @@ func Test_ExternalEventOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -656,19 +711,19 @@ func Test_ExternalEventOrchestration(t *testing.T) { // Validate the exported OTel traces eventSizeInBytes := 1 spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("ExternalEventOrchestration", id), - assertOrchestratorExecuted("ExternalEventOrchestration", id, "COMPLETED", assertSpanEvents( - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ExternalEventOrchestration", id), + utils.AssertOrchestratorExecuted("ExternalEventOrchestration", id, "COMPLETED", utils.AssertSpanEvents( + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), )), ) } @@ -685,7 +740,7 @@ func Test_ExternalEventTimeout(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -711,10 +766,10 @@ func Test_ExternalEventTimeout(t *testing.T) { assert.True(t, api.OrchestrationMetadataIsComplete(metadata)) assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) - assertSpanSequence(t, spans, - assertOrchestratorCreated("ExternalEventOrchestrationWithTimeout", id), - assertOrchestratorExecuted("ExternalEventOrchestrationWithTimeout", id, "COMPLETED", assertSpanEvents( - assertExternalEvent("MyEvent", 0), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ExternalEventOrchestrationWithTimeout", id), + utils.AssertOrchestratorExecuted("ExternalEventOrchestrationWithTimeout", id, "COMPLETED", utils.AssertSpanEvents( + utils.AssertExternalEvent("MyEvent", 0), )), ) } else { @@ -727,11 +782,11 @@ func Test_ExternalEventTimeout(t *testing.T) { assert.Equal(t, "the task was canceled", metadata.FailureDetails.ErrorMessage) } - assertSpanSequence(t, spans, - assertOrchestratorCreated("ExternalEventOrchestrationWithTimeout", id), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("ExternalEventOrchestrationWithTimeout", id), // A timer is used to implement the event timeout - assertTimer(id), - assertOrchestratorExecuted("ExternalEventOrchestrationWithTimeout", id, "FAILED", assertSpanEvents()), + utils.AssertTimer(id), + utils.AssertOrchestratorExecuted("ExternalEventOrchestrationWithTimeout", id, "FAILED", utils.AssertSpanEvents()), ) } }) @@ -757,7 +812,7 @@ func Test_SuspendResumeOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -801,21 +856,21 @@ func Test_SuspendResumeOrchestration(t *testing.T) { // Validate the exported OTel traces eventSizeInBytes := 1 spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("SuspendResumeOrchestration", id), - assertOrchestratorExecuted("SuspendResumeOrchestration", id, "COMPLETED", assertSpanEvents( - assertSuspendedEvent(), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertExternalEvent("MyEvent", eventSizeInBytes), - assertResumedEvent(), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SuspendResumeOrchestration", id), + utils.AssertOrchestratorExecuted("SuspendResumeOrchestration", id, "COMPLETED", utils.AssertSpanEvents( + utils.AssertSuspendedEvent(), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertExternalEvent("MyEvent", eventSizeInBytes), + utils.AssertResumedEvent(), )), ) } @@ -830,7 +885,7 @@ func Test_TerminateOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -850,9 +905,9 @@ func Test_TerminateOrchestration(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("MyOrchestrator", id), - assertOrchestratorExecuted("MyOrchestrator", id, "TERMINATED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("MyOrchestrator", id), + utils.AssertOrchestratorExecuted("MyOrchestrator", id, "TERMINATED"), ) } @@ -1170,7 +1225,7 @@ func Test_RecreateCompletedOrchestration(t *testing.T) { // Initialization ctx := context.Background() - exporter := initTracing() + exporter := utils.InitTracing() client, worker := initTaskHubWorker(ctx, r) defer worker.Shutdown(ctx) @@ -1194,13 +1249,13 @@ func Test_RecreateCompletedOrchestration(t *testing.T) { // Validate the exported OTel traces spans := exporter.GetSpans().Snapshots() - assertSpanSequence(t, spans, - assertOrchestratorCreated("SingleActivity", id), - assertActivity("SayHello", id, 0), - assertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), - assertOrchestratorCreated("SingleActivity", id), - assertActivity("SayHello", id, 0), - assertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), + utils.AssertSpanSequence(t, spans, + utils.AssertOrchestratorCreated("SingleActivity", id), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), + utils.AssertOrchestratorCreated("SingleActivity", id), + utils.AssertActivity("SayHello", id, 0), + utils.AssertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), ) } diff --git a/tests/tracing_test.go b/tests/utils/tracing.go similarity index 88% rename from tests/tracing_test.go rename to tests/utils/tracing.go index ec598631..4a40085b 100644 --- a/tests/tracing_test.go +++ b/tests/utils/tracing.go @@ -1,4 +1,4 @@ -package tests +package utils import ( "fmt" @@ -25,14 +25,14 @@ var ( sharedTraceExporter = tracetest.NewInMemoryExporter() ) -func assertSpanSequence(t assert.TestingT, spans []trace.ReadOnlySpan, spanAsserts ...spanValidator) { +func AssertSpanSequence(t assert.TestingT, spans []trace.ReadOnlySpan, spanAsserts ...spanValidator) { for i, f := range spanAsserts { f(t, spans, i) } } // assertOrchestratorCreated validates a create_orchestration span -func assertOrchestratorCreated(name string, id api.InstanceID, optionalAsserts ...spanAttributeValidator) spanValidator { +func AssertOrchestratorCreated(name string, id api.InstanceID, optionalAsserts ...spanAttributeValidator) spanValidator { spanName := fmt.Sprintf("create_orchestration||%s", name) opts := []spanAttributeValidator{ assertTaskType("orchestration"), @@ -40,11 +40,11 @@ func assertOrchestratorCreated(name string, id api.InstanceID, optionalAsserts . assertInstanceID(id), } opts = append(opts, optionalAsserts...) - return assertSpan(spanName, opts...) + return AssertSpan(spanName, opts...) } // assertOrchestratorCreated validates an orchestration span -func assertOrchestratorExecuted(name string, id api.InstanceID, status string, optionalAsserts ...spanAttributeValidator) spanValidator { +func AssertOrchestratorExecuted(name string, id api.InstanceID, status string, optionalAsserts ...spanAttributeValidator) spanValidator { spanName := fmt.Sprintf("orchestration||%s", name) opts := []spanAttributeValidator{ assertTaskType("orchestration"), @@ -53,31 +53,31 @@ func assertOrchestratorExecuted(name string, id api.InstanceID, status string, o assertStatus(status), } opts = append(opts, optionalAsserts...) - return assertSpan(spanName, opts...) + return AssertSpan(spanName, opts...) } -func assertActivity(name string, id api.InstanceID, taskID int64, optionalAsserts ...spanAttributeValidator) spanValidator { +func AssertActivity(name string, id api.InstanceID, taskID int64, optionalAsserts ...spanAttributeValidator) spanValidator { spanName := fmt.Sprintf("activity||%s", name) opts := []spanAttributeValidator{ assertTaskType("activity"), assertTaskName(name), assertInstanceID(id), - assertTaskID(taskID), + AssertTaskID(taskID), } opts = append(opts, optionalAsserts...) - return assertSpan(spanName, opts...) + return AssertSpan(spanName, opts...) } -func assertTimer(id api.InstanceID, optionalAsserts ...spanAttributeValidator) spanValidator { +func AssertTimer(id api.InstanceID, optionalAsserts ...spanAttributeValidator) spanValidator { opts := []spanAttributeValidator{ assertInstanceID(id), assertTimerFired(), } opts = append(opts, optionalAsserts...) - return assertSpan("timer", opts...) + return AssertSpan("timer", opts...) } -func assertSpanEvents(eventAsserts ...spanEventValidator) spanAttributeValidator { +func AssertSpanEvents(eventAsserts ...spanEventValidator) spanAttributeValidator { return func(t assert.TestingT, span trace.ReadOnlySpan) bool { if assert.Equal(t, len(eventAsserts), len(span.Events()), "unexpected number of span events") { for i, f := range eventAsserts { @@ -90,7 +90,7 @@ func assertSpanEvents(eventAsserts ...spanEventValidator) spanAttributeValidator } } -func assertExternalEvent(eventName string, payloadSize int) spanEventValidator { +func AssertExternalEvent(eventName string, payloadSize int) spanEventValidator { return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool { event := span.Events()[eventIndex] hasMessage := assert.Equal(t, "Received external event", event.Name) @@ -106,21 +106,21 @@ func assertExternalEvent(eventName string, payloadSize int) spanEventValidator { } } -func assertSuspendedEvent() spanEventValidator { +func AssertSuspendedEvent() spanEventValidator { return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool { event := span.Events()[eventIndex] return assert.Equal(t, "Execution suspended", event.Name) } } -func assertResumedEvent() spanEventValidator { +func AssertResumedEvent() spanEventValidator { return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool { event := span.Events()[eventIndex] return assert.Equal(t, "Execution resumed", event.Name) } } -func assertSpan(name string, optionalAsserts ...spanAttributeValidator) spanValidator { +func AssertSpan(name string, optionalAsserts ...spanAttributeValidator) spanValidator { return func(t assert.TestingT, spans []trace.ReadOnlySpan, index int) { if !doAssertSpan(t, spans, index, name, optionalAsserts...) { fmt.Printf("span assertion for %s (index=%d) failed\n", name, index) @@ -167,7 +167,7 @@ func assertTaskName(expectedTaskName string) spanAttributeValidator { } } -func assertTaskID(expectedTaskID int64) spanAttributeValidator { +func AssertTaskID(expectedTaskID int64) spanAttributeValidator { return func(t assert.TestingT, span trace.ReadOnlySpan) bool { return assert.Contains(t, span.Attributes(), attribute.KeyValue{ Key: "durabletask.task.task_id", @@ -222,7 +222,7 @@ func assertTimerFired() spanAttributeValidator { // to examine the exported traces. We only want to look at exported traces because we do // tricks to mark certain spans as non-exported (i.e. orchestration replays), and want // to ensure that those spans are never actually exported. -func initTracing() *tracetest.InMemoryExporter { +func InitTracing() *tracetest.InMemoryExporter { // The global tracer provider can only be initialized once. // Subsequent initializations will silently fail. initTracingOnce.Do(func() {