Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ConductorSharp.Client/ConductorSharp.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
<PackageId>ConductorSharp.Client</PackageId>
<Version>3.7.0</Version>
<Version>3.7.1</Version>
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
<PackageTags>netflix;conductor</PackageTags>
Expand Down
2 changes: 1 addition & 1 deletion src/ConductorSharp.Engine/ConductorSharp.Engine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
<PackageId>ConductorSharp.Engine</PackageId>
<Version>3.7.0</Version>
<Version>3.7.1</Version>
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
<PackageTags>netflix;conductor</PackageTags>
Expand Down
4 changes: 2 additions & 2 deletions src/ConductorSharp.Engine/ExecutionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ CancellationToken cancellationToken
// TODO: Check what the operation and payload type are
var externalStorageLocation = await _taskManager.GetExternalStorageLocationAsync(
pollResponse.ExternalInputPayloadStoragePath,
"",
"",
"READ",
"TASK_INPUT",
cancellationToken
);

Expand Down
29 changes: 10 additions & 19 deletions src/ConductorSharp.Engine/TypePollSpreadingExecutionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
.Where(a => a.Value > 0)
.ToDictionary(a => a.Key, a => a.Value);

var scheduledWorkers = _registeredWorkers.Where(a => queuedTasks.ContainsKey(GetQueueTaskName(a)))
.ToList();
var scheduledWorkers = _registeredWorkers.Where(a => queuedTasks.ContainsKey(GetQueueTaskName(a))).ToList();

currentSleepInterval = _pollTimingStrategy.CalculateDelay(
queuedTasks,
Expand All @@ -77,8 +76,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
currentSleepInterval
);

scheduledWorkers =
_pollOrderStrategy.CalculateOrder(queuedTasks, scheduledWorkers, _semaphore.CurrentCount);
scheduledWorkers = _pollOrderStrategy.CalculateOrder(queuedTasks, scheduledWorkers, _semaphore.CurrentCount);

foreach (var scheduledWorker in scheduledWorkers)
{
Expand Down Expand Up @@ -171,14 +169,13 @@ CancellationToken cancellationToken
// TODO: Check what the operation and payload type are
var externalStorageLocation = await _taskManager.GetExternalStorageLocationAsync(
pollResponse.ExternalInputPayloadStoragePath,
"",
"",
"READ",
"TASK_INPUT",
cancellationToken
);

// TODO: iffy
var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path,
tokenHolder.CancellationToken);
var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path, tokenHolder.CancellationToken);

using TextReader textReader = new StreamReader(file.Stream);
var json = await textReader.ReadToEndAsync();
Expand All @@ -190,8 +187,7 @@ CancellationToken cancellationToken
}

var inputType = GetInputType(scheduledWorker.TaskType);
var inputData = SerializationHelper.DictonaryToObject(inputType, pollResponse.InputData,
ConductorConstants.IoJsonSerializerSettings);
var inputData = SerializationHelper.DictonaryToObject(inputType, pollResponse.InputData, ConductorConstants.IoJsonSerializerSettings);
// Poll response data can be huge (if read from external storage)
// We can save memory by not holding reference to pollResponse.InputData after it is parsed
pollResponse.InputData = null;
Expand Down Expand Up @@ -219,9 +215,7 @@ await _taskManager.UpdateAsync(
{
TaskId = pollResponse.TaskId,
Status = TaskResultStatus.COMPLETED,
OutputData =
SerializationHelper.ObjectToDictionary(response,
ConductorConstants.IoJsonSerializerSettings),
OutputData = SerializationHelper.ObjectToDictionary(response, ConductorConstants.IoJsonSerializerSettings),
WorkflowInstanceId = pollResponse.WorkflowInstanceId
},
tokenHolder.CancellationToken
Expand All @@ -237,9 +231,7 @@ await _taskManager.UpdateAsync(
pollResponse.WorkflowInstanceId
);
}
catch (OperationCanceledException) when
(cancellationToken
.IsCancellationRequested) // This is fine since we know cancellationToken comes from background service
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) // This is fine since we know cancellationToken comes from background service
{
_logger.LogWarning(
"Cancelling task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) due to background service shutdown",
Expand Down Expand Up @@ -273,8 +265,7 @@ await Task.WhenAll(
TaskId = pollResponse.TaskId,
Status = TaskResultStatus.FAILED,
ReasonForIncompletion = exception.Message,
OutputData = SerializationHelper.ObjectToDictionary(errorMessage,
ConductorConstants.IoJsonSerializerSettings),
OutputData = SerializationHelper.ObjectToDictionary(errorMessage, ConductorConstants.IoJsonSerializerSettings),
WorkflowInstanceId = pollResponse?.WorkflowInstanceId
},
tokenHolder.CancellationToken
Expand All @@ -286,4 +277,4 @@ await Task.WhenAll(
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<Version>3.7.0</Version>
<Version>3.7.1</Version>
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/ConductorSharp.Patterns/ConductorSharp.Patterns.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
<Version>3.7.0</Version>
<Version>3.7.1</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading