From e73743def2f65fe4cb492a09407fa0902e6e5288 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 07:43:21 +0200 Subject: [PATCH 1/5] /start -> /started --- examples/common/common.go | 2 +- internal/tests/api_integration_test.go | 10 +++++----- pkg/dsdk/messages.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/common/common.go b/examples/common/common.go index 796b4fb..90ac968 100644 --- a/examples/common/common.go +++ b/examples/common/common.go @@ -40,7 +40,7 @@ type TokenResponse struct { func NewSignalingServer(sdkApi *dsdk.DataPlaneApi, port int) *http.Server { r := chi.NewRouter() r.Post("/dataflows/start", sdkApi.Start) - r.Post("/dataflows/{id}/start", func(writer http.ResponseWriter, request *http.Request) { + r.Post("/dataflows/{id}/started", func(writer http.ResponseWriter, request *http.Request) { id := chi.URLParam(request, "id") sdkApi.StartById(writer, request, id) }) diff --git a/internal/tests/api_integration_test.go b/internal/tests/api_integration_test.go index 6194940..29ad17c 100644 --- a/internal/tests/api_integration_test.go +++ b/internal/tests/api_integration_test.go @@ -35,7 +35,7 @@ func newServerWithSdk(t *testing.T, sdk *dsdk.DataPlaneSDK) http.Handler { r := chi.NewRouter() r.Post("/dataflows/start", sdkApi.Start) - r.Post("/dataflows/{id}/start", func(writer http.ResponseWriter, request *http.Request) { + r.Post("/dataflows/{id}/started", func(writer http.ResponseWriter, request *http.Request) { id := chi.URLParam(request, "id") sdkApi.StartById(writer, request, id) }) @@ -111,7 +111,7 @@ func Test_StartByID_WhenNotFound(t *testing.T) { requestBody, err := serialize(newStartByIdMessage()) assert.NoError(t, err) - req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/start", bytes.NewBuffer(requestBody)) + req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/started", bytes.NewBuffer(requestBody)) assert.NoError(t, err) rr := httptest.NewRecorder() @@ -138,7 +138,7 @@ func Test_StartByID_WhenStartedOrStarting(t *testing.T) { requestBody, err := serialize(newStartByIdMessage()) assert.NoError(t, err) - req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/start", bytes.NewBuffer(requestBody)) + req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/started", bytes.NewBuffer(requestBody)) assert.NoError(t, err) rr := httptest.NewRecorder() @@ -181,7 +181,7 @@ func Test_StartByID_WhenPrepared(t *testing.T) { requestBody, err := serialize(newStartByIdMessage()) assert.NoError(t, err) - req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/start", bytes.NewBuffer(requestBody)) + req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/started", bytes.NewBuffer(requestBody)) assert.NoError(t, err) rr := httptest.NewRecorder() @@ -199,7 +199,7 @@ func Test_StartByID_MissingSourceAddress(t *testing.T) { requestBody, err := serialize(dsdk.DataFlowStartByIdMessage{}) assert.NoError(t, err) - req, err := http.NewRequest(http.MethodPost, "/dataflows/some-id/start", bytes.NewBuffer(requestBody)) + req, err := http.NewRequest(http.MethodPost, "/dataflows/some-id/started", bytes.NewBuffer(requestBody)) assert.NoError(t, err) rr := httptest.NewRecorder() diff --git a/pkg/dsdk/messages.go b/pkg/dsdk/messages.go index 6115542..bbe72ef 100644 --- a/pkg/dsdk/messages.go +++ b/pkg/dsdk/messages.go @@ -52,7 +52,7 @@ func (d *DataFlowStartMessage) Validate() error { } type DataFlowStartByIdMessage struct { - SourceDataAddress *DataAddress `json:"sourceDataAddress,omitempty" validate:"required"` + SourceDataAddress *DataAddress `json:"dataAddress,omitempty" validate:"required"` } func (d *DataFlowStartByIdMessage) Validate() error { From 24ea9283ce7ee3f32016e16a81b871c63a743bcb Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 08:04:12 +0200 Subject: [PATCH 2/5] add tests for start-by-id --- internal/tests/api_integration_test.go | 2 +- pkg/dsdk/dsdk.go | 6 +- pkg/dsdk/dsdk_test.go | 98 ++++++++++++++++++++++++++ pkg/dsdk/messages.go | 2 +- pkg/dsdk/model_test.go | 2 +- 5 files changed, 106 insertions(+), 4 deletions(-) diff --git a/internal/tests/api_integration_test.go b/internal/tests/api_integration_test.go index 29ad17c..bd22a7c 100644 --- a/internal/tests/api_integration_test.go +++ b/internal/tests/api_integration_test.go @@ -433,7 +433,7 @@ func newStartMessage() dsdk.DataFlowStartMessage { func newStartByIdMessage() dsdk.DataFlowStartByIdMessage { return dsdk.DataFlowStartByIdMessage{ - SourceDataAddress: &dsdk.DataAddress{ + DataAddress: &dsdk.DataAddress{ Properties: map[string]any{ "foo": "bar", }, diff --git a/pkg/dsdk/dsdk.go b/pkg/dsdk/dsdk.go index 7b86ac2..ebe29de 100644 --- a/pkg/dsdk/dsdk.go +++ b/pkg/dsdk/dsdk.go @@ -173,7 +173,11 @@ func (dsdk *DataPlaneSDK) StartById(ctx context.Context, processID string, messa return ErrNotFound } - response, err = dsdk.startExistingFlow(ctx, existingFlow, message.SourceDataAddress) + if !existingFlow.Consumer { + return fmt.Errorf("%w: startById is only valid for consumer data flows", ErrInvalidInput) + } + + response, err = dsdk.startExistingFlow(ctx, existingFlow, message.DataAddress) return err }) diff --git a/pkg/dsdk/dsdk_test.go b/pkg/dsdk/dsdk_test.go index d5128dc..3b74ef3 100644 --- a/pkg/dsdk/dsdk_test.go +++ b/pkg/dsdk/dsdk_test.go @@ -212,6 +212,99 @@ func Test_DataPlaneSDK_Start_ConsumerPrepared(t *testing.T) { assert.NoError(t, err) } +func Test_DataPlaneSDK_StartById_Exists(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + onStart: func(context.Context, *DataFlow, *DataPlaneSDK, *ProcessorOptions) (*DataFlowResponseMessage, error) { + return &DataFlowResponseMessage{State: Started}, nil + }, + } + ctx := context.Background() + store.EXPECT().FindById(ctx, "process123").Return(&DataFlow{ + ID: "process123", + State: Prepared, + Consumer: true, + }, nil) + store.EXPECT().Save(ctx, mock.AnythingOfType("*dsdk.DataFlow")).Return(nil) + + r, err := dsdk.StartById(ctx, "process123", createStartByIdMessage()) + assert.NoError(t, err) + assert.Equal(t, r.State, Started) +} + +func Test_DataPlaneSDK_StartById_NotExists(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + } + ctx := context.Background() + store.EXPECT().FindById(ctx, "process123").Return(nil, ErrNotFound) + + _, err := dsdk.StartById(ctx, "process123", createStartByIdMessage()) + assert.ErrorIs(t, err, ErrNotFound) +} + +func Test_DataPlaneSDK_StartById_NotConsumer(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + } + ctx := context.Background() + store.EXPECT().FindById(ctx, "process123").Return(&DataFlow{ + ID: "process123", + State: Prepared, + Consumer: false, + }, nil) + + r, err := dsdk.StartById(ctx, "process123", createStartByIdMessage()) + assert.ErrorIs(t, err, ErrInvalidInput) + assert.Nil(t, r) +} + +func Test_DataPlaneSDK_StartById_WrongState(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + } + ctx := context.Background() + store.EXPECT().FindById(ctx, "process123").Return(&DataFlow{ + ID: "process123", + State: Uninitialized, + Consumer: true, + }, nil) + + r, err := dsdk.StartById(ctx, "process123", createStartByIdMessage()) + assert.ErrorIs(t, err, ErrInvalidTransition) + assert.Nil(t, r) +} + +func Test_DataPlaneSDK_StartById_AlreadyStarted(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + onStart: func(context.Context, *DataFlow, *DataPlaneSDK, *ProcessorOptions) (*DataFlowResponseMessage, error) { + return &DataFlowResponseMessage{State: Started}, nil + }, + } + ctx := context.Background() + store.EXPECT().FindById(ctx, "process123").Return(&DataFlow{ + ID: "process123", + State: Started, + Consumer: true, + }, nil) + store.EXPECT().Save(ctx, mock.AnythingOfType("*dsdk.DataFlow")).Return(nil) + + r, err := dsdk.StartById(ctx, "process123", createStartByIdMessage()) + assert.NoError(t, err) + assert.Equal(t, r.State, Started) +} + func Test_DataPlaneSDK_Status(t *testing.T) { store := NewMockDataplaneStore(t) dsdk := DataPlaneSDK{ @@ -593,6 +686,11 @@ func createBaseMessage() DataFlowBaseMessage { DestinationDataAddress: DataAddress{}, } } +func createStartByIdMessage() DataFlowStartByIdMessage { + return DataFlowStartByIdMessage{ + DataAddress: &DataAddress{}, + } +} type mockTrxContext struct { } diff --git a/pkg/dsdk/messages.go b/pkg/dsdk/messages.go index bbe72ef..f108861 100644 --- a/pkg/dsdk/messages.go +++ b/pkg/dsdk/messages.go @@ -52,7 +52,7 @@ func (d *DataFlowStartMessage) Validate() error { } type DataFlowStartByIdMessage struct { - SourceDataAddress *DataAddress `json:"dataAddress,omitempty" validate:"required"` + DataAddress *DataAddress `json:"dataAddress,omitempty" validate:"required"` } func (d *DataFlowStartByIdMessage) Validate() error { diff --git a/pkg/dsdk/model_test.go b/pkg/dsdk/model_test.go index 820525c..bf7be57 100644 --- a/pkg/dsdk/model_test.go +++ b/pkg/dsdk/model_test.go @@ -44,7 +44,7 @@ func Test_dataFlowStartSerialize(t *testing.T) { assert.Equal(t, original.AgreementID, decoded.AgreementID, "AgreementID should be equal") assert.Equal(t, original.CallbackAddress, decoded.CallbackAddress, "CallbackAddress should be equal") assert.Equal(t, original.TransferType, decoded.TransferType, "TransferType should be equal") - assert.Equal(t, original.SourceDataAddress, decoded.SourceDataAddress, "SourceDataAddress should be equal") + assert.Equal(t, original.SourceDataAddress, decoded.SourceDataAddress, "DataAddress should be equal") assert.Equal(t, original, original) } From 1c2da4c9daab0237c2fee1f2db9af663a2df6cfb Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 08:43:47 +0200 Subject: [PATCH 3/5] update msg types source/dest -> dataAddress --- .../consumer/dataplane.go | 6 +- .../consumer/dataplane.go | 2 +- .../provider/dataplane.go | 6 +- .../sync-pull-dataplane/consumer/dataplane.go | 4 +- internal/tests/api_integration_test.go | 64 ++++++++++--------- pkg/dsdk/api.go | 4 +- pkg/dsdk/dsdk.go | 14 ++-- pkg/dsdk/dsdk_test.go | 7 +- pkg/dsdk/messages.go | 27 ++++---- pkg/dsdk/messages_test.go | 4 +- pkg/dsdk/model_test.go | 5 +- 11 files changed, 72 insertions(+), 71 deletions(-) diff --git a/examples/streaming-pull-dataplane/consumer/dataplane.go b/examples/streaming-pull-dataplane/consumer/dataplane.go index b47cc39..4fb8d5c 100644 --- a/examples/streaming-pull-dataplane/consumer/dataplane.go +++ b/examples/streaming-pull-dataplane/consumer/dataplane.go @@ -81,12 +81,12 @@ func (d *ConsumerDataPlane) startProcessor(_ context.Context, _ *dsdk.DataPlaneSDK, options *dsdk.ProcessorOptions) (*dsdk.DataFlowResponseMessage, error) { - endpoint := options.SourceDataAddress.Properties[dsdk.EndpointKey].(string) - token, found := parseToken(natsservices.TokenKey, options.SourceDataAddress) + endpoint := options.DataAddress.Properties[dsdk.EndpointKey].(string) + token, found := parseToken(natsservices.TokenKey, options.DataAddress) if !found { return nil, errors.New("token not found in endpoint properties") } - channel, found := parseToken(natsservices.ChannelKey, options.SourceDataAddress) + channel, found := parseToken(natsservices.ChannelKey, options.DataAddress) if !found { return nil, errors.New("channel not found in endpoint properties") } diff --git a/examples/streaming-push-dataplane/consumer/dataplane.go b/examples/streaming-push-dataplane/consumer/dataplane.go index dadb08c..d9e8a8b 100644 --- a/examples/streaming-push-dataplane/consumer/dataplane.go +++ b/examples/streaming-push-dataplane/consumer/dataplane.go @@ -124,7 +124,7 @@ func (d *ConsumerDataPlane) startProcessor(_ context.Context, flow *dsdk.DataFlow, _ *dsdk.DataPlaneSDK, options *dsdk.ProcessorOptions) (*dsdk.DataFlowResponseMessage, error) { - return &dsdk.DataFlowResponseMessage{State: dsdk.Started, DataAddress: options.SourceDataAddress}, nil + return &dsdk.DataFlowResponseMessage{State: dsdk.Started, DataAddress: options.DataAddress}, nil } func (d *ConsumerDataPlane) suspendProcessor(_ context.Context, flow *dsdk.DataFlow) error { diff --git a/examples/streaming-push-dataplane/provider/dataplane.go b/examples/streaming-push-dataplane/provider/dataplane.go index d46732b..8b86d83 100644 --- a/examples/streaming-push-dataplane/provider/dataplane.go +++ b/examples/streaming-push-dataplane/provider/dataplane.go @@ -81,12 +81,12 @@ func (d *ProviderDataPlane) startProcessor(ctx context.Context, _ *dsdk.DataPlaneSDK, options *dsdk.ProcessorOptions) (*dsdk.DataFlowResponseMessage, error) { - endpoint := options.SourceDataAddress.Properties[dsdk.EndpointKey].(string) - token, found := parseToken(natsservices.TokenKey, options.SourceDataAddress) + endpoint := options.DataAddress.Properties[dsdk.EndpointKey].(string) + token, found := parseToken(natsservices.TokenKey, options.DataAddress) if !found { return nil, errors.New("token not found in endpoint properties") } - channel, found := parseToken(natsservices.ChannelKey, options.SourceDataAddress) + channel, found := parseToken(natsservices.ChannelKey, options.DataAddress) if !found { return nil, errors.New("channel not found in endpoint properties") } diff --git a/examples/sync-pull-dataplane/consumer/dataplane.go b/examples/sync-pull-dataplane/consumer/dataplane.go index e1264a4..7c7f01e 100644 --- a/examples/sync-pull-dataplane/consumer/dataplane.go +++ b/examples/sync-pull-dataplane/consumer/dataplane.go @@ -102,8 +102,8 @@ func (d *ConsumerDataPlane) startProcessor(_ context.Context, _ *dsdk.DataPlaneSDK, options *dsdk.ProcessorOptions) (*dsdk.DataFlowResponseMessage, error) { log.Printf("[Consumer Data Plane] Transfer access token available for participant %s dataset %s\n", flow.ParticipantID, flow.DatasetID) - endpoint := options.SourceDataAddress.Properties[dsdk.EndpointKey].(string) - token := options.SourceDataAddress.Properties["token"].(string) + endpoint := options.DataAddress.Properties[dsdk.EndpointKey].(string) + token := options.DataAddress.Properties["token"].(string) d.tokenStore.Create(flow.DatasetID, tokenEntry{datasetID: flow.DatasetID, token: token, endpoint: endpoint}) return &dsdk.DataFlowResponseMessage{State: dsdk.Started}, nil } diff --git a/internal/tests/api_integration_test.go b/internal/tests/api_integration_test.go index bd22a7c..4081100 100644 --- a/internal/tests/api_integration_test.go +++ b/internal/tests/api_integration_test.go @@ -131,7 +131,7 @@ func Test_StartByID_WhenStartedOrStarting(t *testing.T) { for _, state := range states { id := uuid.New().String() store := postgres.NewStore(database) - flow, err := newFlowBuilder().ID(id).State(state).Build() + flow, err := newFlowBuilder().ID(id).State(state).Consumer(true).Build() assert.NoError(t, err) assert.NoError(t, store.Create(ctx, flow)) @@ -196,16 +196,21 @@ func Test_StartByID_WhenPrepared(t *testing.T) { } func Test_StartByID_MissingSourceAddress(t *testing.T) { - requestBody, err := serialize(dsdk.DataFlowStartByIdMessage{}) + id := uuid.New().String() + store := postgres.NewStore(database) + flow, err := newFlowBuilder().ID(id).State(dsdk.Prepared).Consumer(true).Build() + assert.NoError(t, err) + assert.NoError(t, store.Create(ctx, flow)) + requestBody, err := serialize(dsdk.DataFlowStartedNotificationMessage{}) assert.NoError(t, err) - req, err := http.NewRequest(http.MethodPost, "/dataflows/some-id/started", bytes.NewBuffer(requestBody)) + req, err := http.NewRequest(http.MethodPost, "/dataflows/"+flow.ID+"/started", bytes.NewBuffer(requestBody)) assert.NoError(t, err) rr := httptest.NewRecorder() handler.ServeHTTP(rr, req) - assert.Equal(t, http.StatusBadRequest, rr.Code) + assert.Equal(t, http.StatusOK, rr.Code) } func Test_Prepare(t *testing.T) { @@ -412,27 +417,26 @@ func serialize(obj any) ([]byte, error) { func newStartMessage() dsdk.DataFlowStartMessage { return dsdk.DataFlowStartMessage{ DataFlowBaseMessage: dsdk.DataFlowBaseMessage{ - MessageID: uuid.New().String(), - ParticipantID: uuid.New().String(), - CounterPartyID: uuid.New().String(), - DataspaceContext: uuid.New().String(), - ProcessID: uuid.New().String(), - AgreementID: uuid.New().String(), - DatasetID: uuid.New().String(), - CallbackAddress: newCallback(), - TransferType: newTransferType(), - DestinationDataAddress: dsdk.DataAddress{}, - }, - SourceDataAddress: &dsdk.DataAddress{ - Properties: map[string]any{ - "foo": "bar", + MessageID: uuid.New().String(), + ParticipantID: uuid.New().String(), + CounterPartyID: uuid.New().String(), + DataspaceContext: uuid.New().String(), + ProcessID: uuid.New().String(), + AgreementID: uuid.New().String(), + DatasetID: uuid.New().String(), + CallbackAddress: newCallback(), + TransferType: newTransferType(), + DataAddress: dsdk.DataAddress{ + Properties: map[string]any{ + "foo": "bar", + }, }, }, } } -func newStartByIdMessage() dsdk.DataFlowStartByIdMessage { - return dsdk.DataFlowStartByIdMessage{ +func newStartByIdMessage() dsdk.DataFlowStartedNotificationMessage { + return dsdk.DataFlowStartedNotificationMessage{ DataAddress: &dsdk.DataAddress{ Properties: map[string]any{ "foo": "bar", @@ -451,16 +455,16 @@ func newTransferType() dsdk.TransferType { func newPrepareMessage() dsdk.DataFlowPrepareMessage { return dsdk.DataFlowPrepareMessage{ DataFlowBaseMessage: dsdk.DataFlowBaseMessage{ - MessageID: uuid.New().String(), - ParticipantID: uuid.New().String(), - CounterPartyID: uuid.New().String(), - DataspaceContext: uuid.New().String(), - ProcessID: uuid.New().String(), - AgreementID: uuid.New().String(), - DatasetID: uuid.New().String(), - CallbackAddress: newCallback(), - TransferType: newTransferType(), - DestinationDataAddress: dsdk.DataAddress{}, + MessageID: uuid.New().String(), + ParticipantID: uuid.New().String(), + CounterPartyID: uuid.New().String(), + DataspaceContext: uuid.New().String(), + ProcessID: uuid.New().String(), + AgreementID: uuid.New().String(), + DatasetID: uuid.New().String(), + CallbackAddress: newCallback(), + TransferType: newTransferType(), + DataAddress: dsdk.DataAddress{}, }, } } diff --git a/pkg/dsdk/api.go b/pkg/dsdk/api.go index 7eaa612..b0cd217 100644 --- a/pkg/dsdk/api.go +++ b/pkg/dsdk/api.go @@ -104,7 +104,7 @@ func (d *DataPlaneApi) StartById(w http.ResponseWriter, r *http.Request, id stri http.Error(w, "Invalid request method", http.StatusBadRequest) return } - var startMessage DataFlowStartByIdMessage + var startMessage DataFlowStartedNotificationMessage if err := json.NewDecoder(r.Body).Decode(&startMessage); err != nil { d.decodingError(w, err) @@ -228,7 +228,7 @@ func (d *DataPlaneApi) decodingError(w http.ResponseWriter, err error) { func (d *DataPlaneApi) handleError(err error, w http.ResponseWriter) { switch { - case errors.Is(err, ErrValidation), errors.Is(err, ErrInvalidTransition): + case errors.Is(err, ErrValidation), errors.Is(err, ErrInvalidTransition), errors.Is(err, ErrInvalidInput): d.badRequest(err.Error(), w) case errors.Is(err, ErrNotFound): d.writeResponse(w, http.StatusNotFound, &DataFlowResponseMessage{Error: err.Error()}) diff --git a/pkg/dsdk/dsdk.go b/pkg/dsdk/dsdk.go index ebe29de..5cd62cf 100644 --- a/pkg/dsdk/dsdk.go +++ b/pkg/dsdk/dsdk.go @@ -12,8 +12,8 @@ import ( type DataFlowProcessor func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) type ProcessorOptions struct { - Duplicate bool - SourceDataAddress *DataAddress + Duplicate bool + DataAddress *DataAddress } type DataFlowHandler func(context.Context, *DataFlow) error @@ -136,7 +136,7 @@ func (dsdk *DataPlaneSDK) Start(ctx context.Context, message DataFlowStartMessag if err != nil { return fmt.Errorf("creating data flow: %w", err) } - response, err = dsdk.onStart(ctx, flow, dsdk, &ProcessorOptions{SourceDataAddress: message.SourceDataAddress}) + response, err = dsdk.onStart(ctx, flow, dsdk, &ProcessorOptions{DataAddress: &message.DataAddress}) if err != nil { return fmt.Errorf("processing data flow: %w", err) } @@ -152,7 +152,7 @@ func (dsdk *DataPlaneSDK) Start(ctx context.Context, message DataFlowStartMessag return nil } - response, err = dsdk.startExistingFlow(ctx, flow, message.SourceDataAddress) + response, err = dsdk.startExistingFlow(ctx, flow, &message.DataAddress) return err }) @@ -160,7 +160,7 @@ func (dsdk *DataPlaneSDK) Start(ctx context.Context, message DataFlowStartMessag } -func (dsdk *DataPlaneSDK) StartById(ctx context.Context, processID string, message DataFlowStartByIdMessage) (*DataFlowResponseMessage, error) { +func (dsdk *DataPlaneSDK) StartById(ctx context.Context, processID string, message DataFlowStartedNotificationMessage) (*DataFlowResponseMessage, error) { var response *DataFlowResponseMessage err := dsdk.execute(ctx, func(ctx context.Context) error { @@ -266,7 +266,7 @@ func (dsdk *DataPlaneSDK) startExistingFlow(ctx context.Context, flow *DataFlow, switch { case flow != nil && (flow.State == Starting || flow.State == Started): // duplicate message, pass to handler to generate a data address if needed - response, err := dsdk.onStart(ctx, flow, dsdk, &ProcessorOptions{Duplicate: true, SourceDataAddress: sourceAddress}) + response, err := dsdk.onStart(ctx, flow, dsdk, &ProcessorOptions{Duplicate: true, DataAddress: sourceAddress}) if err != nil { return nil, fmt.Errorf("processing data flow: %w", err) } @@ -282,7 +282,7 @@ func (dsdk *DataPlaneSDK) startExistingFlow(ctx context.Context, flow *DataFlow, return response, err case flow != nil && flow.Consumer && flow.State == Prepared: // consumer side, process - response, err := dsdk.onStart(ctx, flow, dsdk, &ProcessorOptions{SourceDataAddress: sourceAddress}) + response, err := dsdk.onStart(ctx, flow, dsdk, &ProcessorOptions{DataAddress: sourceAddress}) if err != nil { return nil, fmt.Errorf("processing data flow: %w", err) } diff --git a/pkg/dsdk/dsdk_test.go b/pkg/dsdk/dsdk_test.go index 3b74ef3..f4a594a 100644 --- a/pkg/dsdk/dsdk_test.go +++ b/pkg/dsdk/dsdk_test.go @@ -666,7 +666,6 @@ func createPrepareMessage() DataFlowPrepareMessage { func createStartMessage() DataFlowStartMessage { return DataFlowStartMessage{ DataFlowBaseMessage: createBaseMessage(), - SourceDataAddress: &DataAddress{}, } } func createBaseMessage() DataFlowBaseMessage { @@ -683,11 +682,11 @@ func createBaseMessage() DataFlowBaseMessage { DestinationType: "test-type", FlowType: Pull, }, - DestinationDataAddress: DataAddress{}, + DataAddress: DataAddress{}, } } -func createStartByIdMessage() DataFlowStartByIdMessage { - return DataFlowStartByIdMessage{ +func createStartByIdMessage() DataFlowStartedNotificationMessage { + return DataFlowStartedNotificationMessage{ DataAddress: &DataAddress{}, } } diff --git a/pkg/dsdk/messages.go b/pkg/dsdk/messages.go index f108861..8c761c7 100644 --- a/pkg/dsdk/messages.go +++ b/pkg/dsdk/messages.go @@ -7,16 +7,16 @@ import ( var v = validator.New() type DataFlowBaseMessage struct { - MessageID string `json:"messageID" validate:"required"` - ParticipantID string `json:"participantID" validate:"required"` - CounterPartyID string `json:"counterPartyID" validate:"required"` - DataspaceContext string `json:"dataspaceContext" validate:"required"` - ProcessID string `json:"processID" validate:"required"` - AgreementID string `json:"agreementID" validate:"required"` - DatasetID string `json:"datasetID"` - CallbackAddress CallbackURL `json:"callbackAddress" validate:"required,callback-url"` - TransferType TransferType `json:"transferType" validate:"required"` - DestinationDataAddress DataAddress `json:"destinationDataAddress" validate:"required"` + MessageID string `json:"messageID" validate:"required"` + ParticipantID string `json:"participantID" validate:"required"` + CounterPartyID string `json:"counterPartyID" validate:"required"` + DataspaceContext string `json:"dataspaceContext" validate:"required"` + ProcessID string `json:"processID" validate:"required"` + AgreementID string `json:"agreementID" validate:"required"` + DatasetID string `json:"datasetID"` + CallbackAddress CallbackURL `json:"callbackAddress" validate:"required,callback-url"` + TransferType TransferType `json:"transferType" validate:"required"` + DataAddress DataAddress `json:"dataAddress" validate:"required"` } func (d *DataFlowBaseMessage) Validate() error { @@ -36,7 +36,6 @@ func (d *DataFlowBaseMessage) Validate() error { type DataFlowStartMessage struct { DataFlowBaseMessage - SourceDataAddress *DataAddress `json:"sourceDataAddress,omitempty"` } func (d *DataFlowStartMessage) Validate() error { @@ -51,11 +50,11 @@ func (d *DataFlowStartMessage) Validate() error { return nil } -type DataFlowStartByIdMessage struct { - DataAddress *DataAddress `json:"dataAddress,omitempty" validate:"required"` +type DataFlowStartedNotificationMessage struct { + DataAddress *DataAddress `json:"dataAddress,omitempty"` } -func (d *DataFlowStartByIdMessage) Validate() error { +func (d *DataFlowStartedNotificationMessage) Validate() error { err := v.Struct(d) if err != nil { return WrapValidationError(err) diff --git a/pkg/dsdk/messages_test.go b/pkg/dsdk/messages_test.go index 51f0a5a..27bfad6 100644 --- a/pkg/dsdk/messages_test.go +++ b/pkg/dsdk/messages_test.go @@ -39,7 +39,7 @@ func Test_Message_InvalidTransferType(t *testing.T) { func Test_StartMessage_Success(t *testing.T) { msg := newBaseMessage() - startMsg := DataFlowStartMessage{DataFlowBaseMessage: msg, SourceDataAddress: &DataAddress{}} + startMsg := DataFlowStartMessage{DataFlowBaseMessage: msg} err := startMsg.Validate() assert.NoError(t, err) } @@ -95,6 +95,6 @@ func newBaseMessage() DataFlowBaseMessage { DestinationType: "test-type", FlowType: "pull", }, - DestinationDataAddress: DataAddress{}, + DataAddress: DataAddress{}, } } diff --git a/pkg/dsdk/model_test.go b/pkg/dsdk/model_test.go index bf7be57..cb6a270 100644 --- a/pkg/dsdk/model_test.go +++ b/pkg/dsdk/model_test.go @@ -28,9 +28,8 @@ func Test_dataFlowStartSerialize(t *testing.T) { DestinationType: "PULL", FlowType: FlowType("PULL"), }, - DestinationDataAddress: *build, + DataAddress: *build, }, - SourceDataAddress: build, } jsonData, err := json.Marshal(original) @@ -44,7 +43,7 @@ func Test_dataFlowStartSerialize(t *testing.T) { assert.Equal(t, original.AgreementID, decoded.AgreementID, "AgreementID should be equal") assert.Equal(t, original.CallbackAddress, decoded.CallbackAddress, "CallbackAddress should be equal") assert.Equal(t, original.TransferType, decoded.TransferType, "TransferType should be equal") - assert.Equal(t, original.SourceDataAddress, decoded.SourceDataAddress, "DataAddress should be equal") + assert.Equal(t, original.DataAddress, decoded.DataAddress, "DataAddress should be equal") assert.Equal(t, original, original) } From 46f0a0c5eb0fdcba07cc419f6cf48528f9fc3c6a Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 09:25:54 +0200 Subject: [PATCH 4/5] add /:id/completed endpoint --- internal/tests/api_integration_test.go | 50 ++++++++++++ pkg/dsdk/api.go | 13 +++ pkg/dsdk/dsdk.go | 39 ++++++++- pkg/dsdk/dsdk_test.go | 107 +++++++++++++++++++++++++ 4 files changed, 208 insertions(+), 1 deletion(-) diff --git a/internal/tests/api_integration_test.go b/internal/tests/api_integration_test.go index 4081100..aaa2871 100644 --- a/internal/tests/api_integration_test.go +++ b/internal/tests/api_integration_test.go @@ -52,6 +52,11 @@ func newServerWithSdk(t *testing.T, sdk *dsdk.DataPlaneSDK) http.Handler { id := chi.URLParam(request, "id") sdkApi.Status(id, writer, request) }) + + r.Post("/dataflows/{id}/completed", func(writer http.ResponseWriter, request *http.Request) { + id := chi.URLParam(request, "id") + sdkApi.Complete(id, writer, request) + }) return r } @@ -370,6 +375,51 @@ func Test_Terminate_WhenNotFound(t *testing.T) { assert.Equal(t, http.StatusNotFound, rr.Code) } +func Test_Complete(t *testing.T) { + id := uuid.New().String() + flow, err := newFlowBuilder().ID(id).State(dsdk.Started).Build() + assert.NoError(t, err) + store := postgres.NewStore(database) + err = store.Create(ctx, flow) + assert.NoError(t, err) + + req, err := http.NewRequest(http.MethodPost, "/dataflows/"+flow.ID+"/completed", strings.NewReader("")) + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + byId, err := store.FindById(ctx, id) + assert.NoError(t, err) + assert.Equal(t, dsdk.Completed, byId.State) +} + +func Test_Complete_NotFound(t *testing.T) { + req, err := http.NewRequest(http.MethodPost, "/dataflows/not-exist/completed", strings.NewReader("")) + assert.NoError(t, err) + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusNotFound, rr.Code) +} + +func Test_Complete_WrongState(t *testing.T) { + id := uuid.New().String() + flow, err := newFlowBuilder().ID(id).State(dsdk.Terminated).Build() + assert.NoError(t, err) + store := postgres.NewStore(database) + err = store.Create(ctx, flow) + assert.NoError(t, err) + + req, err := http.NewRequest(http.MethodPost, "/dataflows/"+flow.ID+"/completed", strings.NewReader("")) + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusBadRequest, rr.Code) + byId, err := store.FindById(ctx, id) + assert.NoError(t, err) + assert.NotEqual(t, dsdk.Completed, byId.State) +} + func Test_GetStatus(t *testing.T) { id := uuid.New().String() flow, err := newFlowBuilder().ID(id).State(dsdk.Started).Build() diff --git a/pkg/dsdk/api.go b/pkg/dsdk/api.go index b0cd217..9e241e5 100644 --- a/pkg/dsdk/api.go +++ b/pkg/dsdk/api.go @@ -218,6 +218,19 @@ func (d *DataPlaneApi) Status(processID string, w http.ResponseWriter, r *http.R d.writeResponse(w, http.StatusOK, response) } +func (d *DataPlaneApi) Complete(processID string, w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Invalid request method", http.StatusBadRequest) + return + } + err := d.sdk.Complete(r.Context(), processID) + if err != nil { + d.handleError(err, w) + return + } + d.writeResponse(w, http.StatusOK, nil) +} + func (d *DataPlaneApi) decodingError(w http.ResponseWriter, err error) { id := uuid.NewString() d.sdk.Monitor.Printf("Error decoding flow [%s]: %v\n", id, err) diff --git a/pkg/dsdk/dsdk.go b/pkg/dsdk/dsdk.go index 5cd62cf..db62c0c 100644 --- a/pkg/dsdk/dsdk.go +++ b/pkg/dsdk/dsdk.go @@ -32,6 +32,7 @@ type DataPlaneSDK struct { onStart DataFlowProcessor onTerminate DataFlowHandler onSuspend DataFlowHandler + onComplete DataFlowHandler } // Prepare is called on the consumer to prepare for receiving data. @@ -262,6 +263,38 @@ func (dsdk *DataPlaneSDK) Status(ctx context.Context, id string) (*DataFlow, err return flow, err } +func (dsdk *DataPlaneSDK) Complete(ctx context.Context, dataflowID string) error { + if dataflowID == "" { + return errors.New("processID cannot be empty") + } + + return dsdk.execute(ctx, func(ctx context.Context) error { + flow, err := dsdk.Store.FindById(ctx, dataflowID) + if err != nil { + return fmt.Errorf("completing data flow %s: %w", dataflowID, err) + } + + if flow.State == Completed { // de-duplication + return nil + } + + transitionError := flow.TransitionToCompleted() + if transitionError != nil { + return transitionError + } + // only invoked if the transition was successful + e := dsdk.onComplete(ctx, flow) + if e != nil { + return e + } + storeErr := dsdk.Store.Save(ctx, flow) + if err != nil { + return fmt.Errorf("completing data flow %s: %w", flow.ID, storeErr) + } + return nil + }) +} + func (dsdk *DataPlaneSDK) startExistingFlow(ctx context.Context, flow *DataFlow, sourceAddress *DataAddress) (*DataFlowResponseMessage, error) { switch { case flow != nil && (flow.State == Starting || flow.State == Started): @@ -422,7 +455,11 @@ func NewDataPlaneSDK(options ...DataPlaneSDKOption) (*DataPlaneSDK, error) { return nil } } - + if sdk.onComplete == nil { + sdk.onComplete = func(context context.Context, flow *DataFlow) error { + return nil + } + } return sdk, nil } diff --git a/pkg/dsdk/dsdk_test.go b/pkg/dsdk/dsdk_test.go index f4a594a..ab88d78 100644 --- a/pkg/dsdk/dsdk_test.go +++ b/pkg/dsdk/dsdk_test.go @@ -660,6 +660,113 @@ func Test_DataPlaneSDK_Suspend_SdkCallbackError(t *testing.T) { assert.ErrorContains(t, err, "some error") } +func Test_DataPlaneSDK_Completed(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + onComplete: func(ctx context.Context, flow *DataFlow) error { + return nil + }, + } + + ctx := context.Background() + + store.EXPECT().FindById(ctx, "flow123").Return(&DataFlow{ + ID: "flow123", + State: Started, // already suspended + }, nil) + store.EXPECT().Save(ctx, mock.MatchedBy(func(df *DataFlow) bool { + return df.State == Completed + })).Return(nil) + + err := dsdk.Complete(ctx, "flow123") + + assert.NoError(t, err) +} + +func Test_DataPlaneSDK_Completed_AlreadyCompleted(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + } + + ctx := context.Background() + + store.EXPECT().FindById(ctx, "flow123").Return(&DataFlow{ + ID: "flow123", + State: Completed, // already suspended + }, nil) + + err := dsdk.Complete(ctx, "flow123") + + assert.NoError(t, err) +} + +func Test_DataPlaneSDK_Completed_SdkError(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + onComplete: func(ctx context.Context, flow *DataFlow) error { + return fmt.Errorf("some error") + }, + } + + ctx := context.Background() + + store.EXPECT().FindById(ctx, "flow123").Return(&DataFlow{ + ID: "flow123", + State: Started, // already suspended + }, nil) + + err := dsdk.Complete(ctx, "flow123") + + assert.ErrorContains(t, err, "some error") +} + +func Test_DataPlaneSDK_Completed_WrongState(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + onComplete: func(ctx context.Context, flow *DataFlow) error { + return nil + }, + } + + ctx := context.Background() + + store.EXPECT().FindById(ctx, "flow123").Return(&DataFlow{ + ID: "flow123", + State: Terminated, + }, nil) + + err := dsdk.Complete(ctx, "flow123") + + assert.ErrorIs(t, err, ErrInvalidTransition) +} + +func Test_DataPlaneSDK_NotFound(t *testing.T) { + store := NewMockDataplaneStore(t) + dsdk := DataPlaneSDK{ + Store: store, + TrxContext: &mockTrxContext{}, + onComplete: func(ctx context.Context, flow *DataFlow) error { + return nil + }, + } + + ctx := context.Background() + + store.EXPECT().FindById(ctx, "flow123").Return(nil, ErrNotFound) + + err := dsdk.Complete(ctx, "flow123") + + assert.ErrorIs(t, err, ErrNotFound) +} + func createPrepareMessage() DataFlowPrepareMessage { return DataFlowPrepareMessage{DataFlowBaseMessage: createBaseMessage()} } From 58a4bce749b3c039b15ce36b310f4706ef67146d Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 15 Oct 2025 09:33:03 +0200 Subject: [PATCH 5/5] base message data address is optional --- examples/controlplane/controlplane.go | 4 ++-- internal/tests/api_integration_test.go | 4 ++-- pkg/dsdk/dsdk.go | 4 ++-- pkg/dsdk/dsdk_test.go | 2 +- pkg/dsdk/messages.go | 2 +- pkg/dsdk/messages_test.go | 2 +- pkg/dsdk/model_test.go | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/controlplane/controlplane.go b/examples/controlplane/controlplane.go index a284584..960bced 100644 --- a/examples/controlplane/controlplane.go +++ b/examples/controlplane/controlplane.go @@ -63,8 +63,8 @@ func (c *ControlPlaneSimulator) ProviderStart(ctx context.Context, ParticipantID: "did:web:provider.com", CallbackAddress: dsdk.CallbackURL(*callbackURL), TransferType: dsdk.TransferType{DestinationType: "custom", FlowType: dsdk.Pull}, + DataAddress: da, }, - SourceDataAddress: da, } serialized, err := json.Marshal(startMessage) @@ -115,8 +115,8 @@ func (c *ControlPlaneSimulator) ConsumerStart(ctx context.Context, processID str CounterPartyID: "did:web:provider.com", CallbackAddress: dsdk.CallbackURL(*callbackURL), TransferType: dsdk.TransferType{DestinationType: "custom", FlowType: dsdk.Pull}, + DataAddress: source, }, - SourceDataAddress: source, } serialized, err := json.Marshal(startMessage) diff --git a/internal/tests/api_integration_test.go b/internal/tests/api_integration_test.go index aaa2871..02fc558 100644 --- a/internal/tests/api_integration_test.go +++ b/internal/tests/api_integration_test.go @@ -476,7 +476,7 @@ func newStartMessage() dsdk.DataFlowStartMessage { DatasetID: uuid.New().String(), CallbackAddress: newCallback(), TransferType: newTransferType(), - DataAddress: dsdk.DataAddress{ + DataAddress: &dsdk.DataAddress{ Properties: map[string]any{ "foo": "bar", }, @@ -514,7 +514,7 @@ func newPrepareMessage() dsdk.DataFlowPrepareMessage { DatasetID: uuid.New().String(), CallbackAddress: newCallback(), TransferType: newTransferType(), - DataAddress: dsdk.DataAddress{}, + DataAddress: &dsdk.DataAddress{}, }, } } diff --git a/pkg/dsdk/dsdk.go b/pkg/dsdk/dsdk.go index db62c0c..62c0d27 100644 --- a/pkg/dsdk/dsdk.go +++ b/pkg/dsdk/dsdk.go @@ -137,7 +137,7 @@ func (dsdk *DataPlaneSDK) Start(ctx context.Context, message DataFlowStartMessag if err != nil { return fmt.Errorf("creating data flow: %w", err) } - response, err = dsdk.onStart(ctx, flow, dsdk, &ProcessorOptions{DataAddress: &message.DataAddress}) + response, err = dsdk.onStart(ctx, flow, dsdk, &ProcessorOptions{DataAddress: message.DataAddress}) if err != nil { return fmt.Errorf("processing data flow: %w", err) } @@ -153,7 +153,7 @@ func (dsdk *DataPlaneSDK) Start(ctx context.Context, message DataFlowStartMessag return nil } - response, err = dsdk.startExistingFlow(ctx, flow, &message.DataAddress) + response, err = dsdk.startExistingFlow(ctx, flow, message.DataAddress) return err }) diff --git a/pkg/dsdk/dsdk_test.go b/pkg/dsdk/dsdk_test.go index ab88d78..661a129 100644 --- a/pkg/dsdk/dsdk_test.go +++ b/pkg/dsdk/dsdk_test.go @@ -789,7 +789,7 @@ func createBaseMessage() DataFlowBaseMessage { DestinationType: "test-type", FlowType: Pull, }, - DataAddress: DataAddress{}, + DataAddress: &DataAddress{}, } } func createStartByIdMessage() DataFlowStartedNotificationMessage { diff --git a/pkg/dsdk/messages.go b/pkg/dsdk/messages.go index 8c761c7..be648c3 100644 --- a/pkg/dsdk/messages.go +++ b/pkg/dsdk/messages.go @@ -16,7 +16,7 @@ type DataFlowBaseMessage struct { DatasetID string `json:"datasetID"` CallbackAddress CallbackURL `json:"callbackAddress" validate:"required,callback-url"` TransferType TransferType `json:"transferType" validate:"required"` - DataAddress DataAddress `json:"dataAddress" validate:"required"` + DataAddress *DataAddress `json:"dataAddress"` } func (d *DataFlowBaseMessage) Validate() error { diff --git a/pkg/dsdk/messages_test.go b/pkg/dsdk/messages_test.go index 27bfad6..a353695 100644 --- a/pkg/dsdk/messages_test.go +++ b/pkg/dsdk/messages_test.go @@ -95,6 +95,6 @@ func newBaseMessage() DataFlowBaseMessage { DestinationType: "test-type", FlowType: "pull", }, - DataAddress: DataAddress{}, + DataAddress: &DataAddress{}, } } diff --git a/pkg/dsdk/model_test.go b/pkg/dsdk/model_test.go index cb6a270..8eefbe1 100644 --- a/pkg/dsdk/model_test.go +++ b/pkg/dsdk/model_test.go @@ -28,7 +28,7 @@ func Test_dataFlowStartSerialize(t *testing.T) { DestinationType: "PULL", FlowType: FlowType("PULL"), }, - DataAddress: *build, + DataAddress: build, }, }