diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 02c33541..762322c7 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -24,4 +24,4 @@ jobs: - name: Build run: dotnet build --no-restore ConductorSharp.sln - name: Test - run: dotnet test --no-restore --verbosity normal ConductorSharp.sln \ No newline at end of file + run: dotnet test --no-restore --verbosity normal test/ConductorSharp.Engine.Tests \ No newline at end of file diff --git a/.husky/pre-commit b/.husky/pre-commit old mode 100644 new mode 100755 diff --git a/ConductorSharp.sln b/ConductorSharp.sln index 4c182785..85767dce 100644 --- a/ConductorSharp.sln +++ b/ConductorSharp.sln @@ -30,6 +30,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConductorSharp.Patterns", " EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConductorSharp.KafkaCancellationNotifier", "src\ConductorSharp.KafkaCancellationNotifier\ConductorSharp.KafkaCancellationNotifier.csproj", "{A94EE48D-17F3-432A-A47D-BCB9B1EF2670}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConductorSharp.Engine.IntegrationTests", "test\ConductorSharp.Engine.IntegrationTests\ConductorSharp.Engine.IntegrationTests.csproj", "{30EACB1C-FB15-4294-AF46-3AF7ABEB2121}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -76,6 +78,10 @@ Global {A94EE48D-17F3-432A-A47D-BCB9B1EF2670}.Debug|Any CPU.Build.0 = Debug|Any CPU {A94EE48D-17F3-432A-A47D-BCB9B1EF2670}.Release|Any CPU.ActiveCfg = Release|Any CPU {A94EE48D-17F3-432A-A47D-BCB9B1EF2670}.Release|Any CPU.Build.0 = Release|Any CPU + {30EACB1C-FB15-4294-AF46-3AF7ABEB2121}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {30EACB1C-FB15-4294-AF46-3AF7ABEB2121}.Debug|Any CPU.Build.0 = Debug|Any CPU + {30EACB1C-FB15-4294-AF46-3AF7ABEB2121}.Release|Any CPU.ActiveCfg = Release|Any CPU + {30EACB1C-FB15-4294-AF46-3AF7ABEB2121}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/examples/ConductorSharp.ApiEnabled/Extensions/ServiceCollectionExtensions.cs b/examples/ConductorSharp.ApiEnabled/Extensions/ServiceCollectionExtensions.cs index ce5f0827..fff284f2 100644 --- a/examples/ConductorSharp.ApiEnabled/Extensions/ServiceCollectionExtensions.cs +++ b/examples/ConductorSharp.ApiEnabled/Extensions/ServiceCollectionExtensions.cs @@ -1,5 +1,7 @@ -using ConductorSharp.ApiEnabled.Handlers; -using ConductorSharp.ApiEnabled.Services; +using ConductorSharp.ApiEnabled.Services; +using ConductorSharp.ApiEnabled.Workers; +using ConductorSharp.ApiEnabled.Workers; +using ConductorSharp.ApiEnabled.Workflows; using ConductorSharp.Engine.Extensions; using ConductorSharp.Engine.Health; @@ -16,23 +18,23 @@ public static IServiceCollection ConfigureApiEnabled(this IServiceCollection hos maxConcurrentWorkers: configuration.GetValue("Conductor:MaxConcurrentWorkers"), sleepInterval: configuration.GetValue("Conductor:SleepInterval"), longPollInterval: configuration.GetValue("Conductor:LongPollInterval"), - domain: configuration.GetValue("Conductor:WorkerDomain"), - typeof(ServiceCollectionExtensions).Assembly + domain: configuration.GetValue("Conductor:WorkerDomain") ) .SetHealthCheckService() .AddPipelines(pipelines => { pipelines.AddExecutionTaskTracking(); - pipelines.AddContextLogging(); - pipelines.AddRequestResponseLogging(); pipelines.AddValidation(); }); hostBuilder.AddSingleton(); - hostBuilder.RegisterWorkerTask(options => + hostBuilder.RegisterWorkerTask(options => { options.OwnerEmail = "owneremail@gmail.com"; }); + hostBuilder.RegisterWorkerTask(); + hostBuilder.RegisterWorkerTask(); + hostBuilder.RegisterWorkflow(); return hostBuilder; } diff --git a/examples/ConductorSharp.ApiEnabled/Program.cs b/examples/ConductorSharp.ApiEnabled/Program.cs index 6c038bed..d9e4e7a4 100644 --- a/examples/ConductorSharp.ApiEnabled/Program.cs +++ b/examples/ConductorSharp.ApiEnabled/Program.cs @@ -34,3 +34,5 @@ app.MapControllers(); app.MapHealthChecks("/health"); app.Run(); + +public partial class Program { } diff --git a/examples/ConductorSharp.ApiEnabled/Workers/FirstTestWorker.cs b/examples/ConductorSharp.ApiEnabled/Workers/FirstTestWorker.cs new file mode 100644 index 00000000..c6c2984e --- /dev/null +++ b/examples/ConductorSharp.ApiEnabled/Workers/FirstTestWorker.cs @@ -0,0 +1,33 @@ +using ConductorSharp.Engine; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Util; + +namespace ConductorSharp.ApiEnabled.Workers +{ + public class FirstTestWorker : ITaskInput + { + public string Input { get; set; } + + public class Response + { + public string Output { get; set; } + } + + public class Worker : Worker + { + private readonly ILogger _logger; + + public Worker(ILogger logger) + { + _logger = logger; + } + + public override Task Handle(FirstTestWorker test, WorkerExecutionContext context, CancellationToken cancellationToken) + { + _logger.LogInformation("First test worker"); + + return Task.FromResult(new() { Output = test.Input }); + } + } + } +} diff --git a/examples/ConductorSharp.ApiEnabled/Handlers/PrepareEmailHandler.cs b/examples/ConductorSharp.ApiEnabled/Workers/PrepareEmailWorker.cs similarity index 61% rename from examples/ConductorSharp.ApiEnabled/Handlers/PrepareEmailHandler.cs rename to examples/ConductorSharp.ApiEnabled/Workers/PrepareEmailWorker.cs index 7f4ccda3..41e0e936 100644 --- a/examples/ConductorSharp.ApiEnabled/Handlers/PrepareEmailHandler.cs +++ b/examples/ConductorSharp.ApiEnabled/Workers/PrepareEmailWorker.cs @@ -2,11 +2,10 @@ using ConductorSharp.Engine.Builders.Metadata; using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Util; -using MediatR; -namespace ConductorSharp.ApiEnabled.Handlers; +namespace ConductorSharp.ApiEnabled.Workers; -public class PrepareEmailRequest : IRequest +public class PrepareEmailRequest : ITaskInput { public string CustomerName { get; set; } public string Address { get; set; } @@ -18,18 +17,16 @@ public class PrepareEmailResponse } [OriginalName("EMAIL_prepare")] -public class PrepareEmailHandler : ITaskRequestHandler +public class PrepareEmailWorker : IWorker { - private readonly ConductorSharpExecutionContext _context; - private readonly ILogger _logger; + private readonly ILogger _logger; - public PrepareEmailHandler(ConductorSharpExecutionContext context, ILogger logger) + public PrepareEmailWorker(ILogger logger) { - _context = context; _logger = logger; } - public async Task Handle(PrepareEmailRequest request, CancellationToken cancellationToken) + public async Task Handle(PrepareEmailRequest request, WorkerExecutionContext context, CancellationToken cancellationToken) { var emailBodyBuilder = new StringBuilder(); @@ -39,8 +36,8 @@ public async Task Handle(PrepareEmailRequest request, Canc emailBodyBuilder.AppendLine($"Customer: {request.CustomerName}"); emailBodyBuilder.AppendLine($"Address: {request.Address}"); emailBodyBuilder.AppendLine("------------------"); - emailBodyBuilder.AppendLine($"WorkflowId : {_context.WorkflowId}"); - emailBodyBuilder.AppendLine($"WorkflowName: {_context.WorkflowName}"); + emailBodyBuilder.AppendLine($"WorkflowId : {context.WorkflowId}"); + emailBodyBuilder.AppendLine($"WorkflowName: {context.WorkflowName}"); _logger.LogInformation("Prepared email"); diff --git a/examples/ConductorSharp.ApiEnabled/Workers/SecondTestWorker.cs b/examples/ConductorSharp.ApiEnabled/Workers/SecondTestWorker.cs new file mode 100644 index 00000000..c9dcbd5b --- /dev/null +++ b/examples/ConductorSharp.ApiEnabled/Workers/SecondTestWorker.cs @@ -0,0 +1,31 @@ +using ConductorSharp.Engine; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Util; + +namespace ConductorSharp.ApiEnabled.Workers; + +public class SecondTestWorker : ITaskInput +{ + public string Input { get; set; } + + public class Response + { + public string Output { get; set; } + } + + public class Worker : Worker + { + private readonly ILogger _logger; + + public Worker(ILogger logger) + { + _logger = logger; + } + + public override Task Handle(SecondTestWorker test, WorkerExecutionContext context, CancellationToken cancellationToken) + { + _logger.LogInformation("Second test worker"); + return Task.FromResult(new() { Output = test.Input }); + } + } +} diff --git a/examples/ConductorSharp.ApiEnabled/Workflows/TestWorkflow.cs b/examples/ConductorSharp.ApiEnabled/Workflows/TestWorkflow.cs new file mode 100644 index 00000000..a728c1d6 --- /dev/null +++ b/examples/ConductorSharp.ApiEnabled/Workflows/TestWorkflow.cs @@ -0,0 +1,36 @@ +using ConductorSharp.ApiEnabled.Workers; +using ConductorSharp.ApiEnabled.Workers; +using ConductorSharp.Engine.Builders; +using ConductorSharp.Engine.Builders.Metadata; + +namespace ConductorSharp.ApiEnabled.Workflows; + +public class TestWorkflow : WorkflowInput +{ + public string Input { get; set; } + + public class Output : WorkflowOutput + { + public string Out { get; set; } + } + + [OriginalName("TEST_workflow")] + public class Workflow : Workflow + { + public Workflow(WorkflowDefinitionBuilder builder) + : base(builder) { } + + public FirstTestWorker.Worker FirstWorker { get; set; } + + public SecondTestWorker.Worker SecondWorker { get; set; } + + public override void BuildDefinition() + { + _builder.AddTask(wf => wf.FirstWorker, wf => new() { Input = wf.Input.Input }); + + _builder.AddTask(wf => wf.SecondWorker, wf => new() { Input = wf.FirstWorker.Output.Output }); + + _builder.SetOutput(wf => new() { Out = wf.SecondWorker.Output.Output }); + } + } +} diff --git a/examples/ConductorSharp.Definitions/Behaviors/CustomBehavior.cs b/examples/ConductorSharp.Definitions/Behaviors/CustomBehavior.cs deleted file mode 100644 index fac029ec..00000000 --- a/examples/ConductorSharp.Definitions/Behaviors/CustomBehavior.cs +++ /dev/null @@ -1,23 +0,0 @@ -using MediatR; -using Microsoft.Extensions.Logging; - -namespace ConductorSharp.Definitions.Behaviors -{ - internal class CustomBehavior : IPipelineBehavior - { - private readonly ILogger> _logger; - - public CustomBehavior(ILogger> logger) - { - _logger = logger; - } - - public async Task Handle(TRequest request, RequestHandlerDelegate next, CancellationToken cancellationToken) - { - _logger.LogInformation("Executed before all behaviors"); - var response = await next(); - _logger.LogInformation("Executed after all behaviors"); - return response; - } - } -} diff --git a/examples/ConductorSharp.Definitions/Generated/Task.cs b/examples/ConductorSharp.Definitions/Generated/Task.cs index 6974656d..52d4eed9 100644 --- a/examples/ConductorSharp.Definitions/Generated/Task.cs +++ b/examples/ConductorSharp.Definitions/Generated/Task.cs @@ -1,12 +1,12 @@ using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Util; -using MediatR; using Newtonsoft.Json; namespace ConductorSharp.Definitions.Generated { - public partial class EmailPrepareV1Input : IRequest + public partial class EmailPrepareV1Input : ITaskInput { /// /// address @@ -33,7 +33,7 @@ public partial class EmailPrepareV1Output [OriginalName("EMAIL_prepare")] public partial class EmailPrepareV1 : SimpleTaskModel { } - public partial class CustomerGetV1Input : IRequest + public partial class CustomerGetV1Input : ITaskInput { /// /// customer_id @@ -61,7 +61,7 @@ public partial class CustomerGetV1Output [OriginalName("CUSTOMER_get")] public partial class CustomerGetV1 : SimpleTaskModel { } - public partial class EnumTaskInput : IRequest + public partial class EnumTaskInput : ITaskInput { /// /// status diff --git a/examples/ConductorSharp.Definitions/Middlewares/CustomMiddleware.cs b/examples/ConductorSharp.Definitions/Middlewares/CustomMiddleware.cs new file mode 100644 index 00000000..6294af95 --- /dev/null +++ b/examples/ConductorSharp.Definitions/Middlewares/CustomMiddleware.cs @@ -0,0 +1,29 @@ +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Util; +using Microsoft.Extensions.Logging; + +namespace ConductorSharp.Definitions.Middlewares; + +internal class CustomMiddleware : IWorkerMiddleware + where TRequest : ITaskInput, new() +{ + private readonly ILogger> _logger; + + public CustomMiddleware(ILogger> logger) + { + _logger = logger; + } + + public async Task Handle( + TRequest request, + WorkerExecutionContext context, + Func> next, + CancellationToken cancellationToken + ) + { + _logger.LogInformation("Executed before all middlewares"); + var response = await next(); + _logger.LogInformation("Executed after all middlewares"); + return response; + } +} diff --git a/examples/ConductorSharp.Definitions/Program.cs b/examples/ConductorSharp.Definitions/Program.cs index dc14223f..662b11a0 100644 --- a/examples/ConductorSharp.Definitions/Program.cs +++ b/examples/ConductorSharp.Definitions/Program.cs @@ -1,4 +1,4 @@ -using ConductorSharp.Definitions.Behaviors; +using ConductorSharp.Definitions.Middlewares; using ConductorSharp.Definitions.Workflows; using ConductorSharp.Engine.Extensions; using ConductorSharp.Engine.Health; @@ -25,15 +25,13 @@ maxConcurrentWorkers: configuration.GetValue("Conductor:MaxConcurrentWorkers"), sleepInterval: configuration.GetValue("Conductor:SleepInterval"), longPollInterval: configuration.GetValue("Conductor:LongPollInterval"), - domain: configuration.GetValue("Conductor:WorkerDomain"), - typeof(Program).Assembly + domain: configuration.GetValue("Conductor:WorkerDomain") ) .AddConductorSharpPatterns() .SetHealthCheckService() .AddPipelines(pipelines => { - pipelines.AddCustomBehavior(typeof(CustomBehavior<,>)); - pipelines.AddRequestResponseLogging(); + pipelines.AddCustomMiddleware(typeof(CustomMiddleware<,>)); pipelines.AddValidation(); }) .AddCSharpLambdaTasks(); diff --git a/examples/ConductorSharp.Definitions/Workflows/CSharpLambdaWorkflow.cs b/examples/ConductorSharp.Definitions/Workflows/CSharpLambdaWorkflow.cs index 4a75b830..84317cb6 100644 --- a/examples/ConductorSharp.Definitions/Workflows/CSharpLambdaWorkflow.cs +++ b/examples/ConductorSharp.Definitions/Workflows/CSharpLambdaWorkflow.cs @@ -1,9 +1,9 @@ using ConductorSharp.Engine.Builders; using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Patterns.Builders; using ConductorSharp.Patterns.Model; -using MediatR; namespace ConductorSharp.Definitions.Workflows { @@ -18,7 +18,7 @@ public class CSharpLambdaWorkflowOutput : WorkflowOutput { } [WorkflowMetadata(OwnerEmail = "test@test.com")] public class CSharpLambdaWorkflow : Workflow { - public class LambdaTaskInput : IRequest + public class LambdaTaskInput : ITaskInput { public string LambdaInput { get; set; } } diff --git a/examples/ConductorSharp.Definitions/Workflows/SendCustomerNotification.cs b/examples/ConductorSharp.Definitions/Workflows/SendCustomerNotification.cs index d21f78d9..cf207672 100644 --- a/examples/ConductorSharp.Definitions/Workflows/SendCustomerNotification.cs +++ b/examples/ConductorSharp.Definitions/Workflows/SendCustomerNotification.cs @@ -1,10 +1,9 @@ using ConductorSharp.Definitions.Generated; using ConductorSharp.Engine.Builders; using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; -using ConductorSharp.Engine.Util; using ConductorSharp.Patterns.Tasks; -using MediatR; namespace ConductorSharp.Definitions.Workflows { @@ -21,7 +20,7 @@ public class SendCustomerNotificationOutput : WorkflowOutput public object Constant { get; set; } } - public class ExpectedDynamicInput : CustomerGetV1Input, IRequest { } + public class ExpectedDynamicInput : CustomerGetV1Input, ITaskInput { } public class ExpectedDynamicOutput : CustomerGetV1Output { } diff --git a/examples/ConductorSharp.Definitions/appsettings.json b/examples/ConductorSharp.Definitions/appsettings.json index dc040a32..87e88d20 100644 --- a/examples/ConductorSharp.Definitions/appsettings.json +++ b/examples/ConductorSharp.Definitions/appsettings.json @@ -1,6 +1,6 @@ { "Conductor": { - "BaseUrl": "http://localhost:8080", + "BaseUrl": "http://localhost:8127", "LongPollInterval": 100, "MaxConcurrentWorkers": 10, "SleepInterval": 500 diff --git a/examples/ConductorSharp.NoApi/Behaviors/PrepareEmailBehavior.cs b/examples/ConductorSharp.NoApi/Behaviors/PrepareEmailBehavior.cs deleted file mode 100644 index 33e1845b..00000000 --- a/examples/ConductorSharp.NoApi/Behaviors/PrepareEmailBehavior.cs +++ /dev/null @@ -1,28 +0,0 @@ -using ConductorSharp.NoApi.Handlers; -using MediatR; -using Microsoft.Extensions.Logging; - -namespace ConductorSharp.NoApi.Behaviors -{ - internal class PrepareEmailBehavior : IPipelineBehavior - { - private readonly ILogger _logger; - - public PrepareEmailBehavior(ILogger logger) - { - _logger = logger; - } - - public async Task Handle( - PrepareEmailRequest request, - RequestHandlerDelegate next, - CancellationToken cancellationToken - ) - { - _logger.LogInformation($"Executed only before {nameof(PrepareEmailHandler)}"); - var response = await next(); - _logger.LogInformation($"Executed only after {nameof(PrepareEmailHandler)}"); - return response; - } - } -} diff --git a/examples/ConductorSharp.NoApi/Handlers/EnumTaskHandler.cs b/examples/ConductorSharp.NoApi/Handlers/EnumTaskHandler.cs deleted file mode 100644 index 6f14bf63..00000000 --- a/examples/ConductorSharp.NoApi/Handlers/EnumTaskHandler.cs +++ /dev/null @@ -1,27 +0,0 @@ -using ConductorSharp.Client.Generated; -using ConductorSharp.Engine; -using ConductorSharp.Engine.Builders.Metadata; -using MediatR; - -namespace ConductorSharp.NoApi.Handlers -{ - public class EnumTaskInput : IRequest - { - public WorkflowStatus Status { get; set; } - } - - public class EnumTaskOutput - { - public WorkflowStatus Status { get; set; } - } - - [OriginalName("ENUM_task")] - public class EnumTaskHandler : TaskRequestHandler - { - public override Task Handle(EnumTaskInput request, CancellationToken cancellationToken) - { - Console.WriteLine(request.Status); - return System.Threading.Tasks.Task.FromResult(new EnumTaskOutput() { Status = request.Status }); - } - } -} diff --git a/examples/ConductorSharp.NoApi/MIddlewares/PrepareEmailMiddleware.cs b/examples/ConductorSharp.NoApi/MIddlewares/PrepareEmailMiddleware.cs new file mode 100644 index 00000000..025a582e --- /dev/null +++ b/examples/ConductorSharp.NoApi/MIddlewares/PrepareEmailMiddleware.cs @@ -0,0 +1,29 @@ +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Util; +using ConductorSharp.NoApi.Workers; +using Microsoft.Extensions.Logging; + +namespace ConductorSharp.NoApi.Middlewares; + +internal class PrepareEmailMiddleware : IWorkerMiddleware +{ + private readonly ILogger _logger; + + public PrepareEmailMiddleware(ILogger logger) + { + _logger = logger; + } + + public async Task Handle( + PrepareEmailRequest request, + WorkerExecutionContext context, + Func> next, + CancellationToken cancellationToken + ) + { + _logger.LogInformation($"Executed only before {nameof(PrepareEmailWorker)}"); + var response = await next(); + _logger.LogInformation($"Executed only after {nameof(PrepareEmailWorker)}"); + return response; + } +} diff --git a/examples/ConductorSharp.NoApi/Program.cs b/examples/ConductorSharp.NoApi/Program.cs index 48529228..ed406a95 100644 --- a/examples/ConductorSharp.NoApi/Program.cs +++ b/examples/ConductorSharp.NoApi/Program.cs @@ -1,8 +1,7 @@ using ConductorSharp.Engine.Extensions; using ConductorSharp.Engine.Health; -using ConductorSharp.KafkaCancellationNotifier.Extensions; -using ConductorSharp.NoApi.Behaviors; -using ConductorSharp.NoApi.Handlers; +using ConductorSharp.NoApi.Middlewares; +using ConductorSharp.NoApi.Workers; using ConductorSharp.Patterns.Extensions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -28,16 +27,13 @@ maxConcurrentWorkers: configuration.GetValue("Conductor:MaxConcurrentWorkers"), sleepInterval: configuration.GetValue("Conductor:SleepInterval"), longPollInterval: configuration.GetValue("Conductor:LongPollInterval"), - domain: configuration.GetValue("Conductor:WorkerDomain"), - handlerAssemblies: typeof(Program).Assembly + domain: configuration.GetValue("Conductor:WorkerDomain") ) .SetHealthCheckService() .AddPipelines(pipelines => { - pipelines.AddContextLogging(); - pipelines.AddRequestResponseLogging(); pipelines.AddValidation(); - pipelines.AddCustomBehavior(); + pipelines.AddCustomMiddleware(); }) .AddConductorSharpPatterns(); //.AddKafkaCancellationNotifier( @@ -46,9 +42,9 @@ // topicName: configuration.GetValue("Conductor:KafkaCancellationNotifier:TopicName"), // groupId: configuration.GetValue("Conductor:KafkaCancellationNotifier:GroupId")); - services.RegisterWorkerTask(); - services.RegisterWorkerTask(); - services.RegisterWorkerTask(); + services.RegisterWorkerTask(); + services.RegisterWorkerTask(); + services.RegisterWorkerTask(); } ); diff --git a/examples/ConductorSharp.NoApi/Workers/EnumTaskWorker.cs b/examples/ConductorSharp.NoApi/Workers/EnumTaskWorker.cs new file mode 100644 index 00000000..893d07c0 --- /dev/null +++ b/examples/ConductorSharp.NoApi/Workers/EnumTaskWorker.cs @@ -0,0 +1,26 @@ +using ConductorSharp.Client.Generated; +using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Util; + +namespace ConductorSharp.NoApi.Workers; + +public class EnumTaskInput : ITaskInput +{ + public WorkflowStatus Status { get; set; } +} + +public class EnumTaskOutput +{ + public WorkflowStatus Status { get; set; } +} + +[OriginalName("ENUM_task")] +public class EnumTaskWorker : IWorker +{ + public Task Handle(EnumTaskInput request, WorkerExecutionContext context, CancellationToken cancellationToken) + { + Console.WriteLine(request.Status); + return System.Threading.Tasks.Task.FromResult(new EnumTaskOutput() { Status = request.Status }); + } +} diff --git a/examples/ConductorSharp.NoApi/Handlers/GetCustomerHandler.cs b/examples/ConductorSharp.NoApi/Workers/GetCustomerWorker.cs similarity index 78% rename from examples/ConductorSharp.NoApi/Handlers/GetCustomerHandler.cs rename to examples/ConductorSharp.NoApi/Workers/GetCustomerWorker.cs index b1531eed..b86bc9b7 100644 --- a/examples/ConductorSharp.NoApi/Handlers/GetCustomerHandler.cs +++ b/examples/ConductorSharp.NoApi/Workers/GetCustomerWorker.cs @@ -2,11 +2,10 @@ using ConductorSharp.Engine.Builders.Metadata; using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Util; -using MediatR; -namespace ConductorSharp.NoApi.Handlers; +namespace ConductorSharp.NoApi.Workers; -public class GetCustomerRequest : IRequest +public class GetCustomerRequest : ITaskInput { [Required] public int CustomerId { get; set; } @@ -26,7 +25,7 @@ public class Customer } [OriginalName("CUSTOMER_get")] -public class GetCustomerHandler : ITaskRequestHandler +public class GetCustomerWorker : IWorker { private static Customer[] customers = new Customer[] { @@ -38,7 +37,7 @@ public class GetCustomerHandler : ITaskRequestHandler Handle(GetCustomerRequest request, CancellationToken cancellationToken) + public Task Handle(GetCustomerRequest request, WorkerExecutionContext context, CancellationToken cancellationToken) { var customer = customers.First(a => a.Id == request.CustomerId); diff --git a/examples/ConductorSharp.NoApi/Handlers/PrepareEmailHandler.cs b/examples/ConductorSharp.NoApi/Workers/PrepareEmailWorker.cs similarity index 56% rename from examples/ConductorSharp.NoApi/Handlers/PrepareEmailHandler.cs rename to examples/ConductorSharp.NoApi/Workers/PrepareEmailWorker.cs index d3b31d66..9b3b0ada 100644 --- a/examples/ConductorSharp.NoApi/Handlers/PrepareEmailHandler.cs +++ b/examples/ConductorSharp.NoApi/Workers/PrepareEmailWorker.cs @@ -1,17 +1,12 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +using System.Text; using ConductorSharp.Engine.Builders.Metadata; using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Util; -using MediatR; using Microsoft.Extensions.Logging; -namespace ConductorSharp.NoApi.Handlers +namespace ConductorSharp.NoApi.Workers { - public class PrepareEmailRequest : IRequest + public class PrepareEmailRequest : ITaskInput { public string CustomerName { get; set; } public string Address { get; set; } @@ -23,18 +18,20 @@ public class PrepareEmailResponse } [OriginalName("EMAIL_prepare")] - public class PrepareEmailHandler : ITaskRequestHandler + public class PrepareEmailWorker : IWorker { - private readonly ConductorSharpExecutionContext _context; - private readonly ILogger _logger; + private readonly ILogger _logger; - public PrepareEmailHandler(ConductorSharpExecutionContext context, ILogger logger) + public PrepareEmailWorker(ILogger logger) { - _context = context; _logger = logger; } - public async Task Handle(PrepareEmailRequest request, CancellationToken cancellationToken) + public async Task Handle( + PrepareEmailRequest request, + WorkerExecutionContext context, + CancellationToken cancellationToken + ) { var emailBodyBuilder = new StringBuilder(); @@ -44,8 +41,8 @@ public async Task Handle(PrepareEmailRequest request, Canc emailBodyBuilder.AppendLine($"Customer: {request.CustomerName}"); emailBodyBuilder.AppendLine($"Address: {request.Address}"); emailBodyBuilder.AppendLine("------------------"); - emailBodyBuilder.AppendLine($"WorkflowId : {_context.WorkflowId}"); - emailBodyBuilder.AppendLine($"WorkflowName: {_context.WorkflowName}"); + emailBodyBuilder.AppendLine($"WorkflowId : {context.WorkflowId}"); + emailBodyBuilder.AppendLine($"WorkflowName: {context.WorkflowName}"); _logger.LogInformation("Prepared email"); diff --git a/examples/ConductorSharp.NoApi/appsettings.json b/examples/ConductorSharp.NoApi/appsettings.json index 0e2c41ba..70ae1a47 100644 --- a/examples/ConductorSharp.NoApi/appsettings.json +++ b/examples/ConductorSharp.NoApi/appsettings.json @@ -1,9 +1,9 @@ { "Conductor": { - "BaseUrl": "http://localhost:48081", + "BaseUrl": "http://localhost:8127", "LongPollInterval": 100, "MaxConcurrentWorkers": 10, - "SleepInterval": 500, + "SleepInterval": 2000, "KafkaCancellationNotifier": { "BootstrapServers": "localhost:29092", "GroupId": "ConductorSharp.NoApi", diff --git a/src/ConductorSharp.Engine/Behaviors/ContextLoggingBehavior.cs b/src/ConductorSharp.Engine/Behaviors/ContextLoggingBehavior.cs deleted file mode 100644 index fde9a5d6..00000000 --- a/src/ConductorSharp.Engine/Behaviors/ContextLoggingBehavior.cs +++ /dev/null @@ -1,30 +0,0 @@ -using ConductorSharp.Engine.Util; -using MediatR; -using Microsoft.Extensions.Logging; -using Serilog.Context; -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace ConductorSharp.Engine.Behaviors -{ - public class ContextLoggingBehavior : IPipelineBehavior where TRequest : IRequest - { - private readonly ConductorSharpExecutionContext _executionContext; - private const string LoggerPropertyName = "ConductorContext"; - - public ContextLoggingBehavior(ConductorSharpExecutionContext executionContext) - { - _executionContext = executionContext; - } - - public async Task Handle(TRequest request, RequestHandlerDelegate next, CancellationToken cancellationToken) - { - using var _ = LogContext.PushProperty(LoggerPropertyName, _executionContext, true); - return await next(); - } - } -} diff --git a/src/ConductorSharp.Engine/Behaviors/ErrorHandlingBehavior.cs b/src/ConductorSharp.Engine/Behaviors/ErrorHandlingBehavior.cs deleted file mode 100644 index 82fbcf2a..00000000 --- a/src/ConductorSharp.Engine/Behaviors/ErrorHandlingBehavior.cs +++ /dev/null @@ -1,23 +0,0 @@ -using ConductorSharp.Engine.Exceptions; -using MediatR; -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace ConductorSharp.Engine.Behaviors -{ - public class ErrorHandlingBehavior : IPipelineBehavior where TRequest : IRequest - { - public async Task Handle(TRequest request, RequestHandlerDelegate next, CancellationToken cancellationToken) - { - try - { - return await next(); - } - catch (Exception ex) - { - throw new BaseWorkerException(ex.Message, ex); - } - } - } -} diff --git a/src/ConductorSharp.Engine/Behaviors/RequestResponseLoggingBehavior.cs b/src/ConductorSharp.Engine/Behaviors/RequestResponseLoggingBehavior.cs deleted file mode 100644 index 1cd81959..00000000 --- a/src/ConductorSharp.Engine/Behaviors/RequestResponseLoggingBehavior.cs +++ /dev/null @@ -1,80 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using ConductorSharp.Engine.Util; -using MediatR; -using Microsoft.Extensions.Logging; -using Serilog.Context; - -namespace ConductorSharp.Engine.Behaviors -{ - // TODO: Consider removing this - public class RequestResponseLoggingBehavior : IPipelineBehavior - where TRequest : IRequest - { - private readonly ILogger> _logger; - private readonly ConductorSharpExecutionContext _context; - - public RequestResponseLoggingBehavior( - ILogger> logger, - ConductorSharpExecutionContext context - ) - { - _logger = logger; - _context = context; - } - - public async Task Handle(TRequest request, RequestHandlerDelegate next, CancellationToken cancellationToken) - { - var requestId = Guid.NewGuid(); - var requestName = typeof(TRequest).Name; - - var stopwatch = new Stopwatch(); - - _logger.LogInformation( - $"Submitting request {{Request}} with payload {{@{requestName}}} and with id {{RequestId}}", - requestName, - request, - requestId - ); - stopwatch.Start(); - - try - { - var response = await next(); - - stopwatch.Stop(); - - _logger.LogInformation( - $"Received response {{@Response}} for request {{Request}} with id {{RequestId}} (exec time = {{ElapsedMilliseconds}})", - response, - requestName, - requestId, - stopwatch.ElapsedMilliseconds - ); - - return response; - } - catch (OperationCanceledException) when (_context.TaskId != null) - { - // Simply rethrow and do not log in order for cancellation notifier to work - throw; - } - catch (Exception exc) - { - stopwatch.Stop(); - _logger.LogError( - $"Exception occured {{@Exception}} for request {{Request}} with id {{RequestId}} (exec time = {{ElapsedMilliseconds}})", - exc, - requestName, - requestId, - stopwatch.ElapsedMilliseconds - ); - throw; - } - } - } -} diff --git a/src/ConductorSharp.Engine/Behaviors/TaskExecutionTrackingBehavior.cs b/src/ConductorSharp.Engine/Behaviors/TaskExecutionTrackingBehavior.cs deleted file mode 100644 index 4b8f2924..00000000 --- a/src/ConductorSharp.Engine/Behaviors/TaskExecutionTrackingBehavior.cs +++ /dev/null @@ -1,62 +0,0 @@ -using ConductorSharp.Engine.Interface; -using ConductorSharp.Engine.Util; -using MediatR; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace ConductorSharp.Engine.Behaviors -{ - public class TaskExecutionTrackingBehavior : IPipelineBehavior where TRequest : IRequest - { - private readonly ConductorSharpExecutionContext _executionContext; - private readonly IEnumerable _taskExecutionServices; - - public TaskExecutionTrackingBehavior( - ConductorSharpExecutionContext executionContext, - IEnumerable taskExecutionServices - ) - { - _executionContext = executionContext; - _taskExecutionServices = taskExecutionServices; - } - - public async Task Handle(TRequest request, RequestHandlerDelegate next, CancellationToken cancellationToken) - { - var trackedTask = new RunningTask - { - TaskId = _executionContext.TaskId, - TaskName = _executionContext.TaskName, - StartedAt = DateTimeOffset.UtcNow - }; - - foreach (var taskExecutionService in _taskExecutionServices) - { - await taskExecutionService.OnPolled(trackedTask); - } - - try - { - var response = await next(); - - foreach (var taskExecutionService in _taskExecutionServices) - { - await taskExecutionService.OnCompleted(trackedTask); - } - - return response; - } - catch (Exception) - { - foreach (var taskExecutionService in _taskExecutionServices) - { - await taskExecutionService.OnFailed(trackedTask); - } - - throw; - } - } - } -} diff --git a/src/ConductorSharp.Engine/Behaviors/ValidationBehavior.cs b/src/ConductorSharp.Engine/Behaviors/ValidationBehavior.cs deleted file mode 100644 index 2185e4e6..00000000 --- a/src/ConductorSharp.Engine/Behaviors/ValidationBehavior.cs +++ /dev/null @@ -1,21 +0,0 @@ -using ConductorSharp.Engine.Util; -using MediatR; -using System.Threading; -using System.Threading.Tasks; - -namespace ConductorSharp.Engine.Behaviors -{ - public class ValidationBehavior : IPipelineBehavior where TRequest : IRequest - { - public async Task Handle(TRequest request, RequestHandlerDelegate next, CancellationToken cancellationToken) - { - ObjectValidator.Validate(request); - - var response = await next(); - - ObjectValidator.Validate(response); - - return response; - } - } -} diff --git a/src/ConductorSharp.Engine/Builders/BaseTaskBuilder.cs b/src/ConductorSharp.Engine/Builders/BaseTaskBuilder.cs index dc300494..98f6e00a 100644 --- a/src/ConductorSharp.Engine/Builders/BaseTaskBuilder.cs +++ b/src/ConductorSharp.Engine/Builders/BaseTaskBuilder.cs @@ -4,13 +4,12 @@ using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Util; using ConductorSharp.Engine.Util.Builders; -using MediatR; using Newtonsoft.Json.Linq; namespace ConductorSharp.Engine.Builders { - public abstract class BaseTaskBuilder : ITaskOptionsBuilder, ITaskBuilder - where A : IRequest + public abstract class BaseTaskBuilder : ITaskOptionsBuilder, ITaskBuilder + where TInput : ITaskInput { protected readonly JObject _inputParameters; protected readonly string _taskRefferenceName; diff --git a/src/ConductorSharp.Engine/Builders/DynamicTaskBuilder.cs b/src/ConductorSharp.Engine/Builders/DynamicTaskBuilder.cs index 4616820a..5765904c 100644 --- a/src/ConductorSharp.Engine/Builders/DynamicTaskBuilder.cs +++ b/src/ConductorSharp.Engine/Builders/DynamicTaskBuilder.cs @@ -5,7 +5,6 @@ using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Util.Builders; -using MediatR; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -19,7 +18,7 @@ public static ITaskOptionsBuilder AddTask( Expression>> input ) where TWorkflow : ITypedWorkflow - where TInput : IRequest + where TInput : ITaskInput { var taskBuilder = new DynamicTaskBuilder(reference.Body, input.Body, builder.BuildConfiguration); builder.AddTaskBuilderToSequence(taskBuilder); @@ -27,8 +26,8 @@ Expression>> input } } - public class DynamicTaskBuilder(Expression taskExpression, Expression inputExpression, BuildConfiguration buildConfiguration) - : BaseTaskBuilder, O>(taskExpression, inputExpression, buildConfiguration) + public class DynamicTaskBuilder(Expression taskExpression, Expression inputExpression, BuildConfiguration buildConfiguration) + : BaseTaskBuilder, TOutput>(taskExpression, inputExpression, buildConfiguration) { private const string DynamicTasknameParam = "task_to_execute"; diff --git a/src/ConductorSharp.Engine/Builders/EventTaskBuilder.cs b/src/ConductorSharp.Engine/Builders/EventTaskBuilder.cs index cc7f3334..67871157 100644 --- a/src/ConductorSharp.Engine/Builders/EventTaskBuilder.cs +++ b/src/ConductorSharp.Engine/Builders/EventTaskBuilder.cs @@ -5,7 +5,6 @@ using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Util.Builders; -using MediatR; namespace ConductorSharp.Engine.Builders { @@ -18,7 +17,7 @@ public static ITaskOptionsBuilder AddTask( string sink ) where TWorkflow : ITypedWorkflow - where TInput : IRequest + where TInput : ITaskInput { var taskBuilder = new EventTaskBuilder(reference.Body, input.Body, builder.BuildConfiguration, sink); builder.AddTaskBuilderToSequence(taskBuilder); diff --git a/src/ConductorSharp.Engine/Builders/JsonJqTransformTaskBuilder.cs b/src/ConductorSharp.Engine/Builders/JsonJqTransformTaskBuilder.cs index 64bebbf6..f328e9f4 100644 --- a/src/ConductorSharp.Engine/Builders/JsonJqTransformTaskBuilder.cs +++ b/src/ConductorSharp.Engine/Builders/JsonJqTransformTaskBuilder.cs @@ -5,7 +5,6 @@ using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Util.Builders; -using MediatR; namespace ConductorSharp.Engine.Builders { @@ -17,7 +16,7 @@ public static ITaskOptionsBuilder AddTask( Expression> input ) where TWorkflow : ITypedWorkflow - where TInput : IRequest + where TInput : ITaskInput { var taskBuilder = new JsonJqTransformTaskBuilder(refference.Body, input.Body, builder.BuildConfiguration); builder.AddTaskBuilderToSequence(taskBuilder); @@ -25,8 +24,8 @@ Expression> input } } - public class JsonJqTransformTaskBuilder : BaseTaskBuilder - where A : IRequest + public class JsonJqTransformTaskBuilder : BaseTaskBuilder + where TInput : ITaskInput { public JsonJqTransformTaskBuilder(Expression taskExpression, Expression inputExpression, BuildConfiguration buildConfiguration) : base(taskExpression, inputExpression, buildConfiguration) diff --git a/src/ConductorSharp.Engine/Builders/LambdaTaskBuilder.cs b/src/ConductorSharp.Engine/Builders/LambdaTaskBuilder.cs index f38b0915..0f3b9f8a 100644 --- a/src/ConductorSharp.Engine/Builders/LambdaTaskBuilder.cs +++ b/src/ConductorSharp.Engine/Builders/LambdaTaskBuilder.cs @@ -5,7 +5,6 @@ using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Util.Builders; -using MediatR; using Newtonsoft.Json.Linq; namespace ConductorSharp.Engine.Builders @@ -19,7 +18,7 @@ public static ITaskOptionsBuilder AddTask( string script ) where TWorkflow : ITypedWorkflow - where TInput : IRequest + where TInput : ITaskInput { var taskBuilder = new LambdaTaskBuilder(script, referrence.Body, input.Body, builder.BuildConfiguration); @@ -28,9 +27,13 @@ string script } } - public class LambdaTaskBuilder(string script, Expression taskExpression, Expression inputExpression, BuildConfiguration buildConfiguration) - : BaseTaskBuilder(taskExpression, inputExpression, buildConfiguration) - where A : IRequest + public class LambdaTaskBuilder( + string script, + Expression taskExpression, + Expression inputExpression, + BuildConfiguration buildConfiguration + ) : BaseTaskBuilder(taskExpression, inputExpression, buildConfiguration) + where TInput : ITaskInput { private readonly string _script = script; diff --git a/src/ConductorSharp.Engine/Builders/SimpleTaskBuilder.cs b/src/ConductorSharp.Engine/Builders/SimpleTaskBuilder.cs index de685ed4..7f6533fa 100644 --- a/src/ConductorSharp.Engine/Builders/SimpleTaskBuilder.cs +++ b/src/ConductorSharp.Engine/Builders/SimpleTaskBuilder.cs @@ -5,7 +5,6 @@ using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Util.Builders; -using MediatR; namespace ConductorSharp.Engine.Builders { @@ -17,7 +16,7 @@ public static ITaskOptionsBuilder AddTask( Expression> input ) where TWorkflow : ITypedWorkflow - where Tinput : IRequest + where Tinput : ITaskInput { var taskBuilder = new SimpleTaskBuilder(refference.Body, input.Body, builder.BuildConfiguration); builder.AddTaskBuilderToSequence(taskBuilder); @@ -25,9 +24,9 @@ Expression> input } } - public class SimpleTaskBuilder(Expression taskExpression, Expression inputExpression, BuildConfiguration buildConfiguration) - : BaseTaskBuilder(taskExpression, inputExpression, buildConfiguration) - where A : IRequest + public class SimpleTaskBuilder(Expression taskExpression, Expression inputExpression, BuildConfiguration buildConfiguration) + : BaseTaskBuilder(taskExpression, inputExpression, buildConfiguration) + where TInput : ITaskInput { public override WorkflowTask[] Build() => [ diff --git a/src/ConductorSharp.Engine/Builders/SubWorkflowTaskBuilder.cs b/src/ConductorSharp.Engine/Builders/SubWorkflowTaskBuilder.cs index 2fded7eb..6852e7e7 100644 --- a/src/ConductorSharp.Engine/Builders/SubWorkflowTaskBuilder.cs +++ b/src/ConductorSharp.Engine/Builders/SubWorkflowTaskBuilder.cs @@ -8,7 +8,6 @@ using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Util; using ConductorSharp.Engine.Util.Builders; -using MediatR; namespace ConductorSharp.Engine.Builders { @@ -20,7 +19,7 @@ public static ITaskOptionsBuilder AddTask( Expression> input ) where TWorkflow : ITypedWorkflow - where TInput : IRequest + where TInput : ITaskInput { var taskBuilder = new SubWorkflowTaskBuilder(referrence.Body, input.Body, builder.BuildConfiguration); builder.AddTaskBuilderToSequence(taskBuilder); @@ -30,7 +29,7 @@ Expression> input public class SubWorkflowTaskBuilder(Expression taskExpression, Expression inputExpression, BuildConfiguration buildConfiguration) : BaseTaskBuilder(taskExpression, inputExpression, buildConfiguration) - where TInput : IRequest + where TInput : ITaskInput { private readonly int _version = GetVersion(taskExpression); diff --git a/src/ConductorSharp.Engine/Builders/TaskDefinitionBuilder.cs b/src/ConductorSharp.Engine/Builders/TaskDefinitionBuilder.cs index ee1a6965..b657246e 100644 --- a/src/ConductorSharp.Engine/Builders/TaskDefinitionBuilder.cs +++ b/src/ConductorSharp.Engine/Builders/TaskDefinitionBuilder.cs @@ -22,15 +22,7 @@ public TaskDef Build(Type taskType, Action updateOptions updateOptions?.Invoke(options); - var interfaces = taskType - .GetInterfaces() - .Where(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(ITaskRequestHandler<,>)) - .First(); - - var genericArguments = interfaces.GetGenericArguments(); - - var inputType = genericArguments[0]; - var outputType = genericArguments[1]; + var (inputType, outputType) = WorkerUtil.GetRequestResponseTypes(taskType); var originalName = _taskNameBuilder.Build(taskType); diff --git a/src/ConductorSharp.Engine/Builders/Workflow.cs b/src/ConductorSharp.Engine/Builders/Workflow.cs index 1be1cdd0..b2e50b59 100644 --- a/src/ConductorSharp.Engine/Builders/Workflow.cs +++ b/src/ConductorSharp.Engine/Builders/Workflow.cs @@ -1,11 +1,10 @@ using ConductorSharp.Client.Generated; using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; -using MediatR; namespace ConductorSharp.Engine.Builders { - public class WorkflowInput : IWorkflowInput, IRequest + public class WorkflowInput : IWorkflowInput, ITaskInput where T : WorkflowOutput { } public class WorkflowOutput { } diff --git a/src/ConductorSharp.Engine/ConductorSharp.Engine.csproj b/src/ConductorSharp.Engine/ConductorSharp.Engine.csproj index ef306da3..0c6dbf15 100644 --- a/src/ConductorSharp.Engine/ConductorSharp.Engine.csproj +++ b/src/ConductorSharp.Engine/ConductorSharp.Engine.csproj @@ -16,10 +16,9 @@ - - - + + @@ -27,5 +26,8 @@ - + + + + diff --git a/src/ConductorSharp.Engine/Exceptions/BaseWorkerException.cs b/src/ConductorSharp.Engine/Exceptions/BaseWorkerException.cs deleted file mode 100644 index 680753c2..00000000 --- a/src/ConductorSharp.Engine/Exceptions/BaseWorkerException.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace ConductorSharp.Engine.Exceptions -{ - public class BaseWorkerException : Exception - { - public BaseWorkerException(string message, Exception innerException) : base(message, innerException) { } - } -} diff --git a/src/ConductorSharp.Engine/ExecutionManager.cs b/src/ConductorSharp.Engine/ExecutionManager.cs index 259b4aba..361f3f46 100644 --- a/src/ConductorSharp.Engine/ExecutionManager.cs +++ b/src/ConductorSharp.Engine/ExecutionManager.cs @@ -1,9 +1,9 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Threading; -using System.Threading.Tasks; using ConductorSharp.Client; using ConductorSharp.Client.Generated; using ConductorSharp.Client.Service; @@ -11,8 +11,8 @@ using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Polling; +using ConductorSharp.Engine.Service; using ConductorSharp.Engine.Util; -using MediatR; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json; @@ -32,6 +32,7 @@ internal class ExecutionManager : IExecutionManager private readonly IPollTimingStrategy _pollTimingStrategy; private readonly IPollOrderStrategy _pollOrderStrategy; private readonly ICancellationNotifier _cancellationNotifier; + private readonly WorkerInvokerService _workerInvokerService; public ExecutionManager( WorkerSetConfig options, @@ -42,7 +43,8 @@ public ExecutionManager( IServiceScopeFactory lifetimeScope, IPollTimingStrategy pollTimingStrategy, IPollOrderStrategy pollOrderStrategy, - ICancellationNotifier cancellationNotifier + ICancellationNotifier cancellationNotifier, + WorkerInvokerService workerInvokerService ) { _configuration = options; @@ -54,6 +56,7 @@ ICancellationNotifier cancellationNotifier _pollTimingStrategy = pollTimingStrategy; _pollOrderStrategy = pollOrderStrategy; _cancellationNotifier = cancellationNotifier; + _workerInvokerService = workerInvokerService; _externalPayloadService = externalPayloadService; } @@ -98,20 +101,6 @@ private string GetQueueTaskName(TaskToWorker taskToWorker) return taskToWorker.TaskName; } - private static Type GetInputType(Type workerType) - { - var interfaces = workerType - .GetInterfaces() - .Where(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(ITaskRequestHandler<,>)) - .First(); - var genericArguments = interfaces.GetGenericArguments(); - - var inputType = genericArguments[0]; - var outputType = genericArguments[1]; - - return inputType; - } - private async Task PollAndHandle(TaskToWorker scheduledWorker, CancellationToken cancellationToken) { Client.Generated.Task pollResponse; @@ -177,36 +166,47 @@ CancellationToken cancellationToken ); } - var inputType = GetInputType(scheduledWorker.TaskType); - var inputData = SerializationHelper.DictonaryToObject(inputType, pollResponse.InputData, ConductorConstants.IoJsonSerializerSettings); - // Poll response data can be huge (if read from external storage) - // We can save memory by not holding reference to pollResponse.InputData after it is parsed - pollResponse.InputData = null; - - using var scope = _lifetimeScopeFactory.CreateScope(); - - var context = scope.ServiceProvider.GetService(); - var mediator = scope.ServiceProvider.GetRequiredService(); - - if (context != null) - { - context.WorkflowName = pollResponse.WorkflowType; - context.TaskName = pollResponse.TaskDefName; - context.TaskReferenceName = pollResponse.ReferenceTaskName; - context.WorkflowId = pollResponse.WorkflowInstanceId; - context.CorrelationId = pollResponse.CorrelationId; - context.TaskId = pollResponse.TaskId; - context.WorkerId = workerId; - } + var context = new WorkerExecutionContext( + WorkflowName: pollResponse.WorkflowType, + WorkflowId: pollResponse.WorkflowInstanceId, + TaskName: pollResponse.TaskDefName, + TaskId: pollResponse.TaskId, + TaskReferenceName: pollResponse.ReferenceTaskName, + CorrelationId: pollResponse.CorrelationId, + WorkerId: workerId + ); - var response = await mediator.Send(inputData, tokenHolder.CancellationToken); + _logger.LogInformation( + "Executing worker {Worker} for task {Task}(id={TaskId}) as part of workflow {Workflow}(id={WorkflowId})", + scheduledWorker.TaskType.Name, + pollResponse.TaskDefName, + pollResponse.TaskId, + pollResponse.WorkflowType, + pollResponse.WorkflowInstanceId + ); + var stopwatch = Stopwatch.StartNew(); + var response = await _workerInvokerService.Invoke( + scheduledWorker.TaskType, + pollResponse.InputData, + context, + tokenHolder.CancellationToken + ); + _logger.LogInformation( + "Worker {Worker} executed for task {Task}(id={TaskId}) as part of workflow {Workflow}(id={WorkflowId}), exec time = {WorkerPipelineExecutionTime}ms", + scheduledWorker.TaskType.Name, + pollResponse.TaskDefName, + pollResponse.TaskId, + pollResponse.WorkflowType, + pollResponse.WorkflowInstanceId, + stopwatch.ElapsedMilliseconds + ); await _taskManager.UpdateAsync( new TaskResult { TaskId = pollResponse.TaskId, Status = TaskResultStatus.COMPLETED, - OutputData = SerializationHelper.ObjectToDictionary(response, ConductorConstants.IoJsonSerializerSettings), + OutputData = response, WorkflowInstanceId = pollResponse.WorkflowInstanceId }, tokenHolder.CancellationToken @@ -235,9 +235,10 @@ await _taskManager.UpdateAsync( catch (Exception exception) { _logger.LogError( - "{@Exception} while executing {Task} as part of {Workflow} with id {WorkflowId}", exception, + "Exception while processing polled task {Task}(id={TaskId}) as part of workflow {Workflow}(id={WorkflowId})", pollResponse.TaskDefName, + pollResponse.TaskId, pollResponse.WorkflowType, pollResponse.WorkflowInstanceId ); diff --git a/src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs b/src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs index 5c59e1e7..4a249234 100644 --- a/src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs +++ b/src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs @@ -2,14 +2,12 @@ using System.Net.Http; using System.Reflection; using ConductorSharp.Client.Service; -using ConductorSharp.Engine.Behaviors; using ConductorSharp.Engine.Health; using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Polling; using ConductorSharp.Engine.Service; using ConductorSharp.Engine.Util; using ConductorSharp.Engine.Util.Builders; -using MediatR; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -19,13 +17,7 @@ public class ConductorSharpBuilder(IServiceCollection builder) : IConductorSharp { public IServiceCollection Builder { get; set; } = builder; - public IExecutionManagerBuilder AddExecutionManager( - int maxConcurrentWorkers, - int sleepInterval, - int longPollInterval, - string domain = null, - params Assembly[] handlerAssemblies - ) + public IExecutionManagerBuilder AddExecutionManager(int maxConcurrentWorkers, int sleepInterval, int longPollInterval, string domain = null) { var workerConfig = new WorkerSetConfig { @@ -43,9 +35,7 @@ params Assembly[] handlerAssemblies Builder.AddTransient(); - Builder.AddSingleton(); - - Builder.AddScoped(); + Builder.AddSingleton(); Builder.AddSingleton(); @@ -55,21 +45,21 @@ params Assembly[] handlerAssemblies Builder.AddSingleton(); - Builder.AddMediatR(cfg => cfg.RegisterServicesFromAssemblies(handlerAssemblies)); + Builder.AddTransient(); return this; } public IExecutionManagerBuilder UseBetaExecutionManager() { - Builder.AddSingleton(); + Builder.AddSingleton(); return this; } - - public IExecutionManagerBuilder AddPipelines(Action behaviorBuilder) + + public IExecutionManagerBuilder AddPipelines(Action middlewareBuilder) { var pipelineBuilder = new PipelineBuilder(Builder); - behaviorBuilder(pipelineBuilder); + middlewareBuilder(pipelineBuilder); return this; } diff --git a/src/ConductorSharp.Engine/Extensions/IConductorSharpBuilder.cs b/src/ConductorSharp.Engine/Extensions/IConductorSharpBuilder.cs index c381b887..c114b4c9 100644 --- a/src/ConductorSharp.Engine/Extensions/IConductorSharpBuilder.cs +++ b/src/ConductorSharp.Engine/Extensions/IConductorSharpBuilder.cs @@ -8,13 +8,7 @@ namespace ConductorSharp.Engine.Extensions { public interface IConductorSharpBuilder { - IExecutionManagerBuilder AddExecutionManager( - int maxConcurrentWorkers, - int sleepInterval, - int longPollInterval, - string domain = null, - params Assembly[] handlerAssemblies - ); + IExecutionManagerBuilder AddExecutionManager(int maxConcurrentWorkers, int sleepInterval, int longPollInterval, string domain = null); IConductorSharpBuilder SetBuildConfiguration(BuildConfiguration buildConfiguration); IConductorSharpBuilder AddAlternateClient(string baseUrl, string key, string apiPath = "api", bool ignoreInvalidCertificate = false); } diff --git a/src/ConductorSharp.Engine/Extensions/IPipelineBuilder.cs b/src/ConductorSharp.Engine/Extensions/IPipelineBuilder.cs index 70bfadc7..85bb486d 100644 --- a/src/ConductorSharp.Engine/Extensions/IPipelineBuilder.cs +++ b/src/ConductorSharp.Engine/Extensions/IPipelineBuilder.cs @@ -1,16 +1,16 @@ using System; -using MediatR; +using ConductorSharp.Engine.Interface; namespace ConductorSharp.Engine.Extensions { public interface IPipelineBuilder { - void AddRequestResponseLogging(); void AddValidation(); - void AddContextLogging(); void AddExecutionTaskTracking(); - void AddCustomBehavior(Type behaviorType); - void AddCustomBehavior() - where TBehavior : class, IPipelineBehavior; + void AddCustomMiddleware(Type middlewareType); + + void AddCustomMiddleware() + where TWorkerMiddleware : class, IWorkerMiddleware + where TRequest : ITaskInput, new(); } } diff --git a/src/ConductorSharp.Engine/Extensions/PipelineBuilder.cs b/src/ConductorSharp.Engine/Extensions/PipelineBuilder.cs index 9c8f6072..93e6a0f1 100644 --- a/src/ConductorSharp.Engine/Extensions/PipelineBuilder.cs +++ b/src/ConductorSharp.Engine/Extensions/PipelineBuilder.cs @@ -1,27 +1,26 @@ using System; -using System.Reflection; -using ConductorSharp.Engine.Behaviors; -using MediatR; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Middlewares; using Microsoft.Extensions.DependencyInjection; namespace ConductorSharp.Engine.Extensions; internal class PipelineBuilder(IServiceCollection serviceCollection) : IPipelineBuilder { - public void AddRequestResponseLogging() => - serviceCollection.AddTransient(typeof(IPipelineBehavior<,>), typeof(RequestResponseLoggingBehavior<,>)); + public void AddValidation() => serviceCollection.AddTransient(typeof(IWorkerMiddleware<,>), typeof(ValidationWorkerMiddleware<,>)); - public void AddValidation() => serviceCollection.AddTransient(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>)); + public void AddExecutionTaskTracking() => + serviceCollection.AddTransient(typeof(IWorkerMiddleware<,>), typeof(TaskExecutionTrackingMiddleware<,>)); - public void AddContextLogging() => serviceCollection.AddTransient(typeof(IPipelineBehavior<,>), typeof(ContextLoggingBehavior<,>)); - - public void AddExecutionTaskTracking() => serviceCollection.AddTransient(typeof(IPipelineBehavior<,>), typeof(TaskExecutionTrackingBehavior<,>)); - - public void AddCustomBehavior(Type behaviorType) => serviceCollection.AddTransient(typeof(IPipelineBehavior<,>), behaviorType); - - public void AddCustomBehavior() - where TBehavior : class, IPipelineBehavior + public void AddCustomMiddleware(Type middlewareType) { - serviceCollection.AddTransient, TBehavior>(); + if (middlewareType.GetInterface(typeof(IWorkerMiddleware<,>).Name) is null) + throw new ArgumentException($"Generic middleware must implement interface {typeof(IWorkerMiddleware<,>).Name}", nameof(middlewareType)); + + serviceCollection.AddTransient(typeof(IWorkerMiddleware<,>), middlewareType); } + + public void AddCustomMiddleware() + where TWorkerMiddleware : class, IWorkerMiddleware + where TRequest : ITaskInput, new() => serviceCollection.AddTransient, TWorkerMiddleware>(); } diff --git a/src/ConductorSharp.Engine/Extensions/TaskRegistrationExtensions.cs b/src/ConductorSharp.Engine/Extensions/TaskRegistrationExtensions.cs index a9833345..1e5b22ed 100644 --- a/src/ConductorSharp.Engine/Extensions/TaskRegistrationExtensions.cs +++ b/src/ConductorSharp.Engine/Extensions/TaskRegistrationExtensions.cs @@ -2,7 +2,6 @@ using System.Reflection; using ConductorSharp.Engine.Builders; using ConductorSharp.Engine.Builders.Metadata; -using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using Microsoft.Extensions.DependencyInjection; @@ -11,7 +10,9 @@ namespace ConductorSharp.Engine.Extensions public static class TaskRegistrationExtensions { public static void RegisterWorkerTask(this IServiceCollection builder, Action updateOptions = null) - where TWorkerTask : IWorker + where TWorkerTask : class + // TODO: MR Removal + //where TWorkerTask : IWorker { builder.AddSingleton(ctx => ctx.GetRequiredService().Build(updateOptions)); @@ -24,6 +25,8 @@ public static void RegisterWorkerTask(this IServiceCollection build TaskDomain = GetTaskDomain(typeof(TWorkerTask)) } ); + + builder.AddTransient(); } private static string GetTaskDomain(Type workerType) => workerType.GetCustomAttribute()?.Domain; diff --git a/src/ConductorSharp.Engine/Interface/IWorkerTaskLogger.cs b/src/ConductorSharp.Engine/Interface/ITaskInput.cs similarity index 52% rename from src/ConductorSharp.Engine/Interface/IWorkerTaskLogger.cs rename to src/ConductorSharp.Engine/Interface/ITaskInput.cs index a101ca27..a66e9622 100644 --- a/src/ConductorSharp.Engine/Interface/IWorkerTaskLogger.cs +++ b/src/ConductorSharp.Engine/Interface/ITaskInput.cs @@ -1 +1,3 @@ namespace ConductorSharp.Engine.Interface; + +public interface ITaskInput; diff --git a/src/ConductorSharp.Engine/Interface/ITaskRequestHandler.cs b/src/ConductorSharp.Engine/Interface/ITaskRequestHandler.cs deleted file mode 100644 index 15e0516a..00000000 --- a/src/ConductorSharp.Engine/Interface/ITaskRequestHandler.cs +++ /dev/null @@ -1,6 +0,0 @@ -using MediatR; - -namespace ConductorSharp.Engine.Interface -{ - public interface ITaskRequestHandler : IWorker, IRequestHandler where TRequest : IRequest { } -} diff --git a/src/ConductorSharp.Engine/Interface/IWorker.cs b/src/ConductorSharp.Engine/Interface/IWorker.cs index d1ec7707..c17c1f27 100644 --- a/src/ConductorSharp.Engine/Interface/IWorker.cs +++ b/src/ConductorSharp.Engine/Interface/IWorker.cs @@ -1,4 +1,11 @@ -namespace ConductorSharp.Engine.Interface +using System.Threading; +using System.Threading.Tasks; +using ConductorSharp.Engine.Util; + +namespace ConductorSharp.Engine.Interface; + +public interface IWorker + where TRequest : ITaskInput, new() { - public interface IWorker { } + Task Handle(TRequest test, WorkerExecutionContext context, CancellationToken cancellationToken); } diff --git a/src/ConductorSharp.Engine/Interface/IWorkerMiddleware.cs b/src/ConductorSharp.Engine/Interface/IWorkerMiddleware.cs new file mode 100644 index 00000000..d6ae7e58 --- /dev/null +++ b/src/ConductorSharp.Engine/Interface/IWorkerMiddleware.cs @@ -0,0 +1,12 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using ConductorSharp.Engine.Util; + +namespace ConductorSharp.Engine.Interface; + +public interface IWorkerMiddleware + where TRequest : ITaskInput, new() +{ + Task Handle(TRequest request, WorkerExecutionContext context, Func> next, CancellationToken cancellationToken); +} diff --git a/src/ConductorSharp.Engine/Middlewares/TaskExecutionTrackingMiddleware.cs b/src/ConductorSharp.Engine/Middlewares/TaskExecutionTrackingMiddleware.cs new file mode 100644 index 00000000..d6b42614 --- /dev/null +++ b/src/ConductorSharp.Engine/Middlewares/TaskExecutionTrackingMiddleware.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Util; + +namespace ConductorSharp.Engine.Middlewares; + +internal class TaskExecutionTrackingMiddleware : IWorkerMiddleware + where TRequest : ITaskInput, new() +{ + private readonly IEnumerable _taskExecutionServices; + + public TaskExecutionTrackingMiddleware(IEnumerable taskExecutionServices) + { + _taskExecutionServices = taskExecutionServices; + } + + public async Task Handle( + TRequest request, + WorkerExecutionContext context, + Func> next, + CancellationToken cancellationToken + ) + { + var trackedTask = new RunningTask + { + TaskId = context.TaskId, + TaskName = context.TaskName, + StartedAt = DateTimeOffset.UtcNow + }; + + foreach (var taskExecutionService in _taskExecutionServices) + { + await taskExecutionService.OnPolled(trackedTask); + } + + try + { + var response = await next(); + + foreach (var taskExecutionService in _taskExecutionServices) + { + await taskExecutionService.OnCompleted(trackedTask); + } + + return response; + } + catch (Exception) + { + foreach (var taskExecutionService in _taskExecutionServices) + { + await taskExecutionService.OnFailed(trackedTask); + } + + throw; + } + } +} diff --git a/src/ConductorSharp.Engine/Middlewares/ValidationWorkerMiddleware.cs b/src/ConductorSharp.Engine/Middlewares/ValidationWorkerMiddleware.cs new file mode 100644 index 00000000..94759fd0 --- /dev/null +++ b/src/ConductorSharp.Engine/Middlewares/ValidationWorkerMiddleware.cs @@ -0,0 +1,23 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Util; + +namespace ConductorSharp.Engine.Middlewares; + +internal class ValidationWorkerMiddleware : IWorkerMiddleware + where TRequest : ITaskInput, new() +{ + public async Task Handle( + TRequest request, + WorkerExecutionContext context, + Func> next, + CancellationToken cancellationToken + ) + { + ObjectValidator.Validate(request); + var response = await next(); + return response; + } +} diff --git a/src/ConductorSharp.Engine/Model/DecisionTaskModel.cs b/src/ConductorSharp.Engine/Model/DecisionTaskModel.cs index 49fe4f8f..d06e19e5 100644 --- a/src/ConductorSharp.Engine/Model/DecisionTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/DecisionTaskModel.cs @@ -1,11 +1,11 @@ -using MediatR; +using ConductorSharp.Engine.Interface; namespace ConductorSharp.Engine.Model { - public class DecisionTaskInput : IRequest + public class DecisionTaskInput : ITaskInput { public object CaseValueParam { get; set; } } - public class DecisionTaskModel : TaskModel { } + public class DecisionTaskModel : TaskModel; } diff --git a/src/ConductorSharp.Engine/Model/DoWhileTaskModel.cs b/src/ConductorSharp.Engine/Model/DoWhileTaskModel.cs index f635ae2b..b6c559e2 100644 --- a/src/ConductorSharp.Engine/Model/DoWhileTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/DoWhileTaskModel.cs @@ -1,12 +1,12 @@ using ConductorSharp.Engine.Builders; -using MediatR; +using ConductorSharp.Engine.Interface; namespace ConductorSharp.Engine.Model { /// /// Input for configuration of the DO_WHILE task. /// - public class DoWhileInput : IRequest, IWorkflowInput + public class DoWhileInput : ITaskInput, IWorkflowInput { public object Value { get; set; } = null; } @@ -14,5 +14,5 @@ public class DoWhileInput : IRequest, IWorkflowInput /// /// Task Model to reference the do while task in the workflow builders /// - public class DoWhileTaskModel : TaskModel { } + public class DoWhileTaskModel : TaskModel; } diff --git a/src/ConductorSharp.Engine/Model/DynamicForkJoinTaskModel.cs b/src/ConductorSharp.Engine/Model/DynamicForkJoinTaskModel.cs index a8e14a76..87474900 100644 --- a/src/ConductorSharp.Engine/Model/DynamicForkJoinTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/DynamicForkJoinTaskModel.cs @@ -1,13 +1,13 @@ -using MediatR; +using ConductorSharp.Engine.Interface; namespace ConductorSharp.Engine.Model { - public class DynamicForkJoinInput : IRequest + public class DynamicForkJoinInput : ITaskInput { public object DynamicTasks { get; set; } public object DynamicTasksI { get; set; } } - public class DynamicForkJoinTaskModel : TaskModel { } + public class DynamicForkJoinTaskModel : TaskModel; } diff --git a/src/ConductorSharp.Engine/Model/DynamicTaskModel.cs b/src/ConductorSharp.Engine/Model/DynamicTaskModel.cs index ba50a2e1..bf740f90 100644 --- a/src/ConductorSharp.Engine/Model/DynamicTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/DynamicTaskModel.cs @@ -1,15 +1,12 @@ -using MediatR; -using System; -using System.Collections.Generic; -using System.Text; +using ConductorSharp.Engine.Interface; namespace ConductorSharp.Engine.Model { - public class DynamicTaskInput : IRequest + public class DynamicTaskInput : ITaskInput { - public I TaskInput { get; set; } + public TInput TaskInput { get; set; } public string TaskToExecute { get; set; } } - public class DynamicTaskModel : TaskModel, O> { } + public class DynamicTaskModel : TaskModel, TOutput>; } diff --git a/src/ConductorSharp.Engine/Model/EventTaskModel.cs b/src/ConductorSharp.Engine/Model/EventTaskModel.cs index ee25faab..63d64a7c 100644 --- a/src/ConductorSharp.Engine/Model/EventTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/EventTaskModel.cs @@ -1,4 +1,4 @@ -using MediatR; +using ConductorSharp.Engine.Interface; using Newtonsoft.Json; namespace ConductorSharp.Engine.Model; @@ -28,4 +28,4 @@ public class EventTaskModelOutput } public class EventTaskModel : TaskModel - where TInput : IRequest { } + where TInput : ITaskInput; diff --git a/src/ConductorSharp.Engine/Model/HumanTaskModel.cs b/src/ConductorSharp.Engine/Model/HumanTaskModel.cs index 1e6dbeb6..eb50f196 100644 --- a/src/ConductorSharp.Engine/Model/HumanTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/HumanTaskModel.cs @@ -1,9 +1,8 @@ -using MediatR; -using Newtonsoft.Json; +using ConductorSharp.Engine.Interface; namespace ConductorSharp.Engine.Model { - public class HumanTaskInput : IRequest { } + public class HumanTaskInput : ITaskInput; - public class HumanTaskModel : TaskModel, TOutput> { } + public class HumanTaskModel : TaskModel, TOutput>; } diff --git a/src/ConductorSharp.Engine/Model/JsonJqTransformTaskModel.cs b/src/ConductorSharp.Engine/Model/JsonJqTransformTaskModel.cs index c3ba9feb..ecd1a33f 100644 --- a/src/ConductorSharp.Engine/Model/JsonJqTransformTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/JsonJqTransformTaskModel.cs @@ -1,6 +1,7 @@ -using MediatR; +using ConductorSharp.Engine.Interface; namespace ConductorSharp.Engine.Model { - public class JsonJqTransformTaskModel : TaskModel where I : IRequest { } + public class JsonJqTransformTaskModel : TaskModel + where TInput : ITaskInput; } diff --git a/src/ConductorSharp.Engine/Model/LambdaTaskModel.cs b/src/ConductorSharp.Engine/Model/LambdaTaskModel.cs index 61016e3e..7ab9c50e 100644 --- a/src/ConductorSharp.Engine/Model/LambdaTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/LambdaTaskModel.cs @@ -1,4 +1,4 @@ -using MediatR; +using ConductorSharp.Engine.Interface; namespace ConductorSharp.Engine.Model { @@ -7,9 +7,10 @@ public abstract class LambdaOutputModel public O Result { get; set; } } - public abstract class LambdaTaskModel where I : IRequest + public abstract class LambdaTaskModel + where TInput : ITaskInput { - public I Input { get; set; } - public LambdaOutputModel Output { get; set; } + public TInput Input { get; set; } + public LambdaOutputModel Output { get; set; } } } diff --git a/src/ConductorSharp.Engine/Model/SimpleTaskModel.cs b/src/ConductorSharp.Engine/Model/SimpleTaskModel.cs index 3bf5192c..27f8d410 100644 --- a/src/ConductorSharp.Engine/Model/SimpleTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/SimpleTaskModel.cs @@ -1,7 +1,7 @@ using ConductorSharp.Engine.Interface; -using MediatR; namespace ConductorSharp.Engine.Model { - public abstract class SimpleTaskModel : TaskModel, INameable where I : IRequest { } + public abstract class SimpleTaskModel : TaskModel, INameable + where TInput : ITaskInput; } diff --git a/src/ConductorSharp.Engine/Model/SubWorkflowTaskModel.cs b/src/ConductorSharp.Engine/Model/SubWorkflowTaskModel.cs index 122a1942..2eaa0c6a 100644 --- a/src/ConductorSharp.Engine/Model/SubWorkflowTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/SubWorkflowTaskModel.cs @@ -1,7 +1,7 @@ using ConductorSharp.Engine.Interface; -using MediatR; namespace ConductorSharp.Engine.Model { - public abstract class SubWorkflowTaskModel : TaskModel, INameable where I : IRequest { } + public abstract class SubWorkflowTaskModel : TaskModel, INameable + where TInput : ITaskInput; } diff --git a/src/ConductorSharp.Engine/Model/SwitchTaskModel.cs b/src/ConductorSharp.Engine/Model/SwitchTaskModel.cs index e0c391c2..a923b010 100644 --- a/src/ConductorSharp.Engine/Model/SwitchTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/SwitchTaskModel.cs @@ -1,11 +1,11 @@ -using MediatR; +using ConductorSharp.Engine.Interface; namespace ConductorSharp.Engine.Model { - public class SwitchTaskInput : IRequest + public class SwitchTaskInput : ITaskInput { public object SwitchCaseValue { get; set; } } - public class SwitchTaskModel : TaskModel { } + public class SwitchTaskModel : TaskModel; } diff --git a/src/ConductorSharp.Engine/Model/TaskModel.cs b/src/ConductorSharp.Engine/Model/TaskModel.cs index 1fd31266..e15b5301 100644 --- a/src/ConductorSharp.Engine/Model/TaskModel.cs +++ b/src/ConductorSharp.Engine/Model/TaskModel.cs @@ -1,10 +1,9 @@ using ConductorSharp.Engine.Interface; -using MediatR; using Newtonsoft.Json; namespace ConductorSharp.Engine.Model { - public abstract class TaskModel : ITaskModel where I : IRequest + public abstract class TaskModel : ITaskModel { public I Input { get; } diff --git a/src/ConductorSharp.Engine/Model/TerminateTaskModel.cs b/src/ConductorSharp.Engine/Model/TerminateTaskModel.cs index 18ce1fb5..0b31e3fa 100644 --- a/src/ConductorSharp.Engine/Model/TerminateTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/TerminateTaskModel.cs @@ -1,5 +1,5 @@ -using ConductorSharp.Engine.Util; -using MediatR; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Util; using Newtonsoft.Json; namespace ConductorSharp.Engine.Model @@ -13,7 +13,7 @@ public enum TerminationStatus Failed }; - public class TerminateTaskInput : IRequest + public class TerminateTaskInput : ITaskInput { [JsonProperty("workflowOutput")] public object WorkflowOutput { get; set; } @@ -22,5 +22,5 @@ public class TerminateTaskInput : IRequest public TerminationStatus TerminationStatus { get; set; } } - public class TerminateTaskModel : TaskModel { } + public class TerminateTaskModel : TaskModel; } diff --git a/src/ConductorSharp.Engine/Model/WaitTaskModel.cs b/src/ConductorSharp.Engine/Model/WaitTaskModel.cs index 37b66355..0919f74e 100644 --- a/src/ConductorSharp.Engine/Model/WaitTaskModel.cs +++ b/src/ConductorSharp.Engine/Model/WaitTaskModel.cs @@ -1,12 +1,9 @@ -using MediatR; +using ConductorSharp.Engine.Interface; using Newtonsoft.Json; -using System; -using System.Collections.Generic; -using System.Text; namespace ConductorSharp.Engine.Model { - public class WaitTaskInput : IRequest + public class WaitTaskInput : ITaskInput { [JsonProperty("duration")] public string Duration { get; set; } @@ -15,5 +12,5 @@ public class WaitTaskInput : IRequest public string Until { get; set; } } - public class WaitTaskModel : TaskModel { } + public class WaitTaskModel : TaskModel; } diff --git a/src/ConductorSharp.Engine/Service/WorkerInvokerService.cs b/src/ConductorSharp.Engine/Service/WorkerInvokerService.cs new file mode 100644 index 00000000..1602fc1c --- /dev/null +++ b/src/ConductorSharp.Engine/Service/WorkerInvokerService.cs @@ -0,0 +1,153 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Linq.Expressions; +using System.Reflection; +using System.Reflection.Metadata; +using System.Runtime.InteropServices.ComTypes; +using System.Threading; +using System.Threading.Tasks; +using ConductorSharp.Client; +using ConductorSharp.Client.Util; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Util; +using Microsoft.Extensions.DependencyInjection; + +namespace ConductorSharp.Engine.Service +{ + internal class WorkerInvokerService(IServiceProvider serviceProvider) + { + private record WorkerTypeInfo + { + public WorkerTypeInfo(Type workerType) + { + WorkerType = workerType; + (RequestType, ResponseType) = WorkerUtil.GetRequestResponseTypes(workerType); + MiddlewareType = typeof(IWorkerMiddleware<,>).MakeGenericType(RequestType, ResponseType); + WorkerHandleMethod = typeof(IWorker<,>) + .MakeGenericType(RequestType, ResponseType) + .GetMethod(nameof(IWorker.Handle)); + MiddlewareHandleMethod = MiddlewareType.GetMethod(nameof(IWorkerMiddleware.Handle)); + NextFuncType = MiddlewareHandleMethod!.GetParameters().FirstOrDefault(p => p.Name == "next")!.ParameterType; + TaskResultProperty = typeof(Task<>).MakeGenericType(ResponseType).GetProperty(nameof(Task.Result)); + } + + public Type WorkerType { get; } + public Type RequestType { get; } + public Type ResponseType { get; } + public Type MiddlewareType { get; } + public MethodInfo WorkerHandleMethod { get; } + public MethodInfo MiddlewareHandleMethod { get; } + public Type NextFuncType { get; } + public PropertyInfo TaskResultProperty { get; } + } + + private readonly IServiceProvider _serviceProvider = serviceProvider; + + public async Task> Invoke( + Type workerType, + IDictionary request, + WorkerExecutionContext workerExecutionContext, + CancellationToken cancellationToken + ) + { + var workerTypeInfo = new WorkerTypeInfo(workerType); + var objRequest = SerializationHelper.DictonaryToObject(workerTypeInfo.RequestType, request, ConductorConstants.IoJsonSerializerSettings); + var objResponse = await InternalInvoke(workerTypeInfo, objRequest, workerExecutionContext, cancellationToken); + var response = SerializationHelper.ObjectToDictionary(objResponse, ConductorConstants.IoJsonSerializerSettings); + + return response; + } + + private async Task InternalInvoke( + WorkerTypeInfo workerTypeInfo, + object request, + WorkerExecutionContext context, + CancellationToken cancellationToken + ) + { + var middlewares = _serviceProvider.GetServices(workerTypeInfo.MiddlewareType).ToArray(); + var worker = _serviceProvider.GetRequiredService(workerTypeInfo.WorkerType); + var resultTask = InvokeMiddlewarePipeline(middlewares, worker, workerTypeInfo, request, context, cancellationToken); + await resultTask; + var result = workerTypeInfo.TaskResultProperty.GetValue(resultTask); + return result; + } + + private static Task InvokeMiddlewarePipeline( + object[] middlewares, + object worker, + WorkerTypeInfo workerTypeInfo, + object request, + WorkerExecutionContext context, + CancellationToken cancellationToken + ) + { + object result; + if (middlewares.Length == 0) + result = workerTypeInfo.WorkerHandleMethod.Invoke(worker, [request, context, cancellationToken]); + else + { + var next = GenerateCallToMiddleware(middlewares, worker, request, 1, workerTypeInfo, context, cancellationToken).Compile(); + result = workerTypeInfo.MiddlewareHandleMethod.Invoke(middlewares[0], [request, context, next, cancellationToken]); + } + + return (Task)result; + } + + private static LambdaExpression GenerateCallToHandler( + object worker, + object request, + WorkerTypeInfo workerInfo, + WorkerExecutionContext context, + CancellationToken cancellationToken + ) + { + var requestArgument = Expression.Constant(request); + var cancellationTokenArgument = Expression.Constant(cancellationToken); + var contextArgument = Expression.Constant(context); + + var lambdaBody = Expression.Call( + Expression.Constant(worker), + workerInfo.WorkerHandleMethod, + requestArgument, + contextArgument, + cancellationTokenArgument + ); + var lambda = Expression.Lambda(workerInfo.NextFuncType, lambdaBody); + return lambda; + } + + private static LambdaExpression GenerateCallToMiddleware( + object[] middlewares, + object worker, + object request, + int middlewareIndex, + WorkerTypeInfo workerInfo, + WorkerExecutionContext context, + CancellationToken cancellationToken + ) + { + if (middlewares.Length == middlewareIndex) + return GenerateCallToHandler(worker, request, workerInfo, context, cancellationToken); + + var requestArgument = Expression.Constant(request); + var cancellationTokenArgument = Expression.Constant(cancellationToken); + var contextArgument = Expression.Constant(context); + + var nextLambda = GenerateCallToMiddleware(middlewares, worker, request, middlewareIndex + 1, workerInfo, context, cancellationToken); + var lambdaBody = Expression.Call( + Expression.Constant(middlewares[middlewareIndex]), + workerInfo.MiddlewareHandleMethod, + requestArgument, + contextArgument, + nextLambda, + cancellationTokenArgument + ); + var lambda = Expression.Lambda(workerInfo.NextFuncType, lambdaBody); + return lambda; + } + } +} diff --git a/src/ConductorSharp.Engine/TaskRequestHandler.cs b/src/ConductorSharp.Engine/TaskRequestHandler.cs deleted file mode 100644 index daf1fa95..00000000 --- a/src/ConductorSharp.Engine/TaskRequestHandler.cs +++ /dev/null @@ -1,17 +0,0 @@ -using ConductorSharp.Engine.Interface; -using ConductorSharp.Engine.Model; -using MediatR; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace ConductorSharp.Engine -{ - public abstract class TaskRequestHandler : SimpleTaskModel, ITaskRequestHandler - where TInput : IRequest - { - public abstract Task Handle(TInput request, CancellationToken cancellationToken); - } -} diff --git a/src/ConductorSharp.Engine/TypePollSpreadingExecutionManager.cs b/src/ConductorSharp.Engine/TypePollSpreadingExecutionManager.cs index fc12ac34..94a2b9e1 100644 --- a/src/ConductorSharp.Engine/TypePollSpreadingExecutionManager.cs +++ b/src/ConductorSharp.Engine/TypePollSpreadingExecutionManager.cs @@ -1,9 +1,9 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Threading; -using System.Threading.Tasks; using ConductorSharp.Client; using ConductorSharp.Client.Generated; using ConductorSharp.Client.Service; @@ -11,8 +11,8 @@ using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Polling; +using ConductorSharp.Engine.Service; using ConductorSharp.Engine.Util; -using MediatR; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json; @@ -32,6 +32,7 @@ internal class TypePollSpreadingExecutionManager : IExecutionManager private readonly IPollTimingStrategy _pollTimingStrategy; private readonly IPollOrderStrategy _pollOrderStrategy; private readonly ICancellationNotifier _cancellationNotifier; + private readonly WorkerInvokerService _workerInvokerService; public TypePollSpreadingExecutionManager( WorkerSetConfig options, @@ -42,7 +43,8 @@ public TypePollSpreadingExecutionManager( IServiceScopeFactory lifetimeScope, IPollTimingStrategy pollTimingStrategy, IPollOrderStrategy pollOrderStrategy, - ICancellationNotifier cancellationNotifier + ICancellationNotifier cancellationNotifier, + WorkerInvokerService workerInvokerService ) { _configuration = options; @@ -54,6 +56,7 @@ ICancellationNotifier cancellationNotifier _pollTimingStrategy = pollTimingStrategy; _pollOrderStrategy = pollOrderStrategy; _cancellationNotifier = cancellationNotifier; + _workerInvokerService = workerInvokerService; _externalPayloadService = externalPayloadService; } @@ -107,20 +110,6 @@ private string GetQueueTaskName(TaskToWorker taskToWorker) return taskToWorker.TaskName; } - private static Type GetInputType(Type workerType) - { - var interfaces = workerType - .GetInterfaces() - .Where(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(ITaskRequestHandler<,>)) - .First(); - var genericArguments = interfaces.GetGenericArguments(); - - var inputType = genericArguments[0]; - var outputType = genericArguments[1]; - - return inputType; - } - private async Task PollAndHandle(TaskToWorker scheduledWorker, CancellationToken cancellationToken) { Client.Generated.Task pollResponse; @@ -186,36 +175,47 @@ CancellationToken cancellationToken ); } - var inputType = GetInputType(scheduledWorker.TaskType); - var inputData = SerializationHelper.DictonaryToObject(inputType, pollResponse.InputData, ConductorConstants.IoJsonSerializerSettings); - // Poll response data can be huge (if read from external storage) - // We can save memory by not holding reference to pollResponse.InputData after it is parsed - pollResponse.InputData = null; - - using var scope = _lifetimeScopeFactory.CreateScope(); - - var context = scope.ServiceProvider.GetService(); - var mediator = scope.ServiceProvider.GetRequiredService(); - - if (context != null) - { - context.WorkflowName = pollResponse.WorkflowType; - context.TaskName = pollResponse.TaskDefName; - context.TaskReferenceName = pollResponse.ReferenceTaskName; - context.WorkflowId = pollResponse.WorkflowInstanceId; - context.CorrelationId = pollResponse.CorrelationId; - context.TaskId = pollResponse.TaskId; - context.WorkerId = workerId; - } + var context = new WorkerExecutionContext( + WorkflowName: pollResponse.WorkflowType, + WorkflowId: pollResponse.WorkflowInstanceId, + TaskName: pollResponse.TaskDefName, + TaskId: pollResponse.TaskId, + TaskReferenceName: pollResponse.ReferenceTaskName, + CorrelationId: pollResponse.CorrelationId, + WorkerId: workerId + ); - var response = await mediator.Send(inputData, tokenHolder.CancellationToken); + _logger.LogInformation( + "Executing worker {Worker} for task {Task}(id={TaskId}) as part of workflow {Workflow}(id={WorkflowId})", + scheduledWorker.TaskType.Name, + pollResponse.TaskDefName, + pollResponse.TaskId, + pollResponse.WorkflowType, + pollResponse.WorkflowInstanceId + ); + var stopwatch = Stopwatch.StartNew(); + var response = await _workerInvokerService.Invoke( + scheduledWorker.TaskType, + pollResponse.InputData, + context, + tokenHolder.CancellationToken + ); + _logger.LogInformation( + "Worker {Worker} executed for task {Task}(id={TaskId}) as part of workflow {Workflow}(id={WorkflowId}), exec time = {WorkerPipelineExecutionTime}ms", + scheduledWorker.TaskType.Name, + pollResponse.TaskDefName, + pollResponse.TaskId, + pollResponse.WorkflowType, + pollResponse.WorkflowInstanceId, + stopwatch.ElapsedMilliseconds + ); await _taskManager.UpdateAsync( new TaskResult { TaskId = pollResponse.TaskId, Status = TaskResultStatus.COMPLETED, - OutputData = SerializationHelper.ObjectToDictionary(response, ConductorConstants.IoJsonSerializerSettings), + OutputData = response, WorkflowInstanceId = pollResponse.WorkflowInstanceId }, tokenHolder.CancellationToken @@ -244,9 +244,10 @@ await _taskManager.UpdateAsync( catch (Exception exception) { _logger.LogError( - "{@Exception} while executing {Task} as part of {Workflow} with id {WorkflowId}", exception, + "Exception while processing polled task {Task}(id={TaskId}) as part of workflow {Workflow}(id={WorkflowId})", pollResponse.TaskDefName, + pollResponse.TaskId, pollResponse.WorkflowType, pollResponse.WorkflowInstanceId ); diff --git a/src/ConductorSharp.Engine/Util/ConductorSharpExecutionContext.cs b/src/ConductorSharp.Engine/Util/ConductorSharpExecutionContext.cs deleted file mode 100644 index 91f30f9b..00000000 --- a/src/ConductorSharp.Engine/Util/ConductorSharpExecutionContext.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace ConductorSharp.Engine.Util -{ - public class ConductorSharpExecutionContext - { - public string WorkflowName { get; set; } - public string WorkflowId { get; set; } - public string TaskName { get; set; } - public string TaskId { get; set; } - public string TaskReferenceName { get; set; } - public string CorrelationId { get; set; } - public string WorkerId { get; set; } - } -} diff --git a/src/ConductorSharp.Engine/Util/EmbeddedFileHelper.cs b/src/ConductorSharp.Engine/Util/EmbeddedFileHelper.cs deleted file mode 100644 index 999573df..00000000 --- a/src/ConductorSharp.Engine/Util/EmbeddedFileHelper.cs +++ /dev/null @@ -1,51 +0,0 @@ -using System; -using System.IO; -using System.Reflection; -using System.Text; -using System.Threading.Tasks; -using Newtonsoft.Json; - -namespace ConductorSharp.Engine.Util -{ - internal static class EmbeddedFileHelper - { - private static string ReadAssemblyFile(Assembly assembly, string name) - { - var stream = assembly.GetManifestResourceStream(name) ?? throw new InvalidOperationException($"Resource {name} does not exist."); - using var reader = new StreamReader(stream, Encoding.UTF8); - - return reader.ReadToEnd(); - } - - public static T GetObjectFromEmbeddedFile(string fileName, params (string Key, object Value)[] templateParams) - { - fileName = fileName.Replace("~/", typeof(EmbeddedFileHelper).Assembly.GetName().Name + ".").Replace("/", "."); - - var contents = ReadAssemblyFile(typeof(EmbeddedFileHelper).Assembly, fileName); - - if (templateParams != null) - foreach (var (Key, Value) in templateParams) - contents = contents.Replace("{{" + Key + "}}", $"{Value}"); - - return JsonConvert.DeserializeObject(contents); - } - - public static string GetLinesFromEmbeddedFile(string fileName) - { - fileName = fileName.Replace("~/", typeof(EmbeddedFileHelper).Assembly.GetName().Name + ".").Replace("/", "."); - - var contents = ReadAssemblyFile(typeof(EmbeddedFileHelper).Assembly, fileName); - - return contents; - } - - public static Task GetObjectFromEmbeddedFileAsync(string fileName, params (string Key, object Value)[] templateParams) => - Task.FromResult(GetObjectFromEmbeddedFile(fileName, templateParams)); - - public static string Reserialize(string fileName, params (string Key, object Value)[] templateParams) - { - var file = GetObjectFromEmbeddedFile(fileName, templateParams); - return JsonConvert.SerializeObject(file); - } - } -} diff --git a/src/ConductorSharp.Engine/Util/ObjectRequest.cs b/src/ConductorSharp.Engine/Util/ObjectRequest.cs new file mode 100644 index 00000000..93ea0f6a --- /dev/null +++ b/src/ConductorSharp.Engine/Util/ObjectRequest.cs @@ -0,0 +1,5 @@ +using ConductorSharp.Engine.Interface; + +namespace ConductorSharp.Engine.Util; + +internal class ObjectRequest : ITaskInput; diff --git a/src/ConductorSharp.Engine/Util/WorkerExecutionContext.cs b/src/ConductorSharp.Engine/Util/WorkerExecutionContext.cs new file mode 100644 index 00000000..7eb65e73 --- /dev/null +++ b/src/ConductorSharp.Engine/Util/WorkerExecutionContext.cs @@ -0,0 +1,11 @@ +namespace ConductorSharp.Engine.Util; + +public record WorkerExecutionContext( + string WorkflowName, + string WorkflowId, + string TaskName, + string TaskId, + string TaskReferenceName, + string CorrelationId, + string WorkerId +); diff --git a/src/ConductorSharp.Engine/Util/WorkerUtil.cs b/src/ConductorSharp.Engine/Util/WorkerUtil.cs new file mode 100644 index 00000000..6e1e3f91 --- /dev/null +++ b/src/ConductorSharp.Engine/Util/WorkerUtil.cs @@ -0,0 +1,14 @@ +using System; +using ConductorSharp.Engine.Interface; + +namespace ConductorSharp.Engine.Util +{ + internal static class WorkerUtil + { + public static (Type RequestType, Type ResponseType) GetRequestResponseTypes(Type workerType) + { + var types = workerType.GetInterface(typeof(IWorker<,>).Name)!.GetGenericArguments(); + return (types[0], types[1]); + } + } +} diff --git a/src/ConductorSharp.Engine/Worker.cs b/src/ConductorSharp.Engine/Worker.cs new file mode 100644 index 00000000..91b7d982 --- /dev/null +++ b/src/ConductorSharp.Engine/Worker.cs @@ -0,0 +1,13 @@ +using System.Threading; +using System.Threading.Tasks; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Model; +using ConductorSharp.Engine.Util; + +namespace ConductorSharp.Engine; + +public abstract class Worker : SimpleTaskModel, IWorker + where TRequest : ITaskInput, new() +{ + public abstract Task Handle(TRequest test, WorkerExecutionContext context, CancellationToken cancellationToken); +} diff --git a/src/ConductorSharp.Patterns/Builders/CSharpLambdaTaskBuilder.cs b/src/ConductorSharp.Patterns/Builders/CSharpLambdaTaskBuilder.cs index bbe60fc8..c1fafdc4 100644 --- a/src/ConductorSharp.Patterns/Builders/CSharpLambdaTaskBuilder.cs +++ b/src/ConductorSharp.Patterns/Builders/CSharpLambdaTaskBuilder.cs @@ -10,7 +10,6 @@ using ConductorSharp.Patterns.Exceptions; using ConductorSharp.Patterns.Model; using ConductorSharp.Patterns.Tasks; -using MediatR; using Newtonsoft.Json.Linq; namespace ConductorSharp.Patterns.Builders @@ -24,7 +23,7 @@ public static ITaskOptionsBuilder AddTask( Func lambda ) where TWorkflow : ITypedWorkflow - where TInput : IRequest + where TInput : ITaskInput { var configurationProp = builder.ConfigurationProperties.FirstOrDefault(prop => prop.Key == CSharpLambdaTask.LambdaTaskNameConfigurationProperty) @@ -50,7 +49,7 @@ Func lambda } internal class CSharpLambdaTaskBuilder : BaseTaskBuilder - where TInput : IRequest + where TInput : ITaskInput { public const string LambdaIdStorageKey = "ConductorSharp.Engine.CSharpLambdaTaskBuilder.LambdaId"; diff --git a/src/ConductorSharp.Patterns/Extensions/ContainerBuilderExtensions.cs b/src/ConductorSharp.Patterns/Extensions/ContainerBuilderExtensions.cs index 8aecc193..8576cba1 100644 --- a/src/ConductorSharp.Patterns/Extensions/ContainerBuilderExtensions.cs +++ b/src/ConductorSharp.Patterns/Extensions/ContainerBuilderExtensions.cs @@ -13,7 +13,6 @@ public static IExecutionManagerBuilder AddConductorSharpPatterns(this IExecution { executionManagerBuilder.Builder.RegisterWorkerTask(); executionManagerBuilder.Builder.RegisterWorkerTask(); - executionManagerBuilder.Builder.AddMediatR(cfg => cfg.RegisterServicesFromAssemblies(typeof(WaitSeconds).Assembly)); return executionManagerBuilder; } @@ -27,7 +26,6 @@ public static IExecutionManagerBuilder AddCSharpLambdaTasks( { options.OwnerEmail = "owneremail@gmail.com"; }); - executionManagerBuilder.Builder.AddMediatR(cfg => cfg.RegisterServicesFromAssemblies(typeof(CSharpLambdaTask).Assembly)); executionManagerBuilder.Builder.AddSingleton( new ConfigurationProperty(CSharpLambdaTask.LambdaTaskNameConfigurationProperty, csharpLambdaTaskNamePrefix) ); diff --git a/src/ConductorSharp.Patterns/Model/CSharpLambdaTaskModel.cs b/src/ConductorSharp.Patterns/Model/CSharpLambdaTaskModel.cs index 1771348d..63ebde04 100644 --- a/src/ConductorSharp.Patterns/Model/CSharpLambdaTaskModel.cs +++ b/src/ConductorSharp.Patterns/Model/CSharpLambdaTaskModel.cs @@ -1,7 +1,8 @@ -using ConductorSharp.Engine.Model; -using MediatR; +using ConductorSharp.Engine.Interface; +using ConductorSharp.Engine.Model; namespace ConductorSharp.Patterns.Model { - public class CSharpLambdaTaskModel : TaskModel where TInput : IRequest { } + public class CSharpLambdaTaskModel : TaskModel + where TInput : ITaskInput; } diff --git a/src/ConductorSharp.Patterns/Tasks/CSharpLambdaTask.cs b/src/ConductorSharp.Patterns/Tasks/CSharpLambdaTask.cs index 3cbc570c..979a9c36 100644 --- a/src/ConductorSharp.Patterns/Tasks/CSharpLambdaTask.cs +++ b/src/ConductorSharp.Patterns/Tasks/CSharpLambdaTask.cs @@ -10,13 +10,12 @@ using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Util; using ConductorSharp.Patterns.Exceptions; -using MediatR; using Newtonsoft.Json; using Newtonsoft.Json.Linq; namespace ConductorSharp.Patterns.Tasks { - internal class CSharpLambdaTaskInput : IRequest + internal class CSharpLambdaTaskInput : ITaskInput { public const string LambdaIdenfitierParamName = "lambda_identifier"; public const string TaskInputParamName = "task_input"; @@ -31,14 +30,14 @@ internal class CSharpLambdaTaskInput : IRequest } [OriginalName(TaskName)] - internal class CSharpLambdaTask(WorkflowBuildItemRegistry itemRegistry) : ITaskRequestHandler + internal class CSharpLambdaTask(WorkflowBuildItemRegistry itemRegistry) : IWorker { public const string TaskName = "CSHRP_inln_lmbd"; public const string LambdaTaskNameConfigurationProperty = nameof(LambdaTaskNameConfigurationProperty); private readonly WorkflowBuildItemRegistry _itemRegistry = itemRegistry; - public Task Handle(CSharpLambdaTaskInput request, CancellationToken cancellationToken) + public Task Handle(CSharpLambdaTaskInput request, WorkerExecutionContext context, CancellationToken cancellationToken) { if (string.IsNullOrEmpty(request.LambdaIdentifier)) { diff --git a/src/ConductorSharp.Patterns/Tasks/ReadWorkflowTasks.cs b/src/ConductorSharp.Patterns/Tasks/ReadWorkflowTasks.cs index 7377b2e2..bd7b8543 100644 --- a/src/ConductorSharp.Patterns/Tasks/ReadWorkflowTasks.cs +++ b/src/ConductorSharp.Patterns/Tasks/ReadWorkflowTasks.cs @@ -6,14 +6,14 @@ using ConductorSharp.Client.Service; using ConductorSharp.Engine; using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Util; -using MediatR; using Newtonsoft.Json.Linq; namespace ConductorSharp.Patterns.Tasks { #region models - public class ReadWorkflowTasksRequest : IRequest + public class ReadWorkflowTasksRequest : ITaskInput { /// /// Comma separated list of task reference names to be read from specified workflow @@ -35,11 +35,15 @@ public record WorkflowDetails(JObject InputData); /// Uses the Conductor API to read the input/output and status of the specified tasks for the specified workflow. /// [OriginalName(Constants.TaskNamePrefix + "_read_tasks")] - public class ReadWorkflowTasks(IWorkflowService workflowService) : TaskRequestHandler + public class ReadWorkflowTasks(IWorkflowService workflowService) : Worker { private readonly IWorkflowService _workflowService = workflowService; - public override async Task Handle(ReadWorkflowTasksRequest input, CancellationToken cancellationToken) + public override async Task Handle( + ReadWorkflowTasksRequest input, + WorkerExecutionContext context, + CancellationToken cancellationToken + ) { if (string.IsNullOrEmpty(input.TaskNames)) { diff --git a/src/ConductorSharp.Patterns/Tasks/WaitSeconds.cs b/src/ConductorSharp.Patterns/Tasks/WaitSeconds.cs index 9b683033..c93770f0 100644 --- a/src/ConductorSharp.Patterns/Tasks/WaitSeconds.cs +++ b/src/ConductorSharp.Patterns/Tasks/WaitSeconds.cs @@ -1,16 +1,15 @@ -using System; -using System.ComponentModel.DataAnnotations; +using System.ComponentModel.DataAnnotations; using System.Threading; using System.Threading.Tasks; using ConductorSharp.Engine; using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Interface; using ConductorSharp.Engine.Model; using ConductorSharp.Engine.Util; -using MediatR; namespace ConductorSharp.Patterns.Tasks { - public class WaitSecondsRequest : IRequest + public class WaitSecondsRequest : ITaskInput { /// /// Time to wait in seconds @@ -24,9 +23,9 @@ public class WaitSecondsRequest : IRequest /// Executes `await Task.Delay(input.Seconds * 1000)` to wait for a given amount of seconds /// [OriginalName(Constants.TaskNamePrefix + "_wait_seconds")] - public class WaitSeconds : TaskRequestHandler + public class WaitSeconds : Worker { - public override async Task Handle(WaitSecondsRequest input, CancellationToken cancellationToken) + public override async Task Handle(WaitSecondsRequest input, WorkerExecutionContext context, CancellationToken cancellationToken) { await Task.Delay(input.Seconds * 1000, cancellationToken); return new NoOutput(); diff --git a/src/ConductorSharp.Toolkit/ConductorSharp.Toolkit.csproj b/src/ConductorSharp.Toolkit/ConductorSharp.Toolkit.csproj index b103e438..b9a49088 100644 --- a/src/ConductorSharp.Toolkit/ConductorSharp.Toolkit.csproj +++ b/src/ConductorSharp.Toolkit/ConductorSharp.Toolkit.csproj @@ -10,20 +10,6 @@ 3.0.1-beta3 - - - - - - - - - - - - - - diff --git a/src/ConductorSharp.Toolkit/Templates/TaskCollectionTemplate.default b/src/ConductorSharp.Toolkit/Templates/TaskCollectionTemplate.default deleted file mode 100644 index 38ae9b09..00000000 --- a/src/ConductorSharp.Toolkit/Templates/TaskCollectionTemplate.default +++ /dev/null @@ -1,20 +0,0 @@ -using ConductorSharp.Engine.Model; -using System.ComponentModel; -using System.ComponentModel.DataAnnotations; -using Newtonsoft.Json; -using MediatR; -using ConductorSharp.Engine.Util; - -//------------------------------------------------------------------------------ -// -// This code was generated by a tool. -// -// Changes to this file may cause incorrect behavior and will be lost if -// the code is regenerated. -// -//------------------------------------------------------------------------------ - -namespace {{namespace}} -{ - {{taskCollection}} -} diff --git a/src/ConductorSharp.Toolkit/Templates/WorkerTemplate.default b/src/ConductorSharp.Toolkit/Templates/WorkerTemplate.default deleted file mode 100644 index bd834386..00000000 --- a/src/ConductorSharp.Toolkit/Templates/WorkerTemplate.default +++ /dev/null @@ -1,31 +0,0 @@ - public partial class {{workerName}}Input : IRequest<{{workerName}}Output> - { -{{inputProperties}} - } - - public partial class {{workerName}}Output - { -{{outputProperties}} - } - - /// - /// {{commentDescription}} - /// - /// - /// {{originalName}} - /// - /// - /// {{ownerEmail}} - /// - /// - /// {{ownerApp}} - /// - /// - /// {{note}} - /// - [OriginalName("{{originalName}}")] - public partial class {{workerName}} : SimpleTaskModel<{{workerName}}Input, {{workerName}}Output> - { - } - - diff --git a/src/ConductorSharp.Toolkit/Templates/WorkflowCollectionTemplate.default b/src/ConductorSharp.Toolkit/Templates/WorkflowCollectionTemplate.default deleted file mode 100644 index 98daf1a7..00000000 --- a/src/ConductorSharp.Toolkit/Templates/WorkflowCollectionTemplate.default +++ /dev/null @@ -1,21 +0,0 @@ -using ConductorSharp.Engine.Model; -using System.ComponentModel; -using System.ComponentModel.DataAnnotations; -using Newtonsoft.Json; -using ConductorSharp.Engine.Builders; -using MediatR; -using ConductorSharp.Engine.Util; - -//------------------------------------------------------------------------------ -// -// This code was generated by a tool. -// -// Changes to this file may cause incorrect behavior and will be lost if -// the code is regenerated. -// -//------------------------------------------------------------------------------ - -namespace {{namespace}} -{ - {{workflowCollection}} -} diff --git a/src/ConductorSharp.Toolkit/Templates/WorkflowTemplate.default b/src/ConductorSharp.Toolkit/Templates/WorkflowTemplate.default deleted file mode 100644 index 684cccd7..00000000 --- a/src/ConductorSharp.Toolkit/Templates/WorkflowTemplate.default +++ /dev/null @@ -1,30 +0,0 @@ - public partial class {{workflowName}}Input : IRequest<{{workflowName}}Output> - { -{{inputProperties}} - } - - public partial class {{workflowName}}Output - { - } - - /// - /// {{commentDescription}} - /// - /// - /// {{originalName}} - /// - /// - /// {{ownerEmail}} - /// - /// - /// {{ownerApp}} - /// - /// - /// {{note}} - /// - [OriginalName("{{originalName}}")] - public partial class {{workflowName}} : SubWorkflowTaskModel<{{workflowName}}Input, {{workflowName}}Output> - { - } - - diff --git a/src/ConductorSharp.Toolkit/Util/TaskModelGenerator.cs b/src/ConductorSharp.Toolkit/Util/TaskModelGenerator.cs index 352cbae1..b623b05f 100644 --- a/src/ConductorSharp.Toolkit/Util/TaskModelGenerator.cs +++ b/src/ConductorSharp.Toolkit/Util/TaskModelGenerator.cs @@ -51,7 +51,7 @@ public string Build() CreateUsings( "ConductorSharp.Engine.Model", "ConductorSharp.Engine.Builders.Metadata", - "MediatR", + "ConductorSharp.Engine.Interface", "Newtonsoft.Json", "ConductorSharp.Engine.Builders" ) @@ -146,7 +146,7 @@ private ClassDeclarationSyntax CreateInputClass() var typeArgumentList = SyntaxFactory.TypeArgumentList( SyntaxFactory.SingletonSeparatedList(SyntaxFactory.ParseTypeName($"{ClassName}Output")) ); - var baseType = SyntaxFactory.SimpleBaseType(SyntaxFactory.GenericName("IRequest").WithTypeArgumentList(typeArgumentList)); + var baseType = SyntaxFactory.SimpleBaseType(SyntaxFactory.GenericName("ITaskInput").WithTypeArgumentList(typeArgumentList)); var baseList = SyntaxFactory.BaseList(SyntaxFactory.SingletonSeparatedList(baseType)); var classDeclaration = SyntaxFactory .ClassDeclaration($"{ClassName}Input") diff --git a/src/ConductorSharp.Toolkit/conductorsharp.yaml b/src/ConductorSharp.Toolkit/conductorsharp.yaml index 5f7580eb..b1c7005c 100644 --- a/src/ConductorSharp.Toolkit/conductorsharp.yaml +++ b/src/ConductorSharp.Toolkit/conductorsharp.yaml @@ -1,4 +1,4 @@ -baseUrl: http://localhost:8080 +baseUrl: http://localhost:8127 apiPath: api namespace: ConductorSharp.Toolkit destination: ./ConductorSharp.Toolkit/Generated \ No newline at end of file diff --git a/test/ConductorSharp.Engine.IntegrationTests/ConductorSharp.Engine.IntegrationTests.csproj b/test/ConductorSharp.Engine.IntegrationTests/ConductorSharp.Engine.IntegrationTests.csproj new file mode 100644 index 00000000..e65ae192 --- /dev/null +++ b/test/ConductorSharp.Engine.IntegrationTests/ConductorSharp.Engine.IntegrationTests.csproj @@ -0,0 +1,35 @@ + + + + net8.0 + enable + enable + + false + true + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + diff --git a/test/ConductorSharp.Engine.IntegrationTests/CustomWebApplicationFactory.cs b/test/ConductorSharp.Engine.IntegrationTests/CustomWebApplicationFactory.cs new file mode 100644 index 00000000..0943e58d --- /dev/null +++ b/test/ConductorSharp.Engine.IntegrationTests/CustomWebApplicationFactory.cs @@ -0,0 +1,93 @@ +using System.Diagnostics; +using System.Net; +using ConductorSharp.ApiEnabled.Workflows; +using ConductorSharp.Client.Service; +using ConductorSharp.Engine.Util; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Mvc.Testing; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Testcontainers.PostgreSql; + +namespace ConductorSharp.Engine.IntegrationTests; + +public class CustomWebApplicationFactory : WebApplicationFactory, IAsyncLifetime +{ + private PostgreSqlContainer _postgresContainer = null!; + private IContainer _conductorContainer = null!; + + public async Task InitializeAsync() + { + var network = new NetworkBuilder().Build(); + + _postgresContainer = new PostgreSqlBuilder() + .WithImage("postgres") + .WithUsername("conductor") + .WithPassword("conductor") + .WithNetwork(network) + .WithNetworkAliases("postgresdb") + .WithWaitStrategy( + Wait.ForUnixContainer() + .UntilMessageIsLogged("database system is ready to accept connections", w => w.WithInterval(TimeSpan.FromSeconds(5))) + ) + .Build(); + + _conductorContainer = new ContainerBuilder() + .WithImage("conductor:server") + .WithEnvironment("CONFIG_PROP", "config-postgres.properties") + .WithPortBinding(8080, true) + .WithWaitStrategy( + Wait.ForUnixContainer() + .UntilHttpRequestIsSucceeded( + w => w.ForPath("/health").ForPort(8080).ForStatusCode(HttpStatusCode.OK), + w => w.WithInterval(TimeSpan.FromSeconds(5)) + ) + ) + .WithNetwork(network) + .Build(); + + await _postgresContainer.StartAsync(); + await _conductorContainer.StartAsync(); + + var metadataService = Services.GetRequiredService(); + var timeout = TimeSpan.FromSeconds(10); + var deploymentStopwatch = Stopwatch.StartNew(); + var wfDeployed = false; + + while (deploymentStopwatch.Elapsed < timeout) + { + var wfs = await metadataService.ListWorkflowsAsync(); + wfDeployed = wfs.Any(wf => wf.Name == NamingUtil.NameOf()); + + if (wfDeployed) + break; + + await Task.Delay(1000); + } + + if (!wfDeployed) + throw new TimeoutException("Timeout during workflow deployment"); + } + + protected override void ConfigureWebHost(IWebHostBuilder builder) + { + var config = new ConfigurationBuilder() + .AddInMemoryCollection( + new Dictionary() + { + { "Conductor:BaseUrl", $"http://{_conductorContainer.Hostname}:{_conductorContainer.GetMappedPublicPort(8080)}" } + }! + ) + .Build(); + + builder.UseConfiguration(config); + } + + async Task IAsyncLifetime.DisposeAsync() + { + await _conductorContainer.DisposeAsync(); + await _postgresContainer.DisposeAsync(); + } +} diff --git a/test/ConductorSharp.Engine.IntegrationTests/WorfklowExecutionTests.cs b/test/ConductorSharp.Engine.IntegrationTests/WorfklowExecutionTests.cs new file mode 100644 index 00000000..373cce78 --- /dev/null +++ b/test/ConductorSharp.Engine.IntegrationTests/WorfklowExecutionTests.cs @@ -0,0 +1,50 @@ +using System.Diagnostics; +using ConductorSharp.ApiEnabled.Workflows; +using ConductorSharp.Client.Generated; +using ConductorSharp.Client.Service; +using ConductorSharp.Engine.Util; +using Microsoft.Extensions.DependencyInjection; +using Task = System.Threading.Tasks.Task; + +namespace ConductorSharp.Engine.IntegrationTests +{ + public class WorfklowExecutionTests : IClassFixture + { + private readonly CustomWebApplicationFactory _factory; + + public WorfklowExecutionTests(CustomWebApplicationFactory factory) + { + _factory = factory; + } + + [Fact] + public async Task HostShouldExecuteWorkflowTasksSuccesfully() + { + var workflowService = _factory.Services.GetRequiredService(); + var workflowId = await workflowService.StartAsync(new() { Name = NamingUtil.NameOf(), Version = 1 }); + + var wf = await WaitForWorkflowTermination(workflowId); + Assert.Equal(WorkflowStatus.COMPLETED, wf.Status); + } + + private async Task WaitForWorkflowTermination(string workflowId) + { + var workflowService = _factory.Services.GetRequiredService(); + var timeout = TimeSpan.FromMinutes(5); + var sw = Stopwatch.StartNew(); + Workflow wf = null!; + + while (sw.Elapsed < timeout) + { + wf = await workflowService.GetExecutionStatusAsync(workflowId); + + if (wf.Status != WorkflowStatus.RUNNING) + break; + + await Task.Delay(1000); + } + + return wf; + } + } +} diff --git a/test/ConductorSharp.Engine.Tests/Integration/UtilityTests.cs b/test/ConductorSharp.Engine.Tests/Integration/UtilityTests.cs index 54e503ce..d9086d2a 100644 --- a/test/ConductorSharp.Engine.Tests/Integration/UtilityTests.cs +++ b/test/ConductorSharp.Engine.Tests/Integration/UtilityTests.cs @@ -1,9 +1,4 @@ using ConductorSharp.Engine.Tests.Samples.Workflows; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace ConductorSharp.Engine.Tests.Integration { diff --git a/test/ConductorSharp.Engine.Tests/Integration/WorkflowBuilderTests.cs b/test/ConductorSharp.Engine.Tests/Integration/WorkflowBuilderTests.cs index 7c1dcc9d..c2dbecb6 100644 --- a/test/ConductorSharp.Engine.Tests/Integration/WorkflowBuilderTests.cs +++ b/test/ConductorSharp.Engine.Tests/Integration/WorkflowBuilderTests.cs @@ -313,11 +313,9 @@ private static IServiceProvider RegisterWorkflow() containerBuilder .AddConductorSharp("example.com/api") .SetBuildConfiguration(new() { DefaultOwnerEmail = null }) - .AddExecutionManager(10, 100, 100, null, typeof(WorkflowBuilderTests).Assembly) + .AddExecutionManager(10, 100, 100, null) .AddPipelines(pipelines => { - pipelines.AddContextLogging(); - pipelines.AddRequestResponseLogging(); pipelines.AddValidation(); }) .AddCSharpLambdaTasks("TEST"); diff --git a/test/ConductorSharp.Engine.Tests/Samples/Tasks/ArrayTask.cs b/test/ConductorSharp.Engine.Tests/Samples/Tasks/ArrayTask.cs index faecc69a..139e2435 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Tasks/ArrayTask.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Tasks/ArrayTask.cs @@ -1,12 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace ConductorSharp.Engine.Tests.Samples.Tasks +namespace ConductorSharp.Engine.Tests.Samples.Tasks { - public class ArrayTaskInput : IRequest + public class ArrayTaskInput : ITaskInput { public class TestModel { @@ -19,7 +13,7 @@ public class TestModel public object Objects { get; set; } } - public class ArrayTaskOutput { } + public class ArrayTaskOutput; - public class ArrayTask : SimpleTaskModel { } + public class ArrayTask : SimpleTaskModel; } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Tasks/CustomerGetV1.cs b/test/ConductorSharp.Engine.Tests/Samples/Tasks/CustomerGetV1.cs index e1290d9f..5b973682 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Tasks/CustomerGetV1.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Tasks/CustomerGetV1.cs @@ -2,7 +2,7 @@ namespace ConductorSharp.Engine.Tests.Samples.Tasks; -public partial class CustomerGetV1Input : IRequest +public partial class CustomerGetV1Input : ITaskInput { /// /// customer_id diff --git a/test/ConductorSharp.Engine.Tests/Samples/Tasks/DictionaryInputTask.cs b/test/ConductorSharp.Engine.Tests/Samples/Tasks/DictionaryInputTask.cs index 3bdec654..31abb28c 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Tasks/DictionaryInputTask.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Tasks/DictionaryInputTask.cs @@ -1,6 +1,6 @@ namespace ConductorSharp.Engine.Tests.Samples.Tasks; -public class DictionaryInputTaskInput : IRequest +public class DictionaryInputTaskInput : ITaskInput { public IDictionary Object { get; set; } public IDictionary StringObject { get; set; } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Tasks/EmailPrepareV1.cs b/test/ConductorSharp.Engine.Tests/Samples/Tasks/EmailPrepareV1.cs index 387ed5eb..b3db6e72 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Tasks/EmailPrepareV1.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Tasks/EmailPrepareV1.cs @@ -2,7 +2,7 @@ namespace ConductorSharp.Engine.Tests.Samples.Tasks; -public partial class EmailPrepareV1Input : IRequest +public partial class EmailPrepareV1Input : ITaskInput { /// /// address diff --git a/test/ConductorSharp.Engine.Tests/Samples/Tasks/ListTask.cs b/test/ConductorSharp.Engine.Tests/Samples/Tasks/ListTask.cs index 123042fe..93c232ae 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Tasks/ListTask.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Tasks/ListTask.cs @@ -6,7 +6,7 @@ namespace ConductorSharp.Engine.Tests.Samples.Tasks { - public class ListTaskInput : IRequest + public class ListTaskInput : ITaskInput { public List List { get; set; } } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Tasks/NestedObjects.cs b/test/ConductorSharp.Engine.Tests/Samples/Tasks/NestedObjects.cs index f6c7700c..1dafa031 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Tasks/NestedObjects.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Tasks/NestedObjects.cs @@ -1,13 +1,8 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Builders.Metadata; namespace ConductorSharp.Engine.Tests.Samples.Tasks { - public class TaskNestedObjectsInput : IRequest + public class TaskNestedObjectsInput : ITaskInput { public object NestedObjects { get; set; } } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Tasks/TaskPropertiesTask.cs b/test/ConductorSharp.Engine.Tests/Samples/Tasks/TaskPropertiesTask.cs index b009bcdd..56443373 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Tasks/TaskPropertiesTask.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Tasks/TaskPropertiesTask.cs @@ -1,6 +1,6 @@ namespace ConductorSharp.Engine.Tests.Samples.Tasks { - public class TaskPropertiesTaskInput : IRequest + public class TaskPropertiesTaskInput : ITaskInput { public string Status { get; set; } public string TaskType { get; set; } @@ -13,7 +13,7 @@ public class TaskPropertiesTaskInput : IRequest public string CorrelationId { get; set; } } - public class TaskPropertiesTaskOutput { } + public class TaskPropertiesTaskOutput; - public class TaskPropertiesTask : SimpleTaskModel { } + public class TaskPropertiesTask : SimpleTaskModel; } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Tasks/VersionSubworkflow.cs b/test/ConductorSharp.Engine.Tests/Samples/Tasks/VersionSubworkflow.cs index 5fcd557c..a8ba60b0 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Tasks/VersionSubworkflow.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Tasks/VersionSubworkflow.cs @@ -1,17 +1,12 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Builders.Metadata; namespace ConductorSharp.Engine.Tests.Samples.Tasks { - public class VersionSubworkflowInput : IRequest { } + public class VersionSubworkflowInput : ITaskInput { } - public class VersionSubworkflowOutput { } + public class VersionSubworkflowOutput; [OriginalName("TEST_subworkflow")] [Version(3)] - public class VersionSubworkflow : SubWorkflowTaskModel { } + public class VersionSubworkflow : SubWorkflowTaskModel; } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Workers/GetCustomerHandler.cs b/test/ConductorSharp.Engine.Tests/Samples/Workers/GetCustomerHandler.cs index 4eb452c5..f8831274 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Workers/GetCustomerHandler.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Workers/GetCustomerHandler.cs @@ -3,7 +3,7 @@ namespace ConductorSharp.Engine.Tests.Samples.Workers; -public class GetCustomerRequest : IRequest +public class GetCustomerRequest : ITaskInput { [Required] [JsonProperty("id")] @@ -24,8 +24,11 @@ public class Customer } [OriginalName("CUSTOMER_get")] -public class GetCustomerHandler : TaskRequestHandler +public class GetCustomerHandler : Worker { - public override Task Handle(GetCustomerRequest request, CancellationToken cancellationToken) => - throw new NotImplementedException(); + public override Task Handle( + GetCustomerRequest request, + WorkerExecutionContext context, + CancellationToken cancellationToken + ) => throw new NotImplementedException(); } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Workers/PrepareEmailHandler.cs b/test/ConductorSharp.Engine.Tests/Samples/Workers/PrepareEmailHandler.cs index 7039b55a..a62b1bd2 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Workers/PrepareEmailHandler.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Workers/PrepareEmailHandler.cs @@ -2,7 +2,7 @@ namespace ConductorSharp.Engine.Tests.Samples.Workers; -public class PrepareEmailRequest : IRequest +public class PrepareEmailRequest : ITaskInput { public string CustomerName { get; set; } public string Address { get; set; } @@ -14,8 +14,11 @@ public class PrepareEmailResponse } [OriginalName("EMAIL_prepare")] -public class PrepareEmailHandler : TaskRequestHandler +public class PrepareEmailHandler : Worker { - public override Task Handle(PrepareEmailRequest request, CancellationToken cancellationToken) => - throw new NotImplementedException(); + public override Task Handle( + PrepareEmailRequest request, + WorkerExecutionContext context, + CancellationToken cancellationToken + ) => throw new NotImplementedException(); } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Workflows/CSharpLambdaWorkflow.cs b/test/ConductorSharp.Engine.Tests/Samples/Workflows/CSharpLambdaWorkflow.cs index c43ffe36..42f00558 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Workflows/CSharpLambdaWorkflow.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Workflows/CSharpLambdaWorkflow.cs @@ -15,7 +15,7 @@ public class CSharpLambdaWorkflowOutput : WorkflowOutput { } [WorkflowMetadata(OwnerEmail = "test@test.com")] public class CSharpLambdaWorkflow : Workflow { - public class LambdaTaskInput : IRequest + public class LambdaTaskInput : ITaskInput { public string LambdaInput { get; set; } } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Workflows/DynamicTask.cs b/test/ConductorSharp.Engine.Tests/Samples/Workflows/DynamicTask.cs index 14a745fa..cbb4d646 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Workflows/DynamicTask.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Workflows/DynamicTask.cs @@ -1,9 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Builders.Metadata; namespace ConductorSharp.Engine.Tests.Samples.Workflows { @@ -22,7 +17,7 @@ public class ExpectedDynamicOutput public string Name { get; set; } } - public class MandatoryDynamicInput : IRequest + public class MandatoryDynamicInput : ITaskInput { public int Count { get; set; } public bool ShouldUseNext { get; set; } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Workflows/EventTaskWorkflow.cs b/test/ConductorSharp.Engine.Tests/Samples/Workflows/EventTaskWorkflow.cs index 1fcc00fa..d20a1657 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Workflows/EventTaskWorkflow.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Workflows/EventTaskWorkflow.cs @@ -9,7 +9,7 @@ public class EventTaskWorkflowOutput : WorkflowOutput { } [WorkflowMetadata(OwnerEmail = "test@test.com")] public class EventTaskWorkflow : Workflow { - public class EventTaskPayload : IRequest + public class EventTaskPayload : ITaskInput { public string PayloadParam { get; set; } } diff --git a/test/ConductorSharp.Engine.Tests/Samples/Workflows/StringAddition.cs b/test/ConductorSharp.Engine.Tests/Samples/Workflows/StringAddition.cs index c59243b7..e06f3a84 100644 --- a/test/ConductorSharp.Engine.Tests/Samples/Workflows/StringAddition.cs +++ b/test/ConductorSharp.Engine.Tests/Samples/Workflows/StringAddition.cs @@ -1,9 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Builders.Metadata; namespace ConductorSharp.Engine.Tests.Samples.Workflows { @@ -22,7 +17,7 @@ public StringAddition(WorkflowDefinitionBuilder + public class StringTaskInput : ITaskInput { public string Input { get; set; } } diff --git a/test/ConductorSharp.Engine.Tests/Unit/ContainerBuilderTests.cs b/test/ConductorSharp.Engine.Tests/Unit/ContainerBuilderTests.cs index 75669f41..9145a270 100644 --- a/test/ConductorSharp.Engine.Tests/Unit/ContainerBuilderTests.cs +++ b/test/ConductorSharp.Engine.Tests/Unit/ContainerBuilderTests.cs @@ -18,13 +18,7 @@ public void AppliesBuildConfigurationDefaults() builder .AddConductorSharp(baseUrl: "http://empty/empty") .SetBuildConfiguration(new BuildConfiguration { DefaultOwnerApp = "testApp", DefaultOwnerEmail = "owner@test.app" }) - .AddExecutionManager( - maxConcurrentWorkers: 1, - sleepInterval: 1, - longPollInterval: 1, - domain: null, - handlerAssemblies: typeof(ContainerBuilderTests).Assembly - ); + .AddExecutionManager(maxConcurrentWorkers: 1, sleepInterval: 1, longPollInterval: 1, domain: null); builder.RegisterWorkflow(); var container = builder.BuildServiceProvider(); @@ -42,13 +36,7 @@ public void OverridesBuildConfigurationDefaults() builder .AddConductorSharp(baseUrl: "http://empty/empty") .SetBuildConfiguration(new BuildConfiguration { DefaultOwnerApp = "testApp", DefaultOwnerEmail = "owner@test.app", }) - .AddExecutionManager( - maxConcurrentWorkers: 1, - sleepInterval: 1, - longPollInterval: 1, - null, - handlerAssemblies: typeof(ContainerBuilderTests).Assembly - ); + .AddExecutionManager(maxConcurrentWorkers: 1, sleepInterval: 1, longPollInterval: 1, null); builder.RegisterWorkflow(new BuildConfiguration { DefaultOwnerApp = overrideValue }); var container = builder.BuildServiceProvider(); @@ -64,13 +52,7 @@ public void ResolveWorkflowDependencies() var builder = new ServiceCollection(); builder .AddConductorSharp(baseUrl: "http://empty/empty") - .AddExecutionManager( - maxConcurrentWorkers: 1, - sleepInterval: 1, - longPollInterval: 1, - null, - handlerAssemblies: typeof(ContainerBuilderTests).Assembly - ); + .AddExecutionManager(maxConcurrentWorkers: 1, sleepInterval: 1, longPollInterval: 1, null); builder.AddTransient(); builder.RegisterWorkflow(); @@ -85,13 +67,7 @@ public void FailsToResolveWorkflowDependencies() var builder = new ServiceCollection(); builder .AddConductorSharp(baseUrl: "http://empty/empty") - .AddExecutionManager( - maxConcurrentWorkers: 1, - sleepInterval: 1, - longPollInterval: 1, - null, - handlerAssemblies: typeof(ContainerBuilderTests).Assembly - ); + .AddExecutionManager(maxConcurrentWorkers: 1, sleepInterval: 1, longPollInterval: 1, null); builder.RegisterWorkflow(); var container = builder.BuildServiceProvider(); diff --git a/test/ConductorSharp.Engine.Tests/Unit/TaskDefinitionBuilderTests.cs b/test/ConductorSharp.Engine.Tests/Unit/TaskDefinitionBuilderTests.cs index c0d5730a..88411a3a 100644 --- a/test/ConductorSharp.Engine.Tests/Unit/TaskDefinitionBuilderTests.cs +++ b/test/ConductorSharp.Engine.Tests/Unit/TaskDefinitionBuilderTests.cs @@ -17,11 +17,9 @@ public TaskDefinitionBuilderTests() _containerBuilder .AddConductorSharp("http://example.com/api") - .AddExecutionManager(10, 100, 100, null, typeof(TaskDefinitionBuilderTests).Assembly) + .AddExecutionManager(10, 100, 100, null) .AddPipelines(pipelines => { - pipelines.AddContextLogging(); - pipelines.AddRequestResponseLogging(); pipelines.AddValidation(); }); diff --git a/test/ConductorSharp.Engine.Tests/Unit/WorkerInvokerServiceTests.cs b/test/ConductorSharp.Engine.Tests/Unit/WorkerInvokerServiceTests.cs new file mode 100644 index 00000000..f3645b73 --- /dev/null +++ b/test/ConductorSharp.Engine.Tests/Unit/WorkerInvokerServiceTests.cs @@ -0,0 +1,109 @@ +using ConductorSharp.Client; +using ConductorSharp.Engine.Extensions; +using ConductorSharp.Engine.Service; +using Microsoft.Extensions.DependencyInjection; +using Newtonsoft.Json.Linq; + +namespace ConductorSharp.Engine.Tests.Unit; + +public class WorkerInvokerServiceTests +{ + public class Request : ITaskInput + { + public string Input { get; set; } + + public class Response + { + public string Output { get; set; } + public WorkerExecutionContext HandlerContext { get; set; } + public WorkerExecutionContext MiddlewareContext { get; set; } + public WorkerExecutionContext GenericMiddlewareContext { get; set; } + } + + public class Handler : IWorker + { + public Task Handle(Request request, WorkerExecutionContext context, CancellationToken cancellationToken) => + Task.FromResult(new Response { Output = request.Input + "Worker", HandlerContext = context }); + } + } + + public class Middleware : IWorkerMiddleware + { + public async Task Handle( + Request request, + WorkerExecutionContext context, + Func> next, + CancellationToken cancellationToken + ) + { + var response = await next(); + response.MiddlewareContext = context; + response.Output += "Middleware"; + return response; + } + } + + public class GenericMiddleware : IWorkerMiddleware + where TRequest : ITaskInput, new() + { + public async Task Handle( + TRequest request, + WorkerExecutionContext context, + Func> next, + CancellationToken cancellationToken + ) + { + var response = await next(); + var resp = (Request.Response)(object)response; + resp.GenericMiddlewareContext = context; + resp.Output += "GenericMiddleware"; + + return response; + } + } + + [Fact] + public async Task InvokerShouldInvokeWorkerAndMiddlewaresInCorrectOrder() + { + var collection = new ServiceCollection(); + collection + .AddConductorSharp(baseUrl: "http://empty/empty") + .AddExecutionManager(maxConcurrentWorkers: 1, sleepInterval: 1, longPollInterval: 1, domain: null) + .AddPipelines(pipelineBuilder => + { + pipelineBuilder.AddCustomMiddleware(); + pipelineBuilder.AddCustomMiddleware(typeof(GenericMiddleware<,>)); + }); + + collection.RegisterWorkerTask(); + + var provider = collection.BuildServiceProvider(); + + var invoker = new WorkerInvokerService(provider); + var context = new WorkerExecutionContext( + WorkflowName: "WorkflowName", + WorkflowId: "WorkflowId", + TaskName: "TaskName", + TaskId: "TaskId", + TaskReferenceName: "TaskReferenceName", + CorrelationId: "CorrelationId", + WorkerId: "WorkerId" + ); + + var expectedContext = context with { }; + var result = await invoker.Invoke( + typeof(Request.Handler), + new Dictionary() { { "input", "Input" } }, + context, + new CancellationToken(true) + ); + + Assert.Equal("InputWorkerGenericMiddlewareMiddleware", result["output"]); + Assert.Equal(expectedContext, ((JObject)result["handler_context"]).ToObject(ConductorConstants.IoJsonSerializer)); + Assert.Equal(expectedContext, ((JObject)result["middleware_context"]).ToObject(ConductorConstants.IoJsonSerializer)); + Assert.Equal( + expectedContext, + ((JObject)result["generic_middleware_context"]).ToObject(ConductorConstants.IoJsonSerializer) + ); + } +} diff --git a/test/ConductorSharp.Engine.Tests/Unit/WorkflowItemRegistryTests.cs b/test/ConductorSharp.Engine.Tests/Unit/WorkflowItemRegistryTests.cs index 26dfaf2d..0fa39be1 100644 --- a/test/ConductorSharp.Engine.Tests/Unit/WorkflowItemRegistryTests.cs +++ b/test/ConductorSharp.Engine.Tests/Unit/WorkflowItemRegistryTests.cs @@ -14,13 +14,7 @@ public void RegisteredItemsResolve() builder .AddConductorSharp(baseUrl: "http://empty/empty") - .AddExecutionManager( - maxConcurrentWorkers: 1, - sleepInterval: 1, - longPollInterval: 1, - null, - handlerAssemblies: typeof(WorkflowItemRegistryTests).Assembly - ); + .AddExecutionManager(maxConcurrentWorkers: 1, sleepInterval: 1, longPollInterval: 1, null); builder.RegisterWorkflow(); var container = builder.BuildServiceProvider(); @@ -42,13 +36,7 @@ public void GetAllCorrectlyResolves() builder .AddConductorSharp(baseUrl: "http://empty/empty") - .AddExecutionManager( - maxConcurrentWorkers: 1, - sleepInterval: 1, - longPollInterval: 1, - null, - handlerAssemblies: typeof(WorkflowItemRegistryTests).Assembly - ); + .AddExecutionManager(maxConcurrentWorkers: 1, sleepInterval: 1, longPollInterval: 1, null); builder.RegisterWorkflow(); var container = builder.BuildServiceProvider(); diff --git a/test/ConductorSharp.Engine.Tests/Usings.cs b/test/ConductorSharp.Engine.Tests/Usings.cs index eb6fe6a9..f02300d2 100644 --- a/test/ConductorSharp.Engine.Tests/Usings.cs +++ b/test/ConductorSharp.Engine.Tests/Usings.cs @@ -3,6 +3,5 @@ global using ConductorSharp.Engine.Model; global using ConductorSharp.Engine.Tests.Samples.Tasks; global using ConductorSharp.Engine.Util; -global using MediatR; global using Newtonsoft.Json; global using Xunit;