Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/metaform/dataplane-sdk-go
go 1.24.1

require (
github.com/go-playground/validator/v10 v10.27.0
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.5.4
github.com/lib/pq v1.10.9
github.com/nats-io/jwt/v2 v2.7.4
github.com/nats-io/nats-server/v2 v2.11.6
Expand Down Expand Up @@ -32,14 +32,16 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/ebitengine/purego v0.8.4 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-tpm v0.9.5 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.10 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
Expand Down
19 changes: 12 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,23 @@ github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0o
github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4=
github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
Expand All @@ -58,12 +68,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.4 h1:Xp2aQS8uXButQdnCMWNmvx6UysWQQC+u1EoizjguY+8=
github.com/jackc/pgx/v5 v5.5.4/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
Expand All @@ -72,6 +76,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
Expand Down Expand Up @@ -132,7 +138,6 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
Expand Down
2 changes: 1 addition & 1 deletion internal/tests/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func Test_Start_NotExists(t *testing.T) {

func Test_Start_InvalidPayload(t *testing.T) {
sm := newStartMessage()
sm.CounterPartyID = ""
sm.CounterPartyID = "" // should raise a validation error
payload, err := serialize(sm)
assert.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, "/start", bytes.NewBuffer(payload))
Expand Down
65 changes: 50 additions & 15 deletions pkg/dsdk/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,17 @@ func (d *DataPlaneApi) Prepare(w http.ResponseWriter, r *http.Request) {
var prepareMessage DataFlowPrepareMessage

if err := json.NewDecoder(r.Body).Decode(&prepareMessage); err != nil {
d.decodeError(w, err)
d.decodingError(w, err)
return
}

if err := prepareMessage.Validate(); err != nil {
d.validationError(err, w)
}

response, err := d.sdk.Prepare(r.Context(), prepareMessage)
if err != nil {
d.processError(err, w)
d.otherError(err, w)
return
}

Expand All @@ -69,13 +73,18 @@ func (d *DataPlaneApi) Start(w http.ResponseWriter, r *http.Request) {
var startMessage DataFlowStartMessage

if err := json.NewDecoder(r.Body).Decode(&startMessage); err != nil {
d.decodeError(w, err)
d.decodingError(w, err)
return
}

if err := startMessage.Validate(); err != nil {
d.validationError(err, w)
return
}

response, err := d.sdk.Start(r.Context(), startMessage)
if err != nil {
d.processError(err, w)
d.otherError(err, w)
return
}

Expand All @@ -90,13 +99,35 @@ func (d *DataPlaneApi) Start(w http.ResponseWriter, r *http.Request) {
}

func (d *DataPlaneApi) Terminate(w http.ResponseWriter, r *http.Request) {
var terminateMessage DataFlowTransitionMessage

if err := json.NewDecoder(r.Body).Decode(&terminateMessage); err != nil {
d.decodingError(w, err)
return
}
if err := terminateMessage.Validate(); err != nil {
d.validationError(err, w)
return
}
d.transition(w, r, func(processID string) error {
//todo: pass Reason to Terminate
return d.sdk.Terminate(r.Context(), processID)
})
}

func (d *DataPlaneApi) Suspend(w http.ResponseWriter, r *http.Request) {
var suspendMessage DataFlowTransitionMessage

if err := json.NewDecoder(r.Body).Decode(&suspendMessage); err != nil {
d.decodingError(w, err)
return
}
if err := suspendMessage.Validate(); err != nil {
d.validationError(err, w)
return
}
d.transition(w, r, func(processID string) error {
//todo: pass Reason to Suspend
return d.sdk.Suspend(r.Context(), processID)
})
}
Expand All @@ -112,7 +143,7 @@ func (d *DataPlaneApi) Status(w http.ResponseWriter, r *http.Request) {
}
dataFlow, err := d.sdk.Status(r.Context(), processID)
if err != nil {
d.processError(err, w)
d.otherError(err, w)
return
}
w.Header().Set(contentType, jsonContentType)
Expand All @@ -131,40 +162,37 @@ func (d *DataPlaneApi) transition(w http.ResponseWriter, r *http.Request, transi

processID, err := ParseIDFromURL(r.URL)
if err != nil {
d.processError(err, w)
d.otherError(err, w)
return
}

var terminateMessage DataFlowTransitionMessage

if err := json.NewDecoder(r.Body).Decode(&terminateMessage); err != nil {
d.decodeError(w, err)
d.decodingError(w, err)
return
}

err = transition(processID)
if err != nil {
d.processError(err, w)
d.otherError(err, w)
return
}

w.Header().Set(contentType, jsonContentType)
w.WriteHeader(http.StatusOK)
}

func (d *DataPlaneApi) decodeError(w http.ResponseWriter, err error) {
func (d *DataPlaneApi) decodingError(w http.ResponseWriter, err error) {
id := uuid.NewString()
d.sdk.Monitor.Printf("Error decoding flow [%s]: %v\n", id, err)
d.writeResponse(w, http.StatusBadRequest, &DataFlowResponseMessage{Error: fmt.Sprintf("Failed to decode request body [%s]", id)})
}

// processError writes an error message to the HTTP response that indicates a processing error (= HTTP 500, HTTP 400 if validation error)
func (d *DataPlaneApi) processError(err error, w http.ResponseWriter) {
// otherError writes an error message to the HTTP response that indicates "any other" error, such as 409, 500, etc.
func (d *DataPlaneApi) otherError(err error, w http.ResponseWriter) {

switch {
case errors.Is(err, ErrValidation):
message := fmt.Sprintf("Validation error: %s", err)
d.writeResponse(w, http.StatusBadRequest, &DataFlowResponseMessage{Error: message})
case errors.Is(err, ErrConflict):
message := fmt.Sprintf("%s", err)
d.writeResponse(w, http.StatusConflict, &DataFlowResponseMessage{Error: message})
Expand All @@ -173,7 +201,14 @@ func (d *DataPlaneApi) processError(err error, w http.ResponseWriter) {
d.sdk.Monitor.Println(message)
d.writeResponse(w, http.StatusInternalServerError, &DataFlowResponseMessage{Error: message})
}

}
func (d *DataPlaneApi) validationError(err error, w http.ResponseWriter) {
if errors.Is(err, ErrValidation) {
message := fmt.Sprintf("Validation error: %s", err)
d.writeResponse(w, http.StatusBadRequest, &DataFlowResponseMessage{Error: message})
} else {
d.otherError(err, w)
}
}

func (d *DataPlaneApi) writeResponse(w http.ResponseWriter, code int, response any) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/dsdk/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ var (
func NewValidationError(messages ...string) error {
return fmt.Errorf("%w: %s", ErrValidation, strings.Join(messages, "; "))
}

func WrapValidationError(err error) error {
return fmt.Errorf("%w: %w", ErrValidation, err)
}
76 changes: 76 additions & 0 deletions pkg/dsdk/messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package dsdk

import (
"github.com/go-playground/validator/v10"
)

var v = validator.New()

type DataFlowBaseMessage struct {
MessageID string `json:"messageID" validate:"required"`
ParticipantID string `json:"participantID" validate:"required"`
CounterPartyID string `json:"counterPartyID" validate:"required"`
DataspaceContext string `json:"dataspaceContext" validate:"required"`
ProcessID string `json:"processID" validate:"required"`
AgreementID string `json:"agreementID" validate:"required"`
DatasetID string `json:"datasetID" validate:"required"`
CallbackAddress CallbackURL `json:"callbackAddress" validate:"required,callback-url"`
TransferType TransferType `json:"transferType" validate:"required"`
DestinationDataAddress DataAddress `json:"destinationDataAddress" validate:"required"`
}

func (d *DataFlowBaseMessage) Validate() error {
err := v.RegisterValidation("callback-url", func(fl validator.FieldLevel) bool {
u, ok := fl.Field().Interface().(CallbackURL)
return ok && !u.IsEmpty()
})
if err != nil {
return err
}

if err := v.Struct(d); err != nil {
return WrapValidationError(err)
}
return nil
}

type DataFlowStartMessage struct {
DataFlowBaseMessage
SourceDataAddress *DataAddress `json:"sourceDataAddress,omitempty" validate:"required"`
}

func (d *DataFlowStartMessage) Validate() error {
err := d.DataFlowBaseMessage.Validate() // call base validator
if err != nil {
return WrapValidationError(err)
}
err = v.Struct(d)
if err != nil {
return WrapValidationError(err)
}
return nil
}

type DataFlowPrepareMessage struct {
DataFlowBaseMessage
}

type DataFlowTransitionMessage struct {
Reason string `json:"reason"`
}

func (d *DataFlowTransitionMessage) Validate() error {
return nil // no special behaviour yet
}

type DataFlowResponseMessage struct {
DataplaneID string `json:"dataplaneID"`
DataAddress *DataAddress `json:"dataAddress,omitempty"`
State DataFlowState `json:"state"`
Error string `json:"error"`
}

type DataFlowStatusResponseMessage struct {
State DataFlowState `json:"state"`
DataFlowID string `json:"dataFlowID"`
}
Loading