From a562de829aa1939594aa4dc5ef95ad11e41adaa4 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Tue, 14 Oct 2025 15:04:56 +0200 Subject: [PATCH 1/6] feat: add /:id/complete endpoint --- .../DataPlaneSignalingApiControllerTest.cs | 79 +++++++++++++++++-- .../DataPlaneSignalingApiController.cs | 20 ++++- .../DataPlaneSignalingServiceTest.cs | 51 ++++++++++++ DataPlane.Sdk.Core/DataPlaneSdk.cs | 12 +++ .../Interfaces/IDataPlaneSignalingService.cs | 10 +++ DataPlane.Sdk.Core/Domain/Model/DataFlow.cs | 5 ++ .../DataPlaneSignalingService.cs | 29 +++++++ 7 files changed, 198 insertions(+), 8 deletions(-) rename DataPlane.Sdk.Core.Test/{ => Infrastructure}/DataPlaneSignalingServiceTest.cs (91%) diff --git a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs index 0dc6a40..f091ed9 100644 --- a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs +++ b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs @@ -523,7 +523,7 @@ public async Task Start_WhenSdkReturnsStarting_Success() #endregion - #region StartById + #region Notify-Started [Fact] public async Task StartById_Success() @@ -540,7 +540,7 @@ public async Task StartById_Success() Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.OK); var body = await response.Content.ReadFromJsonAsync(); body.ShouldNotBeNull(); @@ -560,7 +560,7 @@ public async Task StartById_WhenNotFound_ExpectError() Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/not-exist/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/not-exist/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.NotFound); } @@ -580,7 +580,7 @@ public async Task StartById_InvalidState_ExpectConflict() Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.Conflict); } @@ -603,7 +603,7 @@ public async Task StartById_WhenSdkReturnsStarting_Success() Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.Accepted); response.Headers.Location.ShouldNotBeNull() .ToString().ShouldEndWith($"/api/v1/{TestUser}/dataflows/{flow.Id}"); @@ -632,7 +632,7 @@ public async Task StartById_WhenSdkReturnsStarted_Success() Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.OK); var body = await response.Content.ReadFromJsonAsync(); body.ShouldNotBeNull(); @@ -659,7 +659,72 @@ public async Task StartById_SdkHandlerWrongState_ExpectBadRequest() Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); + response.StatusCode.ShouldBe(HttpStatusCode.Conflict); + } + + #endregion + + #region Complete + + [Fact] + public async Task Complete_Success() + { + Sdk.OnComplete = null; + var flow = CreateDataFlow(); + flow.State = DataFlowState.Started; + DataFlowContext.DataFlows.Add(flow); + await DataFlowContext.SaveChangesAsync(); + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/complete", null); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + } + + [Fact] + public async Task Complete_WhenNotFound_ExpectNotFound() + { + Sdk.OnComplete = null; + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/not-exist/complete", null); + response.StatusCode.ShouldBe(HttpStatusCode.NotFound); + } + + [Fact] + public async Task Complete_WithBody_ExpectBadRequest() + { + Sdk.OnComplete = null; + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/some-flow/complete", new StringContent("{\"foo\": \"bar\"}")); + response.StatusCode.ShouldBe(HttpStatusCode.BadRequest); + } + + [Fact] + public async Task Complete_WhenSdkReportsError_ExpectBadRequest() + { + Sdk.OnComplete = df => StatusResult.Failed(new StatusFailure + { + Message = "test error", + Reason = FailureReason.ServiceUnavailable + }); + + var flow = CreateDataFlow(); + flow.State = DataFlowState.Started; + DataFlowContext.DataFlows.Add(flow); + await DataFlowContext.SaveChangesAsync(); + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/complete", null); + response.StatusCode.ShouldBe(HttpStatusCode.ServiceUnavailable); + } + + [Fact] + public async Task Complete_WhenFlowInWrongState_ExpectBadRequest() + { + var flow = CreateDataFlow(); + flow.State = DataFlowState.Terminated; + DataFlowContext.DataFlows.Add(flow); + await DataFlowContext.SaveChangesAsync(); + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/complete", null); response.StatusCode.ShouldBe(HttpStatusCode.Conflict); } diff --git a/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs b/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs index 56675c8..b0c45a7 100644 --- a/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs +++ b/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs @@ -82,7 +82,7 @@ public async Task Start([FromRoute] string participantContextId, } [Authorize] - [HttpPost("{dataFlowId}/start")] + [HttpPost("{dataFlowId}/started")] public async Task StartById([FromRoute] string participantContextId, [FromRoute] string dataFlowId, DataFlowStartByIdMessage startMessage) { if (!(await authorizationService.AuthorizeAsync(User, new ResourceTuple(participantContextId, dataFlowId), "DataFlowAccess")).Succeeded) @@ -144,6 +144,24 @@ public async Task Terminate([FromRoute] string participantContext return statusResult.IsSucceeded ? Ok() : StatusCode((int)statusResult.Failure!.Reason, statusResult); } + [Authorize] + [HttpPost("{dataFlowId}/complete")] + public async Task Complete([FromRoute] string participantContextId, [FromRoute] string dataFlowId) + { + if (!(await authorizationService.AuthorizeAsync(User, new ResourceTuple(participantContextId, dataFlowId), "DataFlowAccess")).Succeeded) + { + return Forbid(); + } + + if (Request.ContentLength > 0) + { + return BadRequest("Request body is not allowed for this endpoint"); + } + + var result = await signalingService.CompleteAsync(dataFlowId); + return result.IsSucceeded ? Ok() : StatusCode((int)result.Failure!.Reason, result); + } + [Authorize] [HttpGet("{dataFlowId}/status")] public async Task GetStatus([FromRoute] string dataFlowId, [FromRoute] string participantContextId) diff --git a/DataPlane.Sdk.Core.Test/DataPlaneSignalingServiceTest.cs b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs similarity index 91% rename from DataPlane.Sdk.Core.Test/DataPlaneSignalingServiceTest.cs rename to DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs index ede234f..ce1c194 100644 --- a/DataPlane.Sdk.Core.Test/DataPlaneSignalingServiceTest.cs +++ b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs @@ -522,6 +522,57 @@ public async Task SuspendAsync_ShouldReturnSuccess_WhenAlreadySuspended() result.IsSucceeded.ShouldBeTrue(); eventMock.Verify(ev => ev.Invoke(dataFlow), Times.Never); } + + [Fact] + public async Task CompleteAsync_WhenFound_ShouldReturnSuccess() + { + var flow = CreateDataFlow(Guid.NewGuid().ToString(), Started); + await _dataFlowContext.AddAsync(flow); + await _dataFlowContext.SaveChangesAsync(); + + var res = await _service.CompleteAsync(flow.Id); + res.IsSucceeded.ShouldBeTrue(); + } + + [Fact] + public async Task CompleteAsync_WhenWrongState_ShouldReturnConflict() + { + var flow = CreateDataFlow(Guid.NewGuid().ToString(), Started); + flow.State = Terminated; + + await _dataFlowContext.AddAsync(flow); + await _dataFlowContext.SaveChangesAsync(); + + var res = await _service.CompleteAsync(flow.Id); + res.IsSucceeded.ShouldBeFalse(); + res.Failure.ShouldNotBeNull(); + res.Failure.Reason.ShouldBe(Conflict); + } + + [Fact] + public async Task CompleteAsync_WhenSdkReportsError_ShouldReturnError() + { + var flow = CreateDataFlow(Guid.NewGuid().ToString(), Started); + await _dataFlowContext.AddAsync(flow); + await _dataFlowContext.SaveChangesAsync(); + + _sdk.OnComplete = _ => StatusResult.Failed(new StatusFailure { Message = "test error", Reason = InternalError }); + + var res = await _service.CompleteAsync(flow.Id); + res.IsSucceeded.ShouldBeFalse(); + res.Failure.ShouldNotBeNull(); + res.Failure.Reason.ShouldBe(InternalError); + res.Failure.Message.ShouldBe("test error"); + } + + [Fact] + public async Task CompleteAsync_WhenNotFound_ShouldReturnNotFound() + { + var res = await _service.CompleteAsync("not-exist"); + res.IsSucceeded.ShouldBeFalse(); + res.Failure.ShouldNotBeNull(); + res.Failure.Reason.ShouldBe(NotFound); + } } [CollectionDefinition("SignalingService")] //parallelize tests in this collection diff --git a/DataPlane.Sdk.Core/DataPlaneSdk.cs b/DataPlane.Sdk.Core/DataPlaneSdk.cs index b01699c..9d15f30 100644 --- a/DataPlane.Sdk.Core/DataPlaneSdk.cs +++ b/DataPlane.Sdk.Core/DataPlaneSdk.cs @@ -8,6 +8,7 @@ namespace DataPlane.Sdk.Core; public class DataPlaneSdk { + public Func? OnComplete; public Func>? OnPrepare; public Func>? OnStart; public Func? OnSuspend; @@ -50,6 +51,11 @@ internal StatusResult InvokeOnPrepare(DataFlow flow) return StatusResult.Success(flow); } + internal StatusResult InvokeOnComplete(DataFlow flow) + { + return OnComplete != null ? OnComplete(flow) : StatusResult.Success(); + } + public class SdkBuilder { private readonly DataPlaneSdk _dataPlaneSdk = new() @@ -86,6 +92,12 @@ public SdkBuilder OnSuspend(Func processor) return this; } + public SdkBuilder OnComplete(Func processor) + { + _dataPlaneSdk.OnComplete = processor; + return this; + } + public SdkBuilder RuntimeId(string runtimeId) { _dataPlaneSdk.RuntimeId = runtimeId; diff --git a/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs b/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs index a844371..9c556dc 100644 --- a/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs +++ b/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs @@ -46,4 +46,14 @@ public interface IDataPlaneSignalingService /// preparation to happen asynchronously. If the state is PREPARED, then the caller can proceed normally. /// Task> PrepareAsync(DataFlowPrepareMessage prepareMessage); + + /// + /// Marks a data flow as completed. + /// + /// The ID of the data flow to complete + /// + /// A status result indicating success or failure. Failure may include specific details + /// such as wrong state, not found, or other error conditions. + /// + Task CompleteAsync(string dataFlowId); } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs b/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs index bc7e8f5..06c900c 100644 --- a/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs +++ b/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs @@ -46,4 +46,9 @@ public void Starting() { Transition(DataFlowState.Starting); } + + public void Complete() + { + Transition(DataFlowState.Completed); + } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs b/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs index 5c19bab..beeafda 100644 --- a/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs +++ b/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs @@ -144,6 +144,35 @@ public async Task> PrepareAsync(DataFlowPrepareMessage pr return StatusResult.Success(updatedFlow); } + public async Task CompleteAsync(string dataFlowId) + { + var existingFlowResult = await dataFlowContext.FindByIdAsync(dataFlowId); + if (existingFlowResult == null) + { + return StatusResult.NotFound(); + } + + if (existingFlowResult.State == DataFlowState.Completed) // de-duplication check + { + return StatusResult.Success(); + } + + if (existingFlowResult.State is DataFlowState.Started) + { + var res = sdk.InvokeOnComplete(existingFlowResult); + if (res.IsFailed) + { + return res; + } + + existingFlowResult.Complete(); + await dataFlowContext.UpsertAsync(existingFlowResult, true); + return StatusResult.Success(); + } + + return StatusResult.Conflict("DataFlow is not in started state, cannot complete."); + } + private async Task> StartExistingFlow(DataFlow existingFlow, Func> sdkHandler) { // invoke SDK handler From bbd813180cfcc4435b7e2f2ab7196d07ec91dd16 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 07:35:50 +0200 Subject: [PATCH 2/6] complete -> completed --- .../DataPlaneSignalingApiControllerTest.cs | 10 +++++----- .../Controllers/DataPlaneSignalingApiController.cs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs index f091ed9..4941712 100644 --- a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs +++ b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs @@ -676,7 +676,7 @@ public async Task Complete_Success() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/complete", null); + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/completed", null); response.StatusCode.ShouldBe(HttpStatusCode.OK); } @@ -685,7 +685,7 @@ public async Task Complete_WhenNotFound_ExpectNotFound() { Sdk.OnComplete = null; - var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/not-exist/complete", null); + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/not-exist/completed", null); response.StatusCode.ShouldBe(HttpStatusCode.NotFound); } @@ -694,7 +694,7 @@ public async Task Complete_WithBody_ExpectBadRequest() { Sdk.OnComplete = null; - var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/some-flow/complete", new StringContent("{\"foo\": \"bar\"}")); + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/some-flow/completed", new StringContent("{\"foo\": \"bar\"}")); response.StatusCode.ShouldBe(HttpStatusCode.BadRequest); } @@ -712,7 +712,7 @@ public async Task Complete_WhenSdkReportsError_ExpectBadRequest() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/complete", null); + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/completed", null); response.StatusCode.ShouldBe(HttpStatusCode.ServiceUnavailable); } @@ -724,7 +724,7 @@ public async Task Complete_WhenFlowInWrongState_ExpectBadRequest() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/complete", null); + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/completed", null); response.StatusCode.ShouldBe(HttpStatusCode.Conflict); } diff --git a/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs b/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs index b0c45a7..7c95c99 100644 --- a/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs +++ b/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs @@ -145,7 +145,7 @@ public async Task Terminate([FromRoute] string participantContext } [Authorize] - [HttpPost("{dataFlowId}/complete")] + [HttpPost("{dataFlowId}/completed")] public async Task Complete([FromRoute] string participantContextId, [FromRoute] string dataFlowId) { if (!(await authorizationService.AuthorizeAsync(User, new ResourceTuple(participantContextId, dataFlowId), "DataFlowAccess")).Succeeded) From d5828e346a8daf39711fe6280f97fcee6e230d69 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 07:49:38 +0200 Subject: [PATCH 3/6] startById is only valid for consumer DFs --- .../DataPlaneSignalingApiControllerTest.cs | 3 +++ .../DataPlaneSignalingServiceTest.cs | 25 ++++++++++++++++++- DataPlane.Sdk.Core/Domain/Model/DataFlow.cs | 2 +- .../DataPlaneSignalingService.cs | 5 ++++ 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs index 4941712..8a52657 100644 --- a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs +++ b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs @@ -530,6 +530,7 @@ public async Task StartById_Success() { Sdk.OnStart = null; var flow = CreateDataFlow(); + flow.IsConsumer = true; DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); @@ -593,6 +594,7 @@ public async Task StartById_WhenSdkReturnsStarting_Success() return StatusResult.Success(df); }; var flow = CreateDataFlow(); + flow.IsConsumer = true; DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); @@ -622,6 +624,7 @@ public async Task StartById_WhenSdkReturnsStarted_Success() return StatusResult.Success(df); }; var flow = CreateDataFlow(); + flow.IsConsumer = true; DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); diff --git a/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs index ce1c194..44c4042 100644 --- a/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs +++ b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs @@ -12,7 +12,7 @@ [assembly: CollectionBehavior(MaxParallelThreads = 1)] -namespace DataPlane.Sdk.Core.Test; +namespace DataPlane.Sdk.Core.Test.Infrastructure; public abstract class DataPlaneSignalingServiceTest : IDisposable { @@ -149,6 +149,7 @@ public async Task StartAsync_WhenDataFlowIsLeased_ShouldReturnFailure() public async Task StartByIdAsync_WhenExists_ShouldReturnSuccess() { var flow = CreateDataFlow(Guid.NewGuid().ToString(), Uninitialized); + flow.IsConsumer = true; await _dataFlowContext.AddAsync(flow); await _dataFlowContext.SaveChangesAsync(); @@ -181,6 +182,28 @@ public async Task StartByIdAsync_WhenNotExists_ShouldReturnFailure() result.Failure.Reason.ShouldBe(NotFound); } + [Fact] + public async Task StartByIdAsync_WhenNotConsumer_ShouldReturnFailure() + { + var flow = CreateDataFlow(Guid.NewGuid().ToString(), Uninitialized); + flow.IsConsumer = false; + await _dataFlowContext.AddAsync(flow); + await _dataFlowContext.SaveChangesAsync(); + + var msg = new DataFlowStartByIdMessage + { + SourceDataAddress = new DataAddress("test-type") + { + Properties = { ["key1"] = "value1" } + } + }; + + var result = await _service.StartByIdAsync(flow.Id, msg); + result.IsFailed.ShouldBeTrue(); + result.Failure.ShouldNotBeNull(); + result.Failure.Reason.ShouldBe(Conflict); + } + [Fact] public async Task StartByIdAsync_WhenWrongState_ShouldReturnFailure() { diff --git a/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs b/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs index 06c900c..3dc7e92 100644 --- a/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs +++ b/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs @@ -13,7 +13,7 @@ public class DataFlow(string id) : StatefulEntity(id) public bool IsProvisionRequested { get; init; } public bool IsDeprovisionComplete { get; init; } public bool IsDeprovisionRequested { get; init; } - public bool IsConsumer { get; init; } + public bool IsConsumer { get; set; } public required string ParticipantId { get; init; } public required string AssetId { get; init; } public required string AgreementId { get; init; } diff --git a/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs b/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs index beeafda..fab7467 100644 --- a/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs +++ b/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs @@ -45,6 +45,11 @@ public async Task> StartByIdAsync(string id, DataFlowStar return StatusResult.Success(existing); } + if (!existing.IsConsumer) + { + return StatusResult.Conflict("This request is only valid for DataFlows on the consumer side."); + } + // check the correct state of the existing DF if (existing.State is DataFlowState.Starting or DataFlowState.Prepared or DataFlowState.Uninitialized) { From 5b0aa8f3bee3e42b23b94cb0dfaa0eee786a0009 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 08:38:24 +0200 Subject: [PATCH 4/6] update source/dest -> dataAddress --- .../DataPlaneSignalingApiControllerTest.cs | 58 ++++++--------- .../DataPlaneSignalingApiController.cs | 3 +- ...DataFlowPrepareMessageSerializationTest.cs | 18 ++--- .../Messages/DataFlowStartByIdMessageTest.cs | 30 ++++---- .../DataFlowStartMessageSerializationTest.cs | 74 +++++-------------- .../DataPlaneSignalingServiceTest.cs | 20 ++--- DataPlane.Sdk.Core.Test/TestMethods.cs | 5 +- .../Interfaces/IDataPlaneSignalingService.cs | 2 +- .../Domain/Messages/DataFlowBaseMessage.cs | 4 +- .../Domain/Messages/DataFlowStartMessage.cs | 5 -- ... => DataFlowStartedNotificationMessage.cs} | 6 +- .../DataPlaneSignalingService.cs | 9 ++- 12 files changed, 90 insertions(+), 144 deletions(-) rename DataPlane.Sdk.Core/Domain/Messages/{DataFlowStartByIdMessage.cs => DataFlowStartedNotificationMessage.cs} (71%) diff --git a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs index 8a52657..c60a7b1 100644 --- a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs +++ b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs @@ -143,7 +143,7 @@ public async Task Prepare_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -174,7 +174,7 @@ public async Task Prepare_WhenReturnsSync_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -205,7 +205,7 @@ public async Task Prepare_WhenReturnsAsync_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -238,7 +238,7 @@ public async Task Prepare_WhenSdkReturnsInvalidState_Expect400() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -263,7 +263,7 @@ public async Task Prepare_WhenDataflowExists_Expect409() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -359,11 +359,7 @@ public async Task Start_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type") - { - Properties = { ["test-key"] = "test-value" } - }, - DestinationDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } }, @@ -396,11 +392,7 @@ public async Task Start_WhenAlreadyExists_ExpectConflict() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type") - { - Properties = { ["test-key"] = "test-value" } - }, - DestinationDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } }, @@ -432,11 +424,7 @@ public async Task Start_SdkHandlerWrongState_ExpectBadRequest() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type") - { - Properties = { ["test-key"] = "test-value" } - }, - DestinationDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } }, @@ -470,8 +458,7 @@ public async Task Start_WhenSdkReturnsStarted_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type"), - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -501,8 +488,7 @@ public async Task Start_WhenSdkReturnsStarting_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type"), - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -534,9 +520,9 @@ public async Task StartById_Success() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } @@ -554,9 +540,9 @@ public async Task StartById_WhenNotFound_ExpectError() { Sdk.OnStart = null; - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } @@ -574,9 +560,9 @@ public async Task StartById_InvalidState_ExpectConflict() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } @@ -598,9 +584,9 @@ public async Task StartById_WhenSdkReturnsStarting_Success() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } @@ -628,9 +614,9 @@ public async Task StartById_WhenSdkReturnsStarted_Success() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } @@ -655,9 +641,9 @@ public async Task StartById_SdkHandlerWrongState_ExpectBadRequest() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } diff --git a/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs b/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs index 7c95c99..1f92d4c 100644 --- a/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs +++ b/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs @@ -83,7 +83,8 @@ public async Task Start([FromRoute] string participantContextId, [Authorize] [HttpPost("{dataFlowId}/started")] - public async Task StartById([FromRoute] string participantContextId, [FromRoute] string dataFlowId, DataFlowStartByIdMessage startMessage) + public async Task StartById([FromRoute] string participantContextId, [FromRoute] string dataFlowId, + DataFlowStartedNotificationMessage startMessage) { if (!(await authorizationService.AuthorizeAsync(User, new ResourceTuple(participantContextId, dataFlowId), "DataFlowAccess")).Succeeded) { diff --git a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs index 70fea73..e156b0d 100644 --- a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs +++ b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs @@ -21,7 +21,7 @@ public void Serialize_WithAllProperties_Success() DataspaceContext = "dataspace-context", AgreementId = "agreement-def", CallbackAddress = new Uri("https://callback.example.com"), - DestinationDataAddress = new DataAddress("AzureBlob") + DataAddress = new DataAddress("AzureBlob") { Properties = { ["container"] = "dest-container", ["account"] = "myaccount" } }, @@ -43,7 +43,7 @@ public void Serialize_WithAllProperties_Success() json.ShouldContain("\"participantID\":\"participant-abc\""); json.ShouldContain("\"counterPartyID\":\"counterparty-xyz\""); json.ShouldContain("\"agreementID\":\"agreement-def\""); - json.ShouldContain("\"destinationDataAddress\""); + json.ShouldContain("\"dataAddress\""); json.ShouldContain("\"transferType\""); } @@ -61,7 +61,7 @@ public void Deserialize_WithAllProperties_Success() "dataspaceContext": "dataspace-context", "agreementID": "agreement-def", "callbackAddress": "https://callback.example.com", - "destinationDataAddress": { + "dataAddress": { "@type": "AzureBlob", "properties": { "container": "dest-container", @@ -89,8 +89,8 @@ public void Deserialize_WithAllProperties_Success() message.AgreementId.ShouldBe("agreement-def"); message.CallbackAddress.ShouldNotBeNull(); message.CallbackAddress.ToString().ShouldBe("https://callback.example.com/"); - message.DestinationDataAddress.ShouldNotBeNull(); - message.DestinationDataAddress.Type.ShouldBe("AzureBlob"); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Type.ShouldBe("AzureBlob"); message.TransferType.ShouldNotBeNull(); message.TransferType.DestinationType.ShouldBe("AzureBlob"); message.TransferType.FlowType.ShouldBe(FlowType.Push); @@ -106,7 +106,7 @@ public void Deserialize_WithMinimalProperties_Success() "datasetID": "dataset-789", "participantID": "participant-abc", "agreementID": "agreement-def", - "destinationDataAddress": { + "dataAddress": { "type": "HttpData" }, "transferType": { @@ -125,7 +125,7 @@ public void Deserialize_WithMinimalProperties_Success() message.DatasetId.ShouldBe("dataset-789"); message.ParticipantId.ShouldBe("participant-abc"); message.AgreementId.ShouldBe("agreement-def"); - message.DestinationDataAddress.ShouldNotBeNull(); + message.DataAddress.ShouldNotBeNull(); message.TransferType.ShouldNotBeNull(); message.TransferType.FlowType.ShouldBe(FlowType.Pull); } @@ -141,7 +141,7 @@ public void SerializeDeserialize_RoundTrip_Success() DatasetId = "dataset-roundtrip", ParticipantId = "participant-roundtrip", AgreementId = "agreement-roundtrip", - DestinationDataAddress = new DataAddress("Database") + DataAddress = new DataAddress("Database") { Properties = { ["connectionString"] = "Server=localhost", ["table"] = "dest_table" } }, @@ -163,7 +163,7 @@ public void SerializeDeserialize_RoundTrip_Success() deserialized.DatasetId.ShouldBe(original.DatasetId); deserialized.ParticipantId.ShouldBe(original.ParticipantId); deserialized.AgreementId.ShouldBe(original.AgreementId); - deserialized.DestinationDataAddress.Type.ShouldBe(original.DestinationDataAddress.Type); + deserialized.DataAddress.Type.ShouldBe(original.DataAddress.Type); deserialized.TransferType.DestinationType.ShouldBe(original.TransferType.DestinationType); deserialized.TransferType.FlowType.ShouldBe(original.TransferType.FlowType); } diff --git a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs index 2a6faa7..94abf38 100644 --- a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs +++ b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs @@ -6,7 +6,7 @@ namespace DataPlane.Sdk.Core.Test.Domain.Messages; /// -/// Tests for JSON serialization and deserialization of DataFlowStartByIdMessage +/// Tests for JSON serialization and deserialization of DataFlowStartedNotificationMessage /// public class DataFlowStartByIdMessageTest { @@ -14,9 +14,9 @@ public class DataFlowStartByIdMessageTest public void SerDes_WithSourceDataAddress_Success() { // Arrange - var message = new DataFlowStartByIdMessage + var message = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("S3") + DataAddress = new DataAddress("S3") { Properties = { ["bucketName"] = "source-bucket", ["region"] = "us-east-1" } } @@ -27,20 +27,20 @@ public void SerDes_WithSourceDataAddress_Success() // Assert json.ShouldNotBeNullOrWhiteSpace(); - json.ShouldContain("\"sourceDataAddress\""); + json.ShouldContain("\"dataAddress\""); json.ShouldContain("\"@type\":\"S3\""); json.ShouldContain("\"bucketName\":\"source-bucket\""); json.ShouldContain("\"region\":\"us-east-1\""); // Act - var deserialized = JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions); + var deserialized = JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions); // Assert deserialized.ShouldNotBeNull(); - deserialized.SourceDataAddress.ShouldNotBeNull(); - deserialized.SourceDataAddress.Type.ShouldBe("S3"); - deserialized.SourceDataAddress.Properties["bucketName"].ShouldBeEquivalentTo("source-bucket"); - deserialized.SourceDataAddress.Properties["region"].ShouldBeEquivalentTo("us-east-1"); + deserialized.DataAddress.ShouldNotBeNull(); + deserialized.DataAddress.Type.ShouldBe("S3"); + deserialized.DataAddress.Properties["bucketName"].ShouldBeEquivalentTo("source-bucket"); + deserialized.DataAddress.Properties["region"].ShouldBeEquivalentTo("us-east-1"); deserialized.ShouldBeEquivalentTo(message); } @@ -51,20 +51,20 @@ public void SerDes_WithEmptySourceDataAddressProperties_Success() // Arrange var json = """ { - "sourceDataAddress": { + "dataAddress": { "@type": "HttpData" } } """; // Act - var message = JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions); + var message = JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions); // Assert message.ShouldNotBeNull(); - message.SourceDataAddress.ShouldNotBeNull(); - message.SourceDataAddress.Type.ShouldBe("HttpData"); - message.SourceDataAddress.Properties.ShouldBeEmpty(); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Type.ShouldBe("HttpData"); + message.DataAddress.Properties.ShouldBeEmpty(); } [Fact] @@ -77,6 +77,6 @@ public void SerializeDeserialize_NoSourceDataAddress_Failure() """; // Act - Should.Throw(() => JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions)); + Should.Throw(() => JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions)); } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs index 4e08181..842cc94 100644 --- a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs +++ b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs @@ -21,11 +21,7 @@ public void Serialize_WithAllProperties_Success() DatasetId = "dataset-789", ParticipantId = "participant-abc", AgreementId = "agreement-def", - SourceDataAddress = new DataAddress("S3") - { - Properties = { ["bucketName"] = "source-bucket", ["region"] = "us-east-1" } - }, - DestinationDataAddress = new DataAddress("AzureBlob") + DataAddress = new DataAddress("AzureBlob") { Properties = { ["container"] = "dest-container", ["account"] = "myaccount" } }, @@ -46,8 +42,7 @@ public void Serialize_WithAllProperties_Success() json.ShouldContain("\"datasetId\":\"dataset-789\""); json.ShouldContain("\"participantId\":\"participant-abc\""); json.ShouldContain("\"agreementId\":\"agreement-def\""); - json.ShouldContain("\"sourceDataAddress\""); - json.ShouldContain("\"destinationDataAddress\""); + json.ShouldContain("\"dataAddress\""); } [Fact] @@ -61,14 +56,7 @@ public void Deserialize_WithAllProperties_Success() "datasetID": "dataset-789", "participantID": "participant-abc", "agreementID": "agreement-def", - "sourceDataAddress": { - "@type": "S3", - "properties": { - "bucketName": "source-bucket", - "region": "us-east-1" - } - }, - "destinationDataAddress": { + "dataAddress": { "@type": "AzureBlob", "properties": { "container": "dest-container", @@ -92,14 +80,10 @@ public void Deserialize_WithAllProperties_Success() message.DatasetId.ShouldBe("dataset-789"); message.ParticipantId.ShouldBe("participant-abc"); message.AgreementId.ShouldBe("agreement-def"); - message.SourceDataAddress.ShouldNotBeNull(); - message.SourceDataAddress.Type.ShouldBe("S3"); - message.SourceDataAddress.Properties["bucketName"].ShouldBeEquivalentTo("source-bucket"); - message.SourceDataAddress.Properties["region"].ShouldBeEquivalentTo("us-east-1"); - message.DestinationDataAddress.ShouldNotBeNull(); - message.DestinationDataAddress.Type.ShouldBe("AzureBlob"); - message.DestinationDataAddress.Properties["container"].ShouldBeEquivalentTo("dest-container"); - message.DestinationDataAddress.Properties["account"].ShouldBeEquivalentTo("myaccount"); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Type.ShouldBe("AzureBlob"); + message.DataAddress.Properties["container"].ShouldBeEquivalentTo("dest-container"); + message.DataAddress.Properties["account"].ShouldBeEquivalentTo("myaccount"); message.TransferType.FlowType.ShouldBe(FlowType.Push); } @@ -113,10 +97,7 @@ public void Deserialize_WithMinimalProperties_Success() "datasetID": "dataset-789", "participantID": "participant-abc", "agreementID": "agreement-def", - "sourceDataAddress": { - "@type": "HttpData" - }, - "destinationDataAddress": { + "dataAddress": { "@type": "HttpData" }, "transferType": { @@ -135,10 +116,8 @@ public void Deserialize_WithMinimalProperties_Success() message.DatasetId.ShouldBe("dataset-789"); message.ParticipantId.ShouldBe("participant-abc"); message.AgreementId.ShouldBe("agreement-def"); - message.SourceDataAddress.ShouldNotBeNull(); - message.SourceDataAddress.Type.ShouldBe("HttpData"); - message.DestinationDataAddress.ShouldNotBeNull(); - message.DestinationDataAddress.Type.ShouldBe("HttpData"); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Type.ShouldBe("HttpData"); message.TransferType.FlowType.ShouldBe(FlowType.Pull); } @@ -153,11 +132,7 @@ public void SerializeDeserialize_RoundTrip_Success() DatasetId = "dataset-roundtrip", ParticipantId = "participant-roundtrip", AgreementId = "agreement-roundtrip", - SourceDataAddress = new DataAddress("FileSystem") - { - Properties = { ["path"] = "/data/source", ["format"] = "csv" } - }, - DestinationDataAddress = new DataAddress("Database") + DataAddress = new DataAddress("Database") { Properties = { ["connectionString"] = "Server=localhost", ["table"] = "dest_table" } }, @@ -179,12 +154,9 @@ public void SerializeDeserialize_RoundTrip_Success() deserialized.DatasetId.ShouldBe(original.DatasetId); deserialized.ParticipantId.ShouldBe(original.ParticipantId); deserialized.AgreementId.ShouldBe(original.AgreementId); - deserialized.SourceDataAddress?.Type.ShouldBe(original.SourceDataAddress.Type); - deserialized.SourceDataAddress?.Properties["path"].ShouldBeEquivalentTo("/data/source"); - deserialized.SourceDataAddress?.Properties["format"].ShouldBeEquivalentTo("csv"); - deserialized.DestinationDataAddress.Type.ShouldBe(original.DestinationDataAddress.Type); - deserialized.DestinationDataAddress.Properties["connectionString"].ShouldBeEquivalentTo("Server=localhost"); - deserialized.DestinationDataAddress.Properties["table"].ShouldBeEquivalentTo("dest_table"); + deserialized.DataAddress.Type.ShouldBe(original.DataAddress.Type); + deserialized.DataAddress.Properties["connectionString"].ShouldBeEquivalentTo("Server=localhost"); + deserialized.DataAddress.Properties["table"].ShouldBeEquivalentTo("dest_table"); deserialized.TransferType.ShouldBeEquivalentTo(original.TransferType); } @@ -198,11 +170,7 @@ public void Deserialize_WithEmptyDataAddressProperties_Success() "datasetID": "dataset-123", "participantID": "participant-123", "agreementID": "agreement-123", - "sourceDataAddress": { - "type": "Custom", - "properties": {} - }, - "destinationDataAddress": { + "dataAddress": { "type": "Custom", "properties": {} }, @@ -218,10 +186,8 @@ public void Deserialize_WithEmptyDataAddressProperties_Success() // Assert message.ShouldNotBeNull(); - message.SourceDataAddress.ShouldNotBeNull(); - message.SourceDataAddress.Properties.ShouldBeEmpty(); - message.DestinationDataAddress.ShouldNotBeNull(); - message.DestinationDataAddress.Properties.ShouldBeEmpty(); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Properties.ShouldBeEmpty(); } [Fact] @@ -234,8 +200,7 @@ public void Serialize_WithCamelCasePropertyNames_Success() DatasetId = "dataset-123", ParticipantId = "participant-123", AgreementId = "agreement-123", - SourceDataAddress = new DataAddress("TestType"), - DestinationDataAddress = new DataAddress("TestType"), + DataAddress = new DataAddress("TestType"), TransferType = new TransferType { DestinationType = "test-type", @@ -251,8 +216,7 @@ public void Serialize_WithCamelCasePropertyNames_Success() json.ShouldContain("\"datasetId\""); json.ShouldContain("\"participantId\""); json.ShouldContain("\"agreementId\""); - json.ShouldContain("\"sourceDataAddress\""); - json.ShouldContain("\"destinationDataAddress\""); + json.ShouldContain("\"dataAddress\""); json.ShouldContain("\"transferType\""); } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs index 44c4042..84e14cc 100644 --- a/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs +++ b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs @@ -153,9 +153,9 @@ public async Task StartByIdAsync_WhenExists_ShouldReturnSuccess() await _dataFlowContext.AddAsync(flow); await _dataFlowContext.SaveChangesAsync(); - var msg = new DataFlowStartByIdMessage + var msg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["key1"] = "value1" } } @@ -168,9 +168,9 @@ public async Task StartByIdAsync_WhenExists_ShouldReturnSuccess() [Fact] public async Task StartByIdAsync_WhenNotExists_ShouldReturnFailure() { - var msg = new DataFlowStartByIdMessage + var msg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["key1"] = "value1" } } @@ -190,9 +190,9 @@ public async Task StartByIdAsync_WhenNotConsumer_ShouldReturnFailure() await _dataFlowContext.AddAsync(flow); await _dataFlowContext.SaveChangesAsync(); - var msg = new DataFlowStartByIdMessage + var msg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["key1"] = "value1" } } @@ -211,9 +211,9 @@ public async Task StartByIdAsync_WhenWrongState_ShouldReturnFailure() await _dataFlowContext.AddAsync(flow); await _dataFlowContext.SaveChangesAsync(); - var msg = new DataFlowStartByIdMessage + var msg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["key1"] = "value1" } } @@ -232,9 +232,9 @@ public async Task StartByIdAsync_WhenAlreadyStarted_ShouldReturnSuccess() await _dataFlowContext.AddAsync(flow); await _dataFlowContext.SaveChangesAsync(); - var msg = new DataFlowStartByIdMessage + var msg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["key1"] = "value1" } } diff --git a/DataPlane.Sdk.Core.Test/TestMethods.cs b/DataPlane.Sdk.Core.Test/TestMethods.cs index b59a7b8..0bb23fb 100644 --- a/DataPlane.Sdk.Core.Test/TestMethods.cs +++ b/DataPlane.Sdk.Core.Test/TestMethods.cs @@ -32,8 +32,7 @@ public static DataFlowStartMessage CreateStartMessage() return new DataFlowStartMessage { ProcessId = "test-process-id", - SourceDataAddress = new DataAddress("test-source-type"), - DestinationDataAddress = new DataAddress("test-destination-type"), + DataAddress = new DataAddress("test-destination-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -50,7 +49,7 @@ public static DataFlowPrepareMessage CreatePrepareMessage() return new DataFlowPrepareMessage { ProcessId = "test-process-id", - DestinationDataAddress = new DataAddress("test-destination-type"), + DataAddress = new DataAddress("test-destination-type"), TransferType = new TransferType { DestinationType = "test-type", diff --git a/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs b/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs index 9c556dc..51f1105 100644 --- a/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs +++ b/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs @@ -17,7 +17,7 @@ public interface IDataPlaneSignalingService /// /// /// The start message - Task> StartByIdAsync(string id, DataFlowStartByIdMessage message); + Task> StartByIdAsync(string id, DataFlowStartedNotificationMessage message); /// /// Suspends (pauses) a data flow by its ID. diff --git a/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs b/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs index 543c978..9d6b03f 100644 --- a/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs +++ b/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs @@ -32,6 +32,6 @@ public abstract class DataFlowBaseMessage : JsonLdDto [JsonPropertyName("transferType")] public required TransferType TransferType { get; init; } - [JsonPropertyName("destinationDataAddress")] - public required DataAddress DestinationDataAddress { get; init; } + [JsonPropertyName("dataAddress")] + public required DataAddress DataAddress { get; init; } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartMessage.cs b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartMessage.cs index 10f6699..04ea02d 100644 --- a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartMessage.cs +++ b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartMessage.cs @@ -1,6 +1,3 @@ -using System.Text.Json.Serialization; -using DataPlane.Sdk.Core.Domain.Model; - namespace DataPlane.Sdk.Core.Domain.Messages; /// @@ -10,6 +7,4 @@ namespace DataPlane.Sdk.Core.Domain.Messages; /// public class DataFlowStartMessage : DataFlowBaseMessage { - [JsonPropertyName("sourceDataAddress")] - public DataAddress? SourceDataAddress { get; init; } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartByIdMessage.cs b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs similarity index 71% rename from DataPlane.Sdk.Core/Domain/Messages/DataFlowStartByIdMessage.cs rename to DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs index 9dc91a8..1755238 100644 --- a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartByIdMessage.cs +++ b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs @@ -7,8 +7,8 @@ namespace DataPlane.Sdk.Core.Domain.Messages; /// Represents a data flow start message from the Dataplane Signaling API protocol. It is used to initiate a data /// transfer between a consumer and the provider. This message is sent by the control plane to the data plane. /// -public class DataFlowStartByIdMessage : JsonLdDto +public class DataFlowStartedNotificationMessage : JsonLdDto { - [JsonPropertyName("sourceDataAddress")] - public required DataAddress SourceDataAddress { get; init; } + [JsonPropertyName("dataAddress")] + public required DataAddress DataAddress { get; init; } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs b/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs index fab7467..ce7317b 100644 --- a/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs +++ b/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs @@ -31,7 +31,7 @@ public async Task> StartAsync(DataFlowStartMessage messag } - public async Task> StartByIdAsync(string id, DataFlowStartByIdMessage message) + public async Task> StartByIdAsync(string id, DataFlowStartedNotificationMessage message) { var existing = await dataFlowContext.FindByIdAsync(id); @@ -205,7 +205,7 @@ private DataFlow CreateDataFlow(DataFlowPrepareMessage message) { return new DataFlow(message.ProcessId) { - Destination = message.DestinationDataAddress, + Destination = message.DataAddress, TransferType = message.TransferType, RuntimeId = _runtimeId, ParticipantId = message.ParticipantId, @@ -220,8 +220,9 @@ private DataFlow CreateDataFlow(DataFlowStartMessage message) { return new DataFlow(message.ProcessId) { - Source = message.SourceDataAddress, - Destination = message.DestinationDataAddress, + Source = new DataAddress( + "TODO: CHANGE"), //todo: this is incorrect: the source address must be resolved externally from the asset-to-source mapping + Destination = message.DataAddress, TransferType = message.TransferType, RuntimeId = _runtimeId, ParticipantId = message.ParticipantId, From 1447cee076ea2466b8b71e0d9b5c34e899b77187 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 08:47:54 +0200 Subject: [PATCH 5/6] made dataAddress optional --- .../Domain/Messages/DataFlowStartByIdMessageTest.cs | 4 ++-- .../Infrastructure/DataPlaneSignalingServiceTest.cs | 5 ----- .../Domain/Messages/DataFlowStartedNotificationMessage.cs | 2 +- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs index 94abf38..09b57d9 100644 --- a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs +++ b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs @@ -68,7 +68,7 @@ public void SerDes_WithEmptySourceDataAddressProperties_Success() } [Fact] - public void SerializeDeserialize_NoSourceDataAddress_Failure() + public void SerializeDeserialize_NoDataAddress_Success() { // Arrange var json = """ @@ -77,6 +77,6 @@ public void SerializeDeserialize_NoSourceDataAddress_Failure() """; // Act - Should.Throw(() => JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions)); + Should.NotThrow(() => JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions)); } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs index 84e14cc..6048886 100644 --- a/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs +++ b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs @@ -92,11 +92,6 @@ public async Task StartAsync_WhenDataFlowExists_ShouldReturnConflict() result.Failure.ShouldNotBeNull(); result.Failure.Reason.ShouldBe(Conflict); - // result.Content.ShouldSatisfyAllConditions(() => result.Content!.Source.ShouldNotBeNull()); - // result.Content.ShouldSatisfyAllConditions(() => result.Content!.Destination.ShouldNotBeNull()); - - // _dataFlowContext.ChangeTracker.HasChanges().ShouldBeFalse(); - // _dataFlowContext.DataFlows.ShouldContain(x => x.Id == message.ProcessId && x.State == Started); } [Fact] diff --git a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs index 1755238..4770239 100644 --- a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs +++ b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs @@ -10,5 +10,5 @@ namespace DataPlane.Sdk.Core.Domain.Messages; public class DataFlowStartedNotificationMessage : JsonLdDto { [JsonPropertyName("dataAddress")] - public required DataAddress DataAddress { get; init; } + public DataAddress? DataAddress { get; init; } } \ No newline at end of file From 1875ff180603c243c03f8248fe44f313a09741e1 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 09:34:45 +0200 Subject: [PATCH 6/6] made dataAddress optional --- DataPlane.Sdk.Core.Test/Data/DataFlowContextTest.cs | 2 +- .../Messages/DataFlowPrepareMessageSerializationTest.cs | 2 +- .../Messages/DataFlowStartMessageSerializationTest.cs | 6 +++--- DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs | 2 +- DataPlane.Sdk.Core/Domain/Model/DataFlow.cs | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/DataPlane.Sdk.Core.Test/Data/DataFlowContextTest.cs b/DataPlane.Sdk.Core.Test/Data/DataFlowContextTest.cs index 1bf821f..a869f20 100644 --- a/DataPlane.Sdk.Core.Test/Data/DataFlowContextTest.cs +++ b/DataPlane.Sdk.Core.Test/Data/DataFlowContextTest.cs @@ -22,7 +22,7 @@ public async Task SaveAsync_ShouldAddNewDataFlow() entry.Entity.Id.ShouldBe(dataFlow.Id); entry.Entity.ShouldBeEquivalentTo(dataFlow); - entry.Entity.Destination.Properties["test-key"].ShouldBeEquivalentTo("test-value"); + entry.Entity.Destination?.Properties["test-key"].ShouldBeEquivalentTo("test-value"); } [Fact] diff --git a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs index e156b0d..d67e846 100644 --- a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs +++ b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs @@ -163,7 +163,7 @@ public void SerializeDeserialize_RoundTrip_Success() deserialized.DatasetId.ShouldBe(original.DatasetId); deserialized.ParticipantId.ShouldBe(original.ParticipantId); deserialized.AgreementId.ShouldBe(original.AgreementId); - deserialized.DataAddress.Type.ShouldBe(original.DataAddress.Type); + deserialized.DataAddress?.Type.ShouldBe(original.DataAddress.Type); deserialized.TransferType.DestinationType.ShouldBe(original.TransferType.DestinationType); deserialized.TransferType.FlowType.ShouldBe(original.TransferType.FlowType); } diff --git a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs index 842cc94..dc950af 100644 --- a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs +++ b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs @@ -154,9 +154,9 @@ public void SerializeDeserialize_RoundTrip_Success() deserialized.DatasetId.ShouldBe(original.DatasetId); deserialized.ParticipantId.ShouldBe(original.ParticipantId); deserialized.AgreementId.ShouldBe(original.AgreementId); - deserialized.DataAddress.Type.ShouldBe(original.DataAddress.Type); - deserialized.DataAddress.Properties["connectionString"].ShouldBeEquivalentTo("Server=localhost"); - deserialized.DataAddress.Properties["table"].ShouldBeEquivalentTo("dest_table"); + deserialized.DataAddress?.Type.ShouldBe(original.DataAddress.Type); + deserialized.DataAddress?.Properties["connectionString"].ShouldBeEquivalentTo("Server=localhost"); + deserialized.DataAddress?.Properties["table"].ShouldBeEquivalentTo("dest_table"); deserialized.TransferType.ShouldBeEquivalentTo(original.TransferType); } diff --git a/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs b/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs index 9d6b03f..5f76753 100644 --- a/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs +++ b/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs @@ -33,5 +33,5 @@ public abstract class DataFlowBaseMessage : JsonLdDto public required TransferType TransferType { get; init; } [JsonPropertyName("dataAddress")] - public required DataAddress DataAddress { get; init; } + public DataAddress? DataAddress { get; init; } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs b/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs index 3dc7e92..571c235 100644 --- a/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs +++ b/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs @@ -3,7 +3,7 @@ namespace DataPlane.Sdk.Core.Domain.Model; public class DataFlow(string id) : StatefulEntity(id) { public DataAddress? Source { get; set; } - public required DataAddress Destination { get; set; } + public required DataAddress? Destination { get; set; } public Uri? CallbackAddress { get; init; } public IDictionary Properties { get; init; } = new Dictionary();