From 19a4de4cb319a40321c69fdb141db7972f5cc0eb Mon Sep 17 00:00:00 2001 From: Jim Marino Date: Fri, 10 Oct 2025 14:08:41 +0200 Subject: [PATCH] Replace builder with idiomatic go --- .../consumer/dataplane.go | 19 +-- .../provider/dataplane.go | 17 +-- .../consumer/dataplane.go | 23 ++- .../provider/dataplane.go | 21 ++- .../sync-pull-dataplane/consumer/dataplane.go | 19 +-- .../sync-pull-dataplane/provider/dataplane.go | 17 +-- internal/tests/api_integration_test.go | 8 +- pkg/dsdk/dsdk.go | 94 +++++++----- pkg/dsdk/dsdk_options_test.go | 143 ++++++++++++++++++ 9 files changed, 252 insertions(+), 109 deletions(-) create mode 100644 pkg/dsdk/dsdk_options_test.go diff --git a/examples/streaming-pull-dataplane/consumer/dataplane.go b/examples/streaming-pull-dataplane/consumer/dataplane.go index ec9ac6e..b47cc39 100644 --- a/examples/streaming-pull-dataplane/consumer/dataplane.go +++ b/examples/streaming-pull-dataplane/consumer/dataplane.go @@ -33,14 +33,13 @@ type ConsumerDataPlane struct { func NewDataPlane(eventSubscriber *natsservices.EventSubscriber) (*ConsumerDataPlane, error) { dataplane := &ConsumerDataPlane{eventSubscriber: eventSubscriber} - sdk, err := dsdk.NewDataPlaneSDKBuilder(). - Store(memory.NewInMemoryStore()). - TransactionContext(memory.InMemoryTrxContext{}). - OnPrepare(dataplane.prepareProcessor). - OnStart(dataplane.startProcessor). - OnTerminate(dataplane.terminateProcessor). - OnSuspend(dataplane.noopHandler). - Build() + + sdk, err := dsdk.NewDataPlaneSDK( + dsdk.WithStore(memory.NewInMemoryStore()), + dsdk.WithTransactionContext(memory.InMemoryTrxContext{}), + dsdk.WithPrepareProcessor(dataplane.prepareProcessor), + dsdk.WithStartProcessor(dataplane.startProcessor), + ) if err != nil { return nil, err } @@ -114,10 +113,6 @@ func (d *ConsumerDataPlane) suspendProcessor(_ context.Context, flow *dsdk.DataF return nil } -func (d *ConsumerDataPlane) noopHandler(context.Context, *dsdk.DataFlow) error { - return nil -} - func parseToken(keyValue string, da *dsdk.DataAddress) (string, bool) { rawProps, ok := da.Properties[dsdk.EndpointProperties].([]any) if !ok { diff --git a/examples/streaming-pull-dataplane/provider/dataplane.go b/examples/streaming-pull-dataplane/provider/dataplane.go index ddd4345..672ac62 100644 --- a/examples/streaming-pull-dataplane/provider/dataplane.go +++ b/examples/streaming-pull-dataplane/provider/dataplane.go @@ -39,15 +39,14 @@ func NewDataPlane(authService *natsservices.AuthService, publisherService *EventPublisherService) (*ProviderDataPlane, error) { providerDataPlane := &ProviderDataPlane{authService: authService, connectionInvalidator: invalidator, publisherService: publisherService} - builder := dsdk.NewDataPlaneSDKBuilder() - store := memory.NewInMemoryStore() - sdk, err := builder.Store(store). - TransactionContext(memory.InMemoryTrxContext{}). - OnPrepare(providerDataPlane.prepareProcessor). - OnStart(providerDataPlane.startProcessor). - OnSuspend(providerDataPlane.suspendProcessor). - OnTerminate(providerDataPlane.terminateProcessor). - Build() + sdk, err := dsdk.NewDataPlaneSDK( + dsdk.WithStore(memory.NewInMemoryStore()), + dsdk.WithTransactionContext(memory.InMemoryTrxContext{}), + dsdk.WithPrepareProcessor(providerDataPlane.prepareProcessor), + dsdk.WithStartProcessor(providerDataPlane.startProcessor), + dsdk.WithSuspendProcessor(providerDataPlane.suspendProcessor), + dsdk.WithTerminateProcessor(providerDataPlane.terminateProcessor), + ) if err != nil { return nil, err } diff --git a/examples/streaming-push-dataplane/consumer/dataplane.go b/examples/streaming-push-dataplane/consumer/dataplane.go index 533c330..dadb08c 100644 --- a/examples/streaming-push-dataplane/consumer/dataplane.go +++ b/examples/streaming-push-dataplane/consumer/dataplane.go @@ -40,28 +40,27 @@ func NewDataPlane(authService *natsservices.AuthService, natsUrl string, eventSubscriber *natsservices.EventSubscriber) (*ConsumerDataPlane, error) { - providerDataPlane := &ConsumerDataPlane{ + dataPlane := &ConsumerDataPlane{ authService: authService, connectionInvalidator: invalidator, natsUrl: natsUrl, eventSubscriber: eventSubscriber} - builder := dsdk.NewDataPlaneSDKBuilder() - store := memory.NewInMemoryStore() - sdk, err := builder.Store(store). - TransactionContext(memory.InMemoryTrxContext{}). - OnPrepare(providerDataPlane.prepareProcessor). - OnStart(providerDataPlane.startProcessor). - OnSuspend(providerDataPlane.suspendProcessor). - OnTerminate(providerDataPlane.terminateProcessor). - Build() + sdk, err := dsdk.NewDataPlaneSDK( + dsdk.WithStore(memory.NewInMemoryStore()), + dsdk.WithTransactionContext(memory.InMemoryTrxContext{}), + dsdk.WithPrepareProcessor(dataPlane.prepareProcessor), + dsdk.WithStartProcessor(dataPlane.startProcessor), + dsdk.WithSuspendProcessor(dataPlane.suspendProcessor), + dsdk.WithTerminateProcessor(dataPlane.terminateProcessor), + ) if err != nil { return nil, err } - providerDataPlane.api = dsdk.NewDataPlaneApi(sdk) + dataPlane.api = dsdk.NewDataPlaneApi(sdk) - return providerDataPlane, nil + return dataPlane, nil } func (d *ConsumerDataPlane) Init() { diff --git a/examples/streaming-push-dataplane/provider/dataplane.go b/examples/streaming-push-dataplane/provider/dataplane.go index 4f98fcd..d46732b 100644 --- a/examples/streaming-push-dataplane/provider/dataplane.go +++ b/examples/streaming-push-dataplane/provider/dataplane.go @@ -33,14 +33,15 @@ type ProviderDataPlane struct { func NewDataPlane(publisherService *EventPublisherService) (*ProviderDataPlane, error) { dataplane := &ProviderDataPlane{publisherService: publisherService} - sdk, err := dsdk.NewDataPlaneSDKBuilder(). - Store(memory.NewInMemoryStore()). - TransactionContext(memory.InMemoryTrxContext{}). - OnPrepare(dataplane.prepareProcessor). - OnStart(dataplane.startProcessor). - OnTerminate(dataplane.terminateProcessor). - OnSuspend(dataplane.noopHandler). - Build() + sdk, err := dsdk.NewDataPlaneSDK( + dsdk.WithStore(memory.NewInMemoryStore()), + dsdk.WithTransactionContext(memory.InMemoryTrxContext{}), + dsdk.WithPrepareProcessor(dataplane.prepareProcessor), + dsdk.WithStartProcessor(dataplane.startProcessor), + dsdk.WithSuspendProcessor(dataplane.suspendProcessor), + dsdk.WithTerminateProcessor(dataplane.terminateProcessor), + ) + if err != nil { return nil, err } @@ -112,10 +113,6 @@ func (d *ProviderDataPlane) suspendProcessor(_ context.Context, flow *dsdk.DataF return nil } -func (d *ProviderDataPlane) noopHandler(context.Context, *dsdk.DataFlow) error { - return nil -} - func parseToken(keyValue string, da *dsdk.DataAddress) (string, bool) { rawProps, ok := da.Properties[dsdk.EndpointProperties].([]any) if !ok { diff --git a/examples/sync-pull-dataplane/consumer/dataplane.go b/examples/sync-pull-dataplane/consumer/dataplane.go index 1f1a7ca..e1264a4 100644 --- a/examples/sync-pull-dataplane/consumer/dataplane.go +++ b/examples/sync-pull-dataplane/consumer/dataplane.go @@ -47,14 +47,13 @@ type ConsumerDataPlane struct { func NewDataPlane() (*ConsumerDataPlane, error) { dataplane := &ConsumerDataPlane{tokenStore: common.NewStore[tokenEntry]()} - sdk, err := dsdk.NewDataPlaneSDKBuilder(). - Store(memory.NewInMemoryStore()). - TransactionContext(memory.InMemoryTrxContext{}). - OnPrepare(dataplane.prepareProcessor). - OnStart(dataplane.startProcessor). - OnTerminate(dataplane.noopHandler). - OnSuspend(dataplane.noopHandler). - Build() + + sdk, err := dsdk.NewDataPlaneSDK( + dsdk.WithStore(memory.NewInMemoryStore()), + dsdk.WithTransactionContext(memory.InMemoryTrxContext{}), + dsdk.WithPrepareProcessor(dataplane.prepareProcessor), + dsdk.WithStartProcessor(dataplane.startProcessor), + ) if err != nil { return nil, err } @@ -109,10 +108,6 @@ func (d *ConsumerDataPlane) startProcessor(_ context.Context, return &dsdk.DataFlowResponseMessage{State: dsdk.Started}, nil } -func (d *ConsumerDataPlane) noopHandler(context.Context, *dsdk.DataFlow) error { - return nil -} - func (d *ConsumerDataPlane) getEndpointToken(w http.ResponseWriter, r *http.Request) { // Check if it's a GET request if r.Method != http.MethodGet { diff --git a/examples/sync-pull-dataplane/provider/dataplane.go b/examples/sync-pull-dataplane/provider/dataplane.go index e99602e..cb95c6c 100644 --- a/examples/sync-pull-dataplane/provider/dataplane.go +++ b/examples/sync-pull-dataplane/provider/dataplane.go @@ -46,15 +46,14 @@ func NewDataPlane() (*ProviderDataPlane, error) { tokenStore: common.NewStore[tokenEntry](), } - builder := dsdk.NewDataPlaneSDKBuilder() - store := memory.NewInMemoryStore() - sdk, err := builder.Store(store). - TransactionContext(memory.InMemoryTrxContext{}). - OnPrepare(providerDataPlane.prepareProcessor). - OnStart(providerDataPlane.startProcessor). - OnSuspend(providerDataPlane.suspendProcessor). - OnTerminate(providerDataPlane.terminateProcessor). - Build() + sdk, err := dsdk.NewDataPlaneSDK( + dsdk.WithStore(memory.NewInMemoryStore()), + dsdk.WithTransactionContext(memory.InMemoryTrxContext{}), + dsdk.WithPrepareProcessor(providerDataPlane.prepareProcessor), + dsdk.WithStartProcessor(providerDataPlane.startProcessor), + dsdk.WithSuspendProcessor(providerDataPlane.suspendProcessor), + dsdk.WithTerminateProcessor(providerDataPlane.terminateProcessor), + ) if err != nil { return nil, err } diff --git a/internal/tests/api_integration_test.go b/internal/tests/api_integration_test.go index 2339e2f..6194940 100644 --- a/internal/tests/api_integration_test.go +++ b/internal/tests/api_integration_test.go @@ -470,9 +470,9 @@ func newCallback() dsdk.CallbackURL { } func newSdk(db *sql.DB) (*dsdk.DataPlaneSDK, error) { - sdk, err := dsdk.NewDataPlaneSDKBuilder(). - Store(postgres.NewStore(db)). - TransactionContext(postgres.NewDBTransactionContext(db)). - Build() + sdk, err := dsdk.NewDataPlaneSDK( + dsdk.WithStore(postgres.NewStore(db)), + dsdk.WithTransactionContext(postgres.NewDBTransactionContext(db)), + ) return sdk, err } diff --git a/pkg/dsdk/dsdk.go b/pkg/dsdk/dsdk.go index 0ee2398..7b86ac2 100644 --- a/pkg/dsdk/dsdk.go +++ b/pkg/dsdk/dsdk.go @@ -325,55 +325,73 @@ func (dsdk *DataPlaneSDK) execute(ctx context.Context, callback func(ctx2 contex } } -type DataPlaneSDKBuilder struct { - sdk *DataPlaneSDK -} +// DataPlaneSDKOption configures a DataPlaneSDK instance +type DataPlaneSDKOption func(*DataPlaneSDK) -func NewDataPlaneSDKBuilder() *DataPlaneSDKBuilder { - return &DataPlaneSDKBuilder{ - sdk: &DataPlaneSDK{}, +func WithStore(store DataplaneStore) DataPlaneSDKOption { + return func(sdk *DataPlaneSDK) { + sdk.Store = store } } -func (b *DataPlaneSDKBuilder) Store(store DataplaneStore) *DataPlaneSDKBuilder { - b.sdk.Store = store - return b +func WithTransactionContext(trxContext TransactionContext) DataPlaneSDKOption { + return func(sdk *DataPlaneSDK) { + sdk.TrxContext = trxContext + } } -func (b *DataPlaneSDKBuilder) TransactionContext(trxContext TransactionContext) *DataPlaneSDKBuilder { - b.sdk.TrxContext = trxContext - return b +func WithMonitor(monitor LogMonitor) DataPlaneSDKOption { + return func(sdk *DataPlaneSDK) { + sdk.Monitor = monitor + } } -func (b *DataPlaneSDKBuilder) OnPrepare(processor DataFlowProcessor) *DataPlaneSDKBuilder { - b.sdk.onPrepare = processor - return b +func WithPrepareProcessor(processor DataFlowProcessor) DataPlaneSDKOption { + return func(sdk *DataPlaneSDK) { + sdk.onPrepare = processor + } } -func (b *DataPlaneSDKBuilder) OnStart(processor DataFlowProcessor) *DataPlaneSDKBuilder { - b.sdk.onStart = processor - return b +func WithStartProcessor(processor DataFlowProcessor) DataPlaneSDKOption { + return func(sdk *DataPlaneSDK) { + sdk.onStart = processor + } } -func (b *DataPlaneSDKBuilder) OnTerminate(handler DataFlowHandler) *DataPlaneSDKBuilder { - b.sdk.onTerminate = handler - return b +func WithTerminateProcessor(handler DataFlowHandler) DataPlaneSDKOption { + return func(sdk *DataPlaneSDK) { + sdk.onTerminate = handler + } } -func (b *DataPlaneSDKBuilder) OnSuspend(handler DataFlowHandler) *DataPlaneSDKBuilder { - b.sdk.onSuspend = handler - return b +func WithSuspendProcessor(handler DataFlowHandler) DataPlaneSDKOption { + return func(sdk *DataPlaneSDK) { + sdk.onSuspend = handler + } } -func (b *DataPlaneSDKBuilder) Build() (*DataPlaneSDK, error) { - if b.sdk.Store == nil { +func NewDataPlaneSDK(options ...DataPlaneSDKOption) (*DataPlaneSDK, error) { + sdk := &DataPlaneSDK{} + + // Apply all options + for _, opt := range options { + opt(sdk) + } + + // Validate required fields + if sdk.Store == nil { return nil, errors.New("store is required") } - if b.sdk.TrxContext == nil { + if sdk.TrxContext == nil { return nil, errors.New("transaction context is required") } - if b.sdk.onPrepare == nil { - b.sdk.onPrepare = func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) { + + // Set defaults for optional fields + if sdk.Monitor == nil { + sdk.Monitor = defaultLogMonitor{} + } + if sdk.onPrepare == nil { + sdk.onPrepare = func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) { return &DataFlowResponseMessage{ DataplaneID: "TODO_REPLACE_ME", DataAddress: &flow.DestinationDataAddress, @@ -381,8 +399,8 @@ func (b *DataPlaneSDKBuilder) Build() (*DataPlaneSDK, error) { Error: ""}, nil } } - if b.sdk.onStart == nil { - b.sdk.onStart = func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) { + if sdk.onStart == nil { + sdk.onStart = func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) { return &DataFlowResponseMessage{ State: Started, DataplaneID: "TODO_REPLACE_ME", @@ -390,20 +408,18 @@ func (b *DataPlaneSDKBuilder) Build() (*DataPlaneSDK, error) { Error: ""}, nil } } - if b.sdk.onTerminate == nil { - b.sdk.onTerminate = func(context context.Context, flow *DataFlow) error { + if sdk.onTerminate == nil { + sdk.onTerminate = func(context context.Context, flow *DataFlow) error { return nil } } - if b.sdk.onSuspend == nil { - b.sdk.onSuspend = func(context context.Context, flow *DataFlow) error { + if sdk.onSuspend == nil { + sdk.onSuspend = func(context context.Context, flow *DataFlow) error { return nil } } - if b.sdk.Monitor == nil { - b.sdk.Monitor = defaultLogMonitor{} - } - return b.sdk, nil + + return sdk, nil } type defaultLogMonitor struct { diff --git a/pkg/dsdk/dsdk_options_test.go b/pkg/dsdk/dsdk_options_test.go new file mode 100644 index 0000000..1b3d17b --- /dev/null +++ b/pkg/dsdk/dsdk_options_test.go @@ -0,0 +1,143 @@ +// Copyright (c) 2025 Metaform Systems, Inc +// +// This program and the accompanying materials are made available under the +// terms of the Apache License, Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 +// +// Contributors: +// Metaform Systems, Inc. - initial API and implementation +// + +package dsdk + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_WithStore(t *testing.T) { + store := NewMockDataplaneStore(t) + sdk := &DataPlaneSDK{} + + option := WithStore(store) + option(sdk) + + assert.Equal(t, store, sdk.Store) +} + +func Test_WithTransactionContext(t *testing.T) { + trxContext := &mockTrxContext{} + sdk := &DataPlaneSDK{} + + option := WithTransactionContext(trxContext) + option(sdk) + + assert.Equal(t, trxContext, sdk.TrxContext) +} + +func Test_WithMonitor(t *testing.T) { + monitor := &defaultLogMonitor{} + sdk := &DataPlaneSDK{} + + option := WithMonitor(monitor) + option(sdk) + + assert.Equal(t, monitor, sdk.Monitor) +} + +func Test_WithPrepareProcessor(t *testing.T) { + processor := func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) { + return &DataFlowResponseMessage{State: Prepared}, nil + } + sdk := &DataPlaneSDK{} + + option := WithPrepareProcessor(processor) + option(sdk) + + require.NotNil(t, sdk.onPrepare) +} + +func Test_WithStartProcessor(t *testing.T) { + processor := func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) { + return &DataFlowResponseMessage{State: Started}, nil + } + sdk := &DataPlaneSDK{} + + option := WithStartProcessor(processor) + option(sdk) + + require.NotNil(t, sdk.onStart) +} + +func Test_WithTerminateProcessor(t *testing.T) { + handler := func(context.Context, *DataFlow) error { + return nil + } + sdk := &DataPlaneSDK{} + + option := WithTerminateProcessor(handler) + option(sdk) + + require.NotNil(t, sdk.onTerminate) +} + +func Test_WithSuspendProcessor(t *testing.T) { + handler := func(context.Context, *DataFlow) error { + return nil + } + sdk := &DataPlaneSDK{} + + option := WithSuspendProcessor(handler) + option(sdk) + + require.NotNil(t, sdk.onSuspend) +} + +func Test_NewDataPlaneSDK_WithoutOptionalFields(t *testing.T) { + store := NewMockDataplaneStore(t) + trxContext := &mockTrxContext{} + + sdk, err := NewDataPlaneSDK( + WithStore(store), + WithTransactionContext(trxContext), + ) + + require.NoError(t, err) + require.NotNil(t, sdk) + assert.Equal(t, store, sdk.Store) + assert.Equal(t, trxContext, sdk.TrxContext) + assert.IsType(t, defaultLogMonitor{}, sdk.Monitor) + assert.NotNil(t, sdk.onPrepare) + assert.NotNil(t, sdk.onStart) + assert.NotNil(t, sdk.onTerminate) + assert.NotNil(t, sdk.onSuspend) +} + +func Test_NewDataPlaneSDK_MissingStore(t *testing.T) { + trxContext := &mockTrxContext{} + + sdk, err := NewDataPlaneSDK( + WithTransactionContext(trxContext), + ) + + require.Error(t, err) + assert.Nil(t, sdk) + assert.Contains(t, err.Error(), "store is required") +} + +func Test_NewDataPlaneSDK_MissingTransactionContext(t *testing.T) { + store := NewMockDataplaneStore(t) + + sdk, err := NewDataPlaneSDK( + WithStore(store), + ) + + require.Error(t, err) + assert.Nil(t, sdk) + assert.Contains(t, err.Error(), "transaction context is required") +}