Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type TokenResponse struct {
func NewSignalingServer(sdkApi *dsdk.DataPlaneApi, port int) *http.Server {
r := chi.NewRouter()
r.Post("/dataflows/start", sdkApi.Start)
r.Post("/dataflows/{id}/start", func(writer http.ResponseWriter, request *http.Request) {
r.Post("/dataflows/{id}/started", func(writer http.ResponseWriter, request *http.Request) {
id := chi.URLParam(request, "id")
sdkApi.StartById(writer, request, id)
})
Expand Down
4 changes: 2 additions & 2 deletions examples/controlplane/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (c *ControlPlaneSimulator) ProviderStart(ctx context.Context,
ParticipantID: "did:web:provider.com",
CallbackAddress: dsdk.CallbackURL(*callbackURL),
TransferType: dsdk.TransferType{DestinationType: "custom", FlowType: dsdk.Pull},
DataAddress: da,
},
SourceDataAddress: da,
}

serialized, err := json.Marshal(startMessage)
Expand Down Expand Up @@ -115,8 +115,8 @@ func (c *ControlPlaneSimulator) ConsumerStart(ctx context.Context, processID str
CounterPartyID: "did:web:provider.com",
CallbackAddress: dsdk.CallbackURL(*callbackURL),
TransferType: dsdk.TransferType{DestinationType: "custom", FlowType: dsdk.Pull},
DataAddress: source,
},
SourceDataAddress: source,
}

serialized, err := json.Marshal(startMessage)
Expand Down
6 changes: 3 additions & 3 deletions examples/streaming-pull-dataplane/consumer/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func (d *ConsumerDataPlane) startProcessor(_ context.Context,
_ *dsdk.DataPlaneSDK,
options *dsdk.ProcessorOptions) (*dsdk.DataFlowResponseMessage, error) {

endpoint := options.SourceDataAddress.Properties[dsdk.EndpointKey].(string)
token, found := parseToken(natsservices.TokenKey, options.SourceDataAddress)
endpoint := options.DataAddress.Properties[dsdk.EndpointKey].(string)
token, found := parseToken(natsservices.TokenKey, options.DataAddress)
if !found {
return nil, errors.New("token not found in endpoint properties")
}
channel, found := parseToken(natsservices.ChannelKey, options.SourceDataAddress)
channel, found := parseToken(natsservices.ChannelKey, options.DataAddress)
if !found {
return nil, errors.New("channel not found in endpoint properties")
}
Expand Down
2 changes: 1 addition & 1 deletion examples/streaming-push-dataplane/consumer/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (d *ConsumerDataPlane) startProcessor(_ context.Context,
flow *dsdk.DataFlow,
_ *dsdk.DataPlaneSDK,
options *dsdk.ProcessorOptions) (*dsdk.DataFlowResponseMessage, error) {
return &dsdk.DataFlowResponseMessage{State: dsdk.Started, DataAddress: options.SourceDataAddress}, nil
return &dsdk.DataFlowResponseMessage{State: dsdk.Started, DataAddress: options.DataAddress}, nil
}

func (d *ConsumerDataPlane) suspendProcessor(_ context.Context, flow *dsdk.DataFlow) error {
Expand Down
6 changes: 3 additions & 3 deletions examples/streaming-push-dataplane/provider/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func (d *ProviderDataPlane) startProcessor(ctx context.Context,
_ *dsdk.DataPlaneSDK,
options *dsdk.ProcessorOptions) (*dsdk.DataFlowResponseMessage, error) {

endpoint := options.SourceDataAddress.Properties[dsdk.EndpointKey].(string)
token, found := parseToken(natsservices.TokenKey, options.SourceDataAddress)
endpoint := options.DataAddress.Properties[dsdk.EndpointKey].(string)
token, found := parseToken(natsservices.TokenKey, options.DataAddress)
if !found {
return nil, errors.New("token not found in endpoint properties")
}
channel, found := parseToken(natsservices.ChannelKey, options.SourceDataAddress)
channel, found := parseToken(natsservices.ChannelKey, options.DataAddress)
if !found {
return nil, errors.New("channel not found in endpoint properties")
}
Expand Down
4 changes: 2 additions & 2 deletions examples/sync-pull-dataplane/consumer/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func (d *ConsumerDataPlane) startProcessor(_ context.Context,
_ *dsdk.DataPlaneSDK,
options *dsdk.ProcessorOptions) (*dsdk.DataFlowResponseMessage, error) {
log.Printf("[Consumer Data Plane] Transfer access token available for participant %s dataset %s\n", flow.ParticipantID, flow.DatasetID)
endpoint := options.SourceDataAddress.Properties[dsdk.EndpointKey].(string)
token := options.SourceDataAddress.Properties["token"].(string)
endpoint := options.DataAddress.Properties[dsdk.EndpointKey].(string)
token := options.DataAddress.Properties["token"].(string)
d.tokenStore.Create(flow.DatasetID, tokenEntry{datasetID: flow.DatasetID, token: token, endpoint: endpoint})
return &dsdk.DataFlowResponseMessage{State: dsdk.Started}, nil
}
Expand Down
124 changes: 89 additions & 35 deletions internal/tests/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newServerWithSdk(t *testing.T, sdk *dsdk.DataPlaneSDK) http.Handler {
r := chi.NewRouter()

r.Post("/dataflows/start", sdkApi.Start)
r.Post("/dataflows/{id}/start", func(writer http.ResponseWriter, request *http.Request) {
r.Post("/dataflows/{id}/started", func(writer http.ResponseWriter, request *http.Request) {
id := chi.URLParam(request, "id")
sdkApi.StartById(writer, request, id)
})
Expand All @@ -52,6 +52,11 @@ func newServerWithSdk(t *testing.T, sdk *dsdk.DataPlaneSDK) http.Handler {
id := chi.URLParam(request, "id")
sdkApi.Status(id, writer, request)
})

r.Post("/dataflows/{id}/completed", func(writer http.ResponseWriter, request *http.Request) {
id := chi.URLParam(request, "id")
sdkApi.Complete(id, writer, request)
})
return r
}

Expand Down Expand Up @@ -111,7 +116,7 @@ func Test_StartByID_WhenNotFound(t *testing.T) {
requestBody, err := serialize(newStartByIdMessage())
assert.NoError(t, err)

req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/start", bytes.NewBuffer(requestBody))
req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/started", bytes.NewBuffer(requestBody))
assert.NoError(t, err)

rr := httptest.NewRecorder()
Expand All @@ -131,14 +136,14 @@ func Test_StartByID_WhenStartedOrStarting(t *testing.T) {
for _, state := range states {
id := uuid.New().String()
store := postgres.NewStore(database)
flow, err := newFlowBuilder().ID(id).State(state).Build()
flow, err := newFlowBuilder().ID(id).State(state).Consumer(true).Build()
assert.NoError(t, err)
assert.NoError(t, store.Create(ctx, flow))

requestBody, err := serialize(newStartByIdMessage())
assert.NoError(t, err)

req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/start", bytes.NewBuffer(requestBody))
req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/started", bytes.NewBuffer(requestBody))
assert.NoError(t, err)

rr := httptest.NewRecorder()
Expand Down Expand Up @@ -181,7 +186,7 @@ func Test_StartByID_WhenPrepared(t *testing.T) {
requestBody, err := serialize(newStartByIdMessage())
assert.NoError(t, err)

req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/start", bytes.NewBuffer(requestBody))
req, err := http.NewRequest(http.MethodPost, "/dataflows/"+id+"/started", bytes.NewBuffer(requestBody))
assert.NoError(t, err)

rr := httptest.NewRecorder()
Expand All @@ -196,16 +201,21 @@ func Test_StartByID_WhenPrepared(t *testing.T) {
}

func Test_StartByID_MissingSourceAddress(t *testing.T) {
requestBody, err := serialize(dsdk.DataFlowStartByIdMessage{})
id := uuid.New().String()
store := postgres.NewStore(database)
flow, err := newFlowBuilder().ID(id).State(dsdk.Prepared).Consumer(true).Build()
assert.NoError(t, err)
assert.NoError(t, store.Create(ctx, flow))
requestBody, err := serialize(dsdk.DataFlowStartedNotificationMessage{})
assert.NoError(t, err)

req, err := http.NewRequest(http.MethodPost, "/dataflows/some-id/start", bytes.NewBuffer(requestBody))
req, err := http.NewRequest(http.MethodPost, "/dataflows/"+flow.ID+"/started", bytes.NewBuffer(requestBody))
assert.NoError(t, err)

rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)

assert.Equal(t, http.StatusBadRequest, rr.Code)
assert.Equal(t, http.StatusOK, rr.Code)
}

func Test_Prepare(t *testing.T) {
Expand Down Expand Up @@ -365,6 +375,51 @@ func Test_Terminate_WhenNotFound(t *testing.T) {
assert.Equal(t, http.StatusNotFound, rr.Code)
}

func Test_Complete(t *testing.T) {
id := uuid.New().String()
flow, err := newFlowBuilder().ID(id).State(dsdk.Started).Build()
assert.NoError(t, err)
store := postgres.NewStore(database)
err = store.Create(ctx, flow)
assert.NoError(t, err)

req, err := http.NewRequest(http.MethodPost, "/dataflows/"+flow.ID+"/completed", strings.NewReader(""))
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)

assert.Equal(t, http.StatusOK, rr.Code)
byId, err := store.FindById(ctx, id)
assert.NoError(t, err)
assert.Equal(t, dsdk.Completed, byId.State)
}

func Test_Complete_NotFound(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, "/dataflows/not-exist/completed", strings.NewReader(""))
assert.NoError(t, err)
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)

assert.Equal(t, http.StatusNotFound, rr.Code)
}

func Test_Complete_WrongState(t *testing.T) {
id := uuid.New().String()
flow, err := newFlowBuilder().ID(id).State(dsdk.Terminated).Build()
assert.NoError(t, err)
store := postgres.NewStore(database)
err = store.Create(ctx, flow)
assert.NoError(t, err)

req, err := http.NewRequest(http.MethodPost, "/dataflows/"+flow.ID+"/completed", strings.NewReader(""))
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)

assert.Equal(t, http.StatusBadRequest, rr.Code)
byId, err := store.FindById(ctx, id)
assert.NoError(t, err)
assert.NotEqual(t, dsdk.Completed, byId.State)
}

func Test_GetStatus(t *testing.T) {
id := uuid.New().String()
flow, err := newFlowBuilder().ID(id).State(dsdk.Started).Build()
Expand Down Expand Up @@ -412,28 +467,27 @@ func serialize(obj any) ([]byte, error) {
func newStartMessage() dsdk.DataFlowStartMessage {
return dsdk.DataFlowStartMessage{
DataFlowBaseMessage: dsdk.DataFlowBaseMessage{
MessageID: uuid.New().String(),
ParticipantID: uuid.New().String(),
CounterPartyID: uuid.New().String(),
DataspaceContext: uuid.New().String(),
ProcessID: uuid.New().String(),
AgreementID: uuid.New().String(),
DatasetID: uuid.New().String(),
CallbackAddress: newCallback(),
TransferType: newTransferType(),
DestinationDataAddress: dsdk.DataAddress{},
},
SourceDataAddress: &dsdk.DataAddress{
Properties: map[string]any{
"foo": "bar",
MessageID: uuid.New().String(),
ParticipantID: uuid.New().String(),
CounterPartyID: uuid.New().String(),
DataspaceContext: uuid.New().String(),
ProcessID: uuid.New().String(),
AgreementID: uuid.New().String(),
DatasetID: uuid.New().String(),
CallbackAddress: newCallback(),
TransferType: newTransferType(),
DataAddress: &dsdk.DataAddress{
Properties: map[string]any{
"foo": "bar",
},
},
},
}
}

func newStartByIdMessage() dsdk.DataFlowStartByIdMessage {
return dsdk.DataFlowStartByIdMessage{
SourceDataAddress: &dsdk.DataAddress{
func newStartByIdMessage() dsdk.DataFlowStartedNotificationMessage {
return dsdk.DataFlowStartedNotificationMessage{
DataAddress: &dsdk.DataAddress{
Properties: map[string]any{
"foo": "bar",
},
Expand All @@ -451,16 +505,16 @@ func newTransferType() dsdk.TransferType {
func newPrepareMessage() dsdk.DataFlowPrepareMessage {
return dsdk.DataFlowPrepareMessage{
DataFlowBaseMessage: dsdk.DataFlowBaseMessage{
MessageID: uuid.New().String(),
ParticipantID: uuid.New().String(),
CounterPartyID: uuid.New().String(),
DataspaceContext: uuid.New().String(),
ProcessID: uuid.New().String(),
AgreementID: uuid.New().String(),
DatasetID: uuid.New().String(),
CallbackAddress: newCallback(),
TransferType: newTransferType(),
DestinationDataAddress: dsdk.DataAddress{},
MessageID: uuid.New().String(),
ParticipantID: uuid.New().String(),
CounterPartyID: uuid.New().String(),
DataspaceContext: uuid.New().String(),
ProcessID: uuid.New().String(),
AgreementID: uuid.New().String(),
DatasetID: uuid.New().String(),
CallbackAddress: newCallback(),
TransferType: newTransferType(),
DataAddress: &dsdk.DataAddress{},
},
}
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/dsdk/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (d *DataPlaneApi) StartById(w http.ResponseWriter, r *http.Request, id stri
http.Error(w, "Invalid request method", http.StatusBadRequest)
return
}
var startMessage DataFlowStartByIdMessage
var startMessage DataFlowStartedNotificationMessage

if err := json.NewDecoder(r.Body).Decode(&startMessage); err != nil {
d.decodingError(w, err)
Expand Down Expand Up @@ -218,6 +218,19 @@ func (d *DataPlaneApi) Status(processID string, w http.ResponseWriter, r *http.R
d.writeResponse(w, http.StatusOK, response)
}

func (d *DataPlaneApi) Complete(processID string, w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusBadRequest)
return
}
err := d.sdk.Complete(r.Context(), processID)
if err != nil {
d.handleError(err, w)
return
}
d.writeResponse(w, http.StatusOK, nil)
}

func (d *DataPlaneApi) decodingError(w http.ResponseWriter, err error) {
id := uuid.NewString()
d.sdk.Monitor.Printf("Error decoding flow [%s]: %v\n", id, err)
Expand All @@ -228,7 +241,7 @@ func (d *DataPlaneApi) decodingError(w http.ResponseWriter, err error) {
func (d *DataPlaneApi) handleError(err error, w http.ResponseWriter) {

switch {
case errors.Is(err, ErrValidation), errors.Is(err, ErrInvalidTransition):
case errors.Is(err, ErrValidation), errors.Is(err, ErrInvalidTransition), errors.Is(err, ErrInvalidInput):
d.badRequest(err.Error(), w)
case errors.Is(err, ErrNotFound):
d.writeResponse(w, http.StatusNotFound, &DataFlowResponseMessage{Error: err.Error()})
Expand Down
Loading