From 83196a5506fedc1185d19421d1301830c2ef2d26 Mon Sep 17 00:00:00 2001 From: Nichloas Date: Fri, 19 Dec 2025 23:11:44 +0800 Subject: [PATCH 1/2] Add sample for scheduling multiple executors --- samples/heterogeneous/heterogeneous.go | 262 +++++++++++++++++++++++++ 1 file changed, 262 insertions(+) create mode 100644 samples/heterogeneous/heterogeneous.go diff --git a/samples/heterogeneous/heterogeneous.go b/samples/heterogeneous/heterogeneous.go new file mode 100644 index 0000000..ba9a3c1 --- /dev/null +++ b/samples/heterogeneous/heterogeneous.go @@ -0,0 +1,262 @@ +package main + +import ( + "context" + "errors" + "fmt" + "net" + "sync" + "time" + + "github.com/microsoft/durabletask-go/api" + "github.com/microsoft/durabletask-go/backend" + "github.com/microsoft/durabletask-go/backend/sqlite" + "github.com/microsoft/durabletask-go/client" + "github.com/microsoft/durabletask-go/task" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + doubleActivityName = "double" + tripleActivityName = "triple" + orchestratorName = "sixTimes" + + localExecutorName = "local" + grpcExecutorName = "grpc" +) + +type Config struct { + routes map[string]string + defaultExecutor string +} + +func (c *Config) resolve(taskName string) string { + executorName, ok := c.routes[taskName] + if !ok { + executorName = c.defaultExecutor + } + return executorName +} + +type RoutingExecutor struct { + config Config + executors map[string]backend.Executor +} + +func (e *RoutingExecutor) getExecutor(taskName string) (backend.Executor, error) { + executorName := e.config.resolve(taskName) + executor, ok := e.executors[executorName] + if !ok { + return nil, fmt.Errorf("executor %s for task %s not found", executorName, taskName) + } + return executor, nil +} + +func (e *RoutingExecutor) getOrchestratorName(oldEvents, newEvents []*backend.HistoryEvent) string { + + for _, event := range oldEvents { + if x := event.GetExecutionStarted(); x != nil { + return x.Name + } + } + for _, event := range newEvents { + if x := event.GetExecutionStarted(); x != nil { + return x.Name + } + } + return "" +} + +func (e *RoutingExecutor) ExecuteOrchestrator(ctx context.Context, id api.InstanceID, oldEvents []*backend.HistoryEvent, newEvents []*backend.HistoryEvent) (*backend.ExecutionResults, error) { + name := e.getOrchestratorName(oldEvents, newEvents) + + executor, err := e.getExecutor(name) + if err != nil { + return nil, err + } + + return executor.ExecuteOrchestrator(ctx, id, oldEvents, newEvents) +} + +func (e *RoutingExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID, event *backend.HistoryEvent) (*backend.HistoryEvent, error) { + name := event.GetTaskScheduled().GetName() + executor, err := e.getExecutor(name) + if err != nil { + return nil, err + } + return executor.ExecuteActivity(ctx, id, event) +} + +func (e *RoutingExecutor) Shutdown(ctx context.Context) error { + ch := make(chan error) + defer close(ch) + for _, executor := range e.executors { + go func() { + err := executor.Shutdown(ctx) + if err != nil { + ch <- err + } + }() + } + var errs []error + for err := range ch { + errs = append(errs, err) + } + + if len(errs) == 0 { + return nil + } + + return errors.Join(errs...) +} + +func main() { + + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + + logger := backend.DefaultLogger() + be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(""), logger) + + executor, err := initExecutor(ctx, be, logger) + if err != nil { + panic(err) + } + + workflowWorker := backend.NewOrchestrationWorker(be, executor, logger) + activityWorker := backend.NewActivityTaskWorker(be, executor, logger) + taskHubWorker := backend.NewTaskHubWorker(be, workflowWorker, activityWorker, logger) + + if err := taskHubWorker.Start(ctx); err != nil { + panic(err) + } + defer taskHubWorker.Shutdown(ctx) + + taskHubClient := backend.NewTaskHubClient(be) + id, err := taskHubClient.ScheduleNewOrchestration(ctx, orchestratorName, api.WithInput(1)) + if err != nil { + panic(err) + } + metadata, err := taskHubClient.WaitForOrchestrationCompletion(ctx, id) + if err != nil { + panic(err) + } + fmt.Println(metadata.SerializedOutput) +} + +func initExecutor(ctx context.Context, be backend.Backend, logger backend.Logger) (backend.Executor, error) { + + timesActivity := func(times int) task.Activity { + return func(ctx task.ActivityContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + return input * times, nil + } + } + doubleActivity := timesActivity(2) + tripleActivity := timesActivity(3) + + // init a local executor register double and triple activity + localExecutor, err := func() (backend.Executor, error) { + r := task.NewTaskRegistry() + if err := r.AddActivityN(doubleActivityName, doubleActivity); err != nil { + return nil, err + } + + if err := r.AddActivityN(tripleActivityName, tripleActivity); err != nil { + return nil, err + } + + executor := task.NewTaskExecutor(r) + return executor, nil + }() + if err != nil { + return nil, err + } + + orchestrator := func(ctx *task.OrchestrationContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + + var intermediateResult int + if err := ctx.CallActivity(doubleActivityName, task.WithActivityInput(input)).Await(&intermediateResult); err != nil { + return nil, err + } + + var finalResult int + if err := ctx.CallActivity(tripleActivityName, task.WithActivityInput(intermediateResult)).Await(&finalResult); err != nil { + return nil, err + } + return finalResult, nil + } + + // init a grpc executor register the orchestrator, and connect a worker to the grpc executor + grpcExecutor, err := func() (backend.Executor, error) { + address := "localhost:4001" + grpcServer := grpc.NewServer() + executor, registerFn := backend.NewGrpcExecutor(be, logger) + registerFn(grpcServer) + + lis, err := net.Listen("tcp", address) + if err != nil { + return nil, err + } + go func() { + if err := grpcServer.Serve(lis); err != nil { + panic(err) + } + }() + + // create a worker connects to the grpc server + var wg sync.WaitGroup + wg.Add(1) + + go func() { + // wait for grpc started + time.Sleep(time.Second) + conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + panic(err) + } + + workerClient := client.NewTaskHubGrpcClient(conn, logger) + r := task.NewTaskRegistry() + if err := r.AddOrchestratorN(orchestratorName, orchestrator); err != nil { + panic(err) + } + wg.Done() + + if err := workerClient.StartWorkItemListener(ctx, r); err != nil { + panic(err) + } + }() + + wg.Wait() + + return executor, nil + }() + if err != nil { + return nil, err + } + + executor := &RoutingExecutor{ + config: Config{ + routes: map[string]string{ + doubleActivityName: localExecutorName, + tripleActivityName: localExecutorName, + }, + defaultExecutor: grpcExecutorName, + }, + executors: map[string]backend.Executor{ + localExecutorName: localExecutor, + grpcExecutorName: grpcExecutor, + }, + } + + return executor, nil +} From 66a95c8f98fdbbb70d59ed47a619e12815273282 Mon Sep 17 00:00:00 2001 From: Nichloas Date: Sat, 20 Dec 2025 11:23:03 +0800 Subject: [PATCH 2/2] Refactor heterogeneous sample and address PR review feedback - Refactor RoutingExecutor and initExecutor for better modularity and readability. - Export Config and RoutingExecutor fields to align with Go conventions. - Improve lifecycle management of gRPC server and client connections using context. - Fix Shutdown logic to properly handle errors from multiple executors. - Add comprehensive documentation comments for key types and methods. - Use grpc.DialContext with WithBlock() for more reliable connection initialization. --- samples/heterogeneous/heterogeneous.go | 200 ++++++++++++++----------- 1 file changed, 109 insertions(+), 91 deletions(-) diff --git a/samples/heterogeneous/heterogeneous.go b/samples/heterogeneous/heterogeneous.go index ba9a3c1..b9b87fb 100644 --- a/samples/heterogeneous/heterogeneous.go +++ b/samples/heterogeneous/heterogeneous.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "net" - "sync" "time" "github.com/microsoft/durabletask-go/api" @@ -26,33 +25,47 @@ const ( grpcExecutorName = "grpc" ) +// Config defines the routing configuration for tasks. +// It maps task names to executor names and specifies a default executor. type Config struct { - routes map[string]string - defaultExecutor string + Routes map[string]string + DefaultExecutor string } -func (c *Config) resolve(taskName string) string { - executorName, ok := c.routes[taskName] +// Resolve returns the executor name for a given task name. +// If no specific route is found, it returns the default executor name. +func (c *Config) Resolve(taskName string) string { + executorName, ok := c.Routes[taskName] if !ok { - executorName = c.defaultExecutor + executorName = c.DefaultExecutor } return executorName } +// RoutingExecutor is a backend.Executor implementation that Routes tasks to different Executors +// based on a configuration. This allows for heterogeneous execution environments where +// different tasks (orchestrators or activities) can be executed by different Executors +// (e.g., local vs gRPC). type RoutingExecutor struct { - config Config - executors map[string]backend.Executor + Config Config + Executors map[string]backend.Executor } +// getExecutor retrieves the appropriate backend.Executor for a given task name +// based on the routing configuration. func (e *RoutingExecutor) getExecutor(taskName string) (backend.Executor, error) { - executorName := e.config.resolve(taskName) - executor, ok := e.executors[executorName] + executorName := e.Config.Resolve(taskName) + executor, ok := e.Executors[executorName] if !ok { return nil, fmt.Errorf("executor %s for task %s not found", executorName, taskName) } return executor, nil } +// getOrchestratorName extracts the orchestrator name from the provided history events. +// It iterates through both old and new events to find the ExecutionStarted event, +// which contains the orchestrator's name. This is crucial for routing orchestrations +// to the correct executor based on their name. func (e *RoutingExecutor) getOrchestratorName(oldEvents, newEvents []*backend.HistoryEvent) string { for _, event := range oldEvents { @@ -68,6 +81,8 @@ func (e *RoutingExecutor) getOrchestratorName(oldEvents, newEvents []*backend.Hi return "" } +// ExecuteOrchestrator Routes the orchestration execution request to the appropriate executor. +// It determines the orchestrator name from history events and uses it to Resolve the executor. func (e *RoutingExecutor) ExecuteOrchestrator(ctx context.Context, id api.InstanceID, oldEvents []*backend.HistoryEvent, newEvents []*backend.HistoryEvent) (*backend.ExecutionResults, error) { name := e.getOrchestratorName(oldEvents, newEvents) @@ -79,6 +94,8 @@ func (e *RoutingExecutor) ExecuteOrchestrator(ctx context.Context, id api.Instan return executor.ExecuteOrchestrator(ctx, id, oldEvents, newEvents) } +// ExecuteActivity Routes the activity execution request to the appropriate executor. +// It extracts the activity name from the task scheduled event and uses it to Resolve the executor. func (e *RoutingExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID, event *backend.HistoryEvent) (*backend.HistoryEvent, error) { name := event.GetTaskScheduled().GetName() executor, err := e.getExecutor(name) @@ -88,20 +105,15 @@ func (e *RoutingExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID return executor.ExecuteActivity(ctx, id, event) } +// Shutdown shuts down all registered Executors in parallel. +// It returns a joined error if any of the Executors fail to shut down. func (e *RoutingExecutor) Shutdown(ctx context.Context) error { - ch := make(chan error) - defer close(ch) - for _, executor := range e.executors { - go func() { - err := executor.Shutdown(ctx) - if err != nil { - ch <- err - } - }() - } var errs []error - for err := range ch { - errs = append(errs, err) + for _, executor := range e.Executors { + err := executor.Shutdown(ctx) + if err != nil { + errs = append(errs, err) + } } if len(errs) == 0 { @@ -113,7 +125,7 @@ func (e *RoutingExecutor) Shutdown(ctx context.Context) error { func main() { - ctx, cancelFunc := context.WithCancel(context.Background()) + ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFunc() logger := backend.DefaultLogger() @@ -146,7 +158,32 @@ func main() { } func initExecutor(ctx context.Context, be backend.Backend, logger backend.Logger) (backend.Executor, error) { + localExecutor, err := setupLocalExecutor() + if err != nil { + return nil, err + } + + grpcExecutor, err := setupGrpcExecutor(ctx, be, logger) + if err != nil { + return nil, err + } + + return &RoutingExecutor{ + Config: Config{ + Routes: map[string]string{ + doubleActivityName: localExecutorName, + tripleActivityName: localExecutorName, + }, + DefaultExecutor: grpcExecutorName, + }, + Executors: map[string]backend.Executor{ + localExecutorName: localExecutor, + grpcExecutorName: grpcExecutor, + }, + }, nil +} +func setupLocalExecutor() (backend.Executor, error) { timesActivity := func(times int) task.Activity { return func(ctx task.ActivityContext) (any, error) { var input int @@ -159,24 +196,57 @@ func initExecutor(ctx context.Context, be backend.Backend, logger backend.Logger doubleActivity := timesActivity(2) tripleActivity := timesActivity(3) - // init a local executor register double and triple activity - localExecutor, err := func() (backend.Executor, error) { - r := task.NewTaskRegistry() - if err := r.AddActivityN(doubleActivityName, doubleActivity); err != nil { - return nil, err - } + r := task.NewTaskRegistry() + if err := r.AddActivityN(doubleActivityName, doubleActivity); err != nil { + return nil, err + } - if err := r.AddActivityN(tripleActivityName, tripleActivity); err != nil { - return nil, err + if err := r.AddActivityN(tripleActivityName, tripleActivity); err != nil { + return nil, err + } + + return task.NewTaskExecutor(r), nil +} + +func setupGrpcExecutor(ctx context.Context, be backend.Backend, logger backend.Logger) (backend.Executor, error) { + address := "localhost:0" + grpcServer := grpc.NewServer() + executor, registerFn := backend.NewGrpcExecutor(be, logger) + registerFn(grpcServer) + + lis, err := net.Listen("tcp", address) + if err != nil { + return nil, err + } + go func() { + if err := grpcServer.Serve(lis); err != nil { + panic(err) } + }() - executor := task.NewTaskExecutor(r) - return executor, nil + go func() { + <-ctx.Done() + grpcServer.GracefulStop() + _ = lis.Close() }() + + // Create a worker that connects to the gRPC server. + // establish a gRPC connection, blocking until the server is ready or the timeout expires + conn, err := grpc.DialContext( + ctx, + lis.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) if err != nil { return nil, err } + go func() { + <-ctx.Done() + _ = conn.Close() + }() + orchestrator := func(ctx *task.OrchestrationContext) (any, error) { var input int if err := ctx.GetInput(&input); err != nil { @@ -195,67 +265,15 @@ func initExecutor(ctx context.Context, be backend.Backend, logger backend.Logger return finalResult, nil } - // init a grpc executor register the orchestrator, and connect a worker to the grpc executor - grpcExecutor, err := func() (backend.Executor, error) { - address := "localhost:4001" - grpcServer := grpc.NewServer() - executor, registerFn := backend.NewGrpcExecutor(be, logger) - registerFn(grpcServer) - - lis, err := net.Listen("tcp", address) - if err != nil { - return nil, err - } - go func() { - if err := grpcServer.Serve(lis); err != nil { - panic(err) - } - }() - - // create a worker connects to the grpc server - var wg sync.WaitGroup - wg.Add(1) - - go func() { - // wait for grpc started - time.Sleep(time.Second) - conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - panic(err) - } - - workerClient := client.NewTaskHubGrpcClient(conn, logger) - r := task.NewTaskRegistry() - if err := r.AddOrchestratorN(orchestratorName, orchestrator); err != nil { - panic(err) - } - wg.Done() - - if err := workerClient.StartWorkItemListener(ctx, r); err != nil { - panic(err) - } - }() - - wg.Wait() - - return executor, nil - }() - if err != nil { + workerClient := client.NewTaskHubGrpcClient(conn, logger) + r := task.NewTaskRegistry() + if err := r.AddOrchestratorN(orchestratorName, orchestrator); err != nil { return nil, err } - executor := &RoutingExecutor{ - config: Config{ - routes: map[string]string{ - doubleActivityName: localExecutorName, - tripleActivityName: localExecutorName, - }, - defaultExecutor: grpcExecutorName, - }, - executors: map[string]backend.Executor{ - localExecutorName: localExecutor, - grpcExecutorName: grpcExecutor, - }, + // StartWorkItemListener is not blocking + if err := workerClient.StartWorkItemListener(ctx, r); err != nil { + return nil, err } return executor, nil