diff --git a/.github/workflows/common_ci.yml b/.github/workflows/common_ci.yml index de5ff486..a1485ad1 100644 --- a/.github/workflows/common_ci.yml +++ b/.github/workflows/common_ci.yml @@ -66,7 +66,7 @@ jobs: enable_persistence_tests: 'true' test_service_port: ${{ env.TEST_SERVICE_PORT }} token: ${{ secrets.GITHUB_TOKEN }} - version: v3.0.0-alpha.1 + version: v3.0.0-alpha.3 - name: Upload test service logs uses: actions/upload-artifact@v4 diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index d346393e..b0bfcf7a 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -43,8 +43,13 @@ type FDv2 struct { // List of initializers that are capable of obtaining an initial payload of data. initializers []subsystems.DataInitializer - // The primary synchronizer responsible for keeping data up-to-date. - primarySyncBuilder func() (subsystems.DataSynchronizer, error) + // Mutable list of synchronizer builders. Items are removed when they permanently fail. + // When reverting to FDv1, this list is replaced with a single FDv1 synchronizer. + synchronizerBuilders []func() (subsystems.DataSynchronizer, error) + currentSyncIndex int + + // FDv1 fallback builder, used only when a synchronizer requests revert to FDv1 + fdv1FallbackBuilder func() (subsystems.DataSynchronizer, error) // Boolean used to track whether the datasystem was originally configured // with some sort of valid data source. @@ -53,12 +58,6 @@ type FDv2 struct { // they permanently fail. configuredWithDataSources bool - // The secondary synchronizer, in case the primary is unavailable. - secondarySyncBuilder func() (subsystems.DataSynchronizer, error) - - // The fdv1 fallback synchronizer, in case we have to fall back to fdv1. - fdv1SyncBuilder func() (subsystems.DataSynchronizer, error) - // Whether the SDK should make use of persistent store/initializers/synchronizers or not. disabled bool @@ -139,10 +138,11 @@ func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems } fdv2.initializers = cfg.Initializers - fdv2.primarySyncBuilder = cfg.Synchronizers.PrimaryBuilder - fdv2.secondarySyncBuilder = cfg.Synchronizers.SecondaryBuilder - fdv2.fdv1SyncBuilder = cfg.Synchronizers.FDv1FallbackBuilder + fdv2.synchronizerBuilders = cfg.Synchronizers.SynchronizerBuilders + fdv2.currentSyncIndex = 0 + fdv2.fdv1FallbackBuilder = cfg.Synchronizers.FDv1FallbackBuilder fdv2.disabled = disabled + fdv2.fallbackCond = func(status interfaces.DataSourceStatus) bool { interruptedAtRuntime := status.State == interfaces.DataSourceStateInterrupted && time.Since(status.StateSince) > 1*time.Minute @@ -162,7 +162,7 @@ func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems return interruptedAtRuntime || healthyForTooLong || cannotInitialize } - fdv2.configuredWithDataSources = len(fdv2.initializers) > 0 || fdv2.primarySyncBuilder != nil + fdv2.configuredWithDataSources = len(fdv2.initializers) > 0 || len(fdv2.synchronizerBuilders) > 0 if cfg.Store != nil && !disabled { // If there's a persistent Store, we should provide a status monitor and inform Store that it's present. @@ -263,9 +263,8 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{} } func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) { - // If the SDK was configured with no synchronizer, then (assuming no initializer succeeded, which would have - // already closed the channel), we should close it now so that MakeClient unblocks. - if f.primarySyncBuilder == nil { + // If no synchronizers configured, close ready channel and return + if len(f.synchronizerBuilders) == 0 { f.readyOnce.Do(func() { close(closeWhenReady) }) @@ -279,82 +278,73 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{ }) for { - primarySync, err := f.primarySyncBuilder() - if err != nil { - f.loggers.Errorf("Failed to build the primary synchronizer: %v", err) - return - } - - f.loggers.Debugf("Primary synchronizer %s is starting", primarySync.Name()) - resultChan := primarySync.Sync(f.store) - removeSync, fallbackv1, err := f.consumeSynchronizerResults(ctx, resultChan, f.fallbackCond, closeWhenReady) - - if err := primarySync.Close(); err != nil { - f.loggers.Errorf("Primary synchronizer %s failed to gracefully close: %v", primarySync.Name(), err) - } - if errors.Is(err, context.Canceled) { + // Check if we've run out of synchronizers + if len(f.synchronizerBuilders) == 0 { + f.loggers.Warn("No more synchronizers available") + f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError) return } - if removeSync { - f.primarySyncBuilder = f.secondarySyncBuilder - f.secondarySyncBuilder = nil - - if fallbackv1 { - f.primarySyncBuilder = f.fdv1SyncBuilder - } - - if f.primarySyncBuilder == nil { - f.loggers.Debugf("No more synchronizers available, closing the channel") - f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError) - f.readyOnce.Do(func() { - close(closeWhenReady) - }) - return - } - } else { - f.loggers.Debugf("Fallback condition met") - } - - if f.secondarySyncBuilder == nil { - continue + // Ensure currentSyncIndex is within bounds (shouldn't happen with proper logic) + if f.currentSyncIndex >= len(f.synchronizerBuilders) { + f.currentSyncIndex = 0 } - secondarySync, err := f.secondarySyncBuilder() + // Build synchronizer + sync, err := f.synchronizerBuilders[f.currentSyncIndex]() if err != nil { - f.loggers.Errorf("Failed to build the secondary synchronizer: %v", err) - return + f.loggers.Errorf("Failed to build synchronizer at index %d: %v", f.currentSyncIndex, err) + // Remove the failed builder from the list + f.synchronizerBuilders = append( + f.synchronizerBuilders[:f.currentSyncIndex], + f.synchronizerBuilders[f.currentSyncIndex+1:]...) + // Don't increment currentSyncIndex - it now points to the next synchronizer + continue } - f.loggers.Debugf("Secondary synchronizer %s is starting", secondarySync.Name()) - resultChan = secondarySync.Sync(f.store) - removeSync, fallbackv1, err = f.consumeSynchronizerResults(ctx, resultChan, f.recoveryCond, closeWhenReady) + f.loggers.Infof("Synchronizer at index %d (%s) is starting", f.currentSyncIndex, sync.Name()) + resultChan := sync.Sync(f.store) + action, err := f.consumeSynchronizerResults(ctx, resultChan, closeWhenReady) - if err := secondarySync.Close(); err != nil { - f.loggers.Errorf("Secondary synchronizer %s failed to gracefully close: %v", secondarySync.Name(), err) + if err := sync.Close(); err != nil { + f.loggers.Errorf("Synchronizer %s failed to close: %v", sync.Name(), err) } + if errors.Is(err, context.Canceled) { return } - if removeSync { - f.secondarySyncBuilder = nil - - if fallbackv1 { - f.primarySyncBuilder = f.fdv1SyncBuilder - - if f.primarySyncBuilder == nil { - f.loggers.Debugf("No more synchronizers available, closing the channel") - f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError) - f.readyOnce.Do(func() { - close(closeWhenReady) - }) - return - } + // Handle action based on conditions + switch action { + case syncFDv1: + if f.fdv1FallbackBuilder != nil { + f.loggers.Warn("Reverting to FDv1 protocol") + // Replace entire list with single FDv1 synchronizer + f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder} + f.currentSyncIndex = 0 + continue } + f.loggers.Warn("Synchronizer requested FDv1 fallback but none configured") + f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError) + return + case syncRemove: + f.loggers.Warnf("Permanently removing synchronizer at index %d", f.currentSyncIndex) + f.synchronizerBuilders = append( + f.synchronizerBuilders[:f.currentSyncIndex], + f.synchronizerBuilders[f.currentSyncIndex+1:]...) + // Don't increment currentSyncIndex - it now points to the next synchronizer + continue + case syncRecover: + // Recovery: jump back to index 0 + f.loggers.Info("Recovery condition met, returning to first synchronizer") + f.currentSyncIndex = 0 + case syncFallback: + // Fallback: move to next index + f.loggers.Info("Fallback condition met, trying next synchronizer") + f.currentSyncIndex++ } - f.loggers.Debugf("Recovery condition met") + // Check for cancellation before next iteration select { case <-ctx.Done(): return @@ -364,25 +354,33 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{ }) } +type syncAction int + +const ( + syncFallback syncAction = iota + syncRecover + syncRemove + syncFDv1 +) + func (f *FDv2) consumeSynchronizerResults( ctx context.Context, resultChan <-chan subsystems.DataSynchronizerResult, - cond func(status interfaces.DataSourceStatus) bool, closeWhenReady chan<- struct{}, -) (bool, bool, error) { +) (action syncAction, err error) { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): - return false, false, ctx.Err() + return syncFallback, ctx.Err() case result, ok := <-resultChan: // The status channel being closed means that we won't be receiving // any more information from that synchronizer and we should // probably fall back. if !ok { - return false, false, nil + return syncFallback, nil } if result.EnvironmentID.IsDefined() { @@ -404,15 +402,33 @@ func (f *FDv2) consumeSynchronizerResults( f.UpdateStatus(result.State, result.Error) case interfaces.DataSourceStateOff: f.UpdateStatus(interfaces.DataSourceStateInterrupted, result.Error) - return true, result.RevertToFDv1, nil + if result.RevertToFDv1 { + return syncFDv1, nil + } + return syncRemove, nil } case <-ticker.C: + // If there's only one synchronizer, don't check conditions + if len(f.synchronizerBuilders) == 1 { + continue + } + status := f.getStatus() f.loggers.Debugf("Data source status used to evaluate condition: %s", status.String()) - if cond(status) { - return false, false, nil + + // Check fallback condition first (things are bad) + if f.fallbackCond(status) { + f.loggers.Debugf("Fallback condition met") + return syncFallback, nil + } + + // If not at index 0, also check recovery condition (things are good) + if f.currentSyncIndex > 0 && f.recoveryCond(status) { + f.loggers.Debugf("Recovery condition met") + return syncRecover, nil } - f.loggers.Debugf("Condition check succeeded, continue with current synchronizer") + + f.loggers.Debugf("No condition met, continue with current synchronizer") } } } diff --git a/ldclient_end_to_end_fdv2_test.go b/ldclient_end_to_end_fdv2_test.go index b8eac753..e980b78f 100644 --- a/ldclient_end_to_end_fdv2_test.go +++ b/ldclient_end_to_end_fdv2_test.go @@ -200,7 +200,12 @@ func TestFDV2ShutdownDownIfBothSynchronizersFail(t *testing.T) { expectedStreamError := "Error in stream connection (giving up permanently): HTTP error 401 (invalid SDK key)" expectedPollError := "Error on polling request (giving up permanently): HTTP error 401 (invalid SDK key)" assert.Equal(t, []string{expectedStreamError, expectedPollError}, logCapture.GetOutput(ldlog.Error)) - assert.Equal(t, []string{initializationFailedErrorMessage}, logCapture.GetOutput(ldlog.Warn)) + assert.Equal(t, []string{ + "Permanently removing synchronizer at index 0", + "Permanently removing synchronizer at index 0", + "No more synchronizers available", + initializationFailedErrorMessage, + }, logCapture.GetOutput(ldlog.Warn)) }) } @@ -280,7 +285,11 @@ func TestFDV2PollingSynchronizerFailsToStartWith401Error(t *testing.T) { expectedError := "Error on polling request (giving up permanently): HTTP error 401 (invalid SDK key)" assert.Equal(t, []string{expectedError}, logCapture.GetOutput(ldlog.Error)) - assert.Equal(t, []string{initializationFailedErrorMessage}, logCapture.GetOutput(ldlog.Warn)) + assert.Equal(t, []string{ + "Permanently removing synchronizer at index 0", + "No more synchronizers available", + initializationFailedErrorMessage, + }, logCapture.GetOutput(ldlog.Warn)) }) } @@ -383,7 +392,6 @@ func TestFDV2FileInitializerWillDeferToFirstSynchronizer(t *testing.T) { ). Synchronizers( ldcomponents.StreamingDataSourceV2().BaseURI(server.URL), - nil, ), } diff --git a/ldcomponents/data_system_configuration_builder.go b/ldcomponents/data_system_configuration_builder.go index fb6ec8d2..d1fa9d85 100644 --- a/ldcomponents/data_system_configuration_builder.go +++ b/ldcomponents/data_system_configuration_builder.go @@ -1,7 +1,6 @@ package ldcomponents import ( - "errors" "fmt" ss "github.com/launchdarkly/go-server-sdk/v7/subsystems" @@ -12,8 +11,7 @@ type DataSystemConfigurationBuilder struct { storeBuilder ss.ComponentConfigurer[ss.DataStore] storeMode ss.DataStoreMode initializerBuilders []ss.ComponentConfigurer[ss.DataInitializer] - primarySyncBuilder ss.ComponentConfigurer[ss.DataSynchronizer] - secondarySyncBuilder ss.ComponentConfigurer[ss.DataSynchronizer] + synchronizerBuilders []ss.ComponentConfigurer[ss.DataSynchronizer] fdv1FallbackBuilder ss.ComponentConfigurer[ss.DataSynchronizer] config ss.DataSystemConfiguration } @@ -73,7 +71,7 @@ func (d *DataSystemModes) Streaming() *DataSystemConfigurationBuilder { if d.endpoints.Polling != "" { fallback.BaseURI(d.endpoints.Polling) } - return d.Custom().Synchronizers(streaming, nil).FDv1CompatibleSynchronizer(fallback) + return d.Custom().Synchronizers(streaming).FDv1CompatibleSynchronizer(fallback) } // Polling configures the SDK to regularly poll an endpoint for flag/segment data in the background. @@ -85,7 +83,7 @@ func (d *DataSystemModes) Polling() *DataSystemConfigurationBuilder { polling.BaseURI(d.endpoints.Polling) fallback.BaseURI(d.endpoints.Polling) } - return d.Custom().Synchronizers(polling, nil).FDv1CompatibleSynchronizer(fallback) + return d.Custom().Synchronizers(polling).FDv1CompatibleSynchronizer(fallback) } // Daemon configures the SDK to read from a persistent store integration that is populated by Relay Proxy @@ -143,7 +141,8 @@ func DataSystem() *DataSystemModes { // DataStore configures the SDK with an optional data store. The store allows the SDK to serve flag // values before becoming connected to LaunchDarkly. func (d *DataSystemConfigurationBuilder) DataStore(store ss.ComponentConfigurer[ss.DataStore], - storeMode ss.DataStoreMode) *DataSystemConfigurationBuilder { + storeMode ss.DataStoreMode, +) *DataSystemConfigurationBuilder { d.storeBuilder = store d.storeMode = storeMode return d @@ -153,25 +152,27 @@ func (d *DataSystemConfigurationBuilder) DataStore(store ss.ComponentConfigurer[ // complete payloads of flag data. The SDK will run the initializers in the order they are specified, // stopping when one successfully returns data. func (d *DataSystemConfigurationBuilder) Initializers( - initializers ...ss.ComponentConfigurer[ss.DataInitializer]) *DataSystemConfigurationBuilder { + initializers ...ss.ComponentConfigurer[ss.DataInitializer], +) *DataSystemConfigurationBuilder { d.initializerBuilders = initializers return d } -// Synchronizers configures the SDK with a primary and secondary synchronizer. The primary is responsible -// for keeping the SDK's data up-to-date, and the SDK will fall back to the secondary in case of a -// primary outage. -func (d *DataSystemConfigurationBuilder) Synchronizers(primary, - secondary ss.ComponentConfigurer[ss.DataSynchronizer]) *DataSystemConfigurationBuilder { - d.primarySyncBuilder = primary - d.secondarySyncBuilder = secondary +// Synchronizers configures the SDK with an ordered list of synchronizers. +// The SDK tries them in order, falling back to the next synchronizer if one fails. +// When a synchronizer fails and recovery conditions are met, the SDK returns to the first synchronizer. +func (d *DataSystemConfigurationBuilder) Synchronizers( + synchronizers ...ss.ComponentConfigurer[ss.DataSynchronizer], +) *DataSystemConfigurationBuilder { + d.synchronizerBuilders = synchronizers return d } // FDv1CompatibleSynchronizer configures the SDK with a fallback synchronizer that is compatible // with the Flag Delivery v1 API. func (d *DataSystemConfigurationBuilder) FDv1CompatibleSynchronizer( - fallback ss.ComponentConfigurer[ss.DataSynchronizer]) *DataSystemConfigurationBuilder { + fallback ss.ComponentConfigurer[ss.DataSynchronizer], +) *DataSystemConfigurationBuilder { d.fdv1FallbackBuilder = fallback return d } @@ -181,10 +182,7 @@ func (d *DataSystemConfigurationBuilder) Build( context ss.ClientContext, ) (ss.DataSystemConfiguration, error) { conf := d.config - if d.secondarySyncBuilder != nil && d.primarySyncBuilder == nil { - return ss.DataSystemConfiguration{}, errors.New("cannot have a secondary synchronizer without " + - "a primary synchronizer") - } + if d.storeBuilder != nil { store, err := d.storeBuilder.Build(context) if err != nil { @@ -204,15 +202,22 @@ func (d *DataSystemConfigurationBuilder) Build( } conf.Initializers = append(conf.Initializers, initializer) } - if d.primarySyncBuilder != nil { - conf.Synchronizers.PrimaryBuilder = func() (ss.DataSynchronizer, error) { - return d.primarySyncBuilder.Build(context) - } - } - if d.secondarySyncBuilder != nil { - conf.Synchronizers.SecondaryBuilder = func() (ss.DataSynchronizer, error) { - return d.secondarySyncBuilder.Build(context) + + // Build synchronizer list + for i, builder := range d.synchronizerBuilders { + if builder == nil { + return ss.DataSystemConfiguration{}, + fmt.Errorf("synchronizer %d is nil", i) } + + // Capture builder in closure to avoid loop variable issues + b := builder + conf.Synchronizers.SynchronizerBuilders = append( + conf.Synchronizers.SynchronizerBuilders, + func() (ss.DataSynchronizer, error) { + return b.Build(context) + }, + ) } if d.fdv1FallbackBuilder != nil { conf.Synchronizers.FDv1FallbackBuilder = func() (ss.DataSynchronizer, error) { diff --git a/subsystems/datasystem_configuration.go b/subsystems/datasystem_configuration.go index 8822183c..473b3937 100644 --- a/subsystems/datasystem_configuration.go +++ b/subsystems/datasystem_configuration.go @@ -1,13 +1,15 @@ package subsystems -// SynchronizersConfiguration represents the config for the primary and secondary synchronizers. +// SynchronizersConfiguration represents the config for synchronizers. type SynchronizersConfiguration struct { - // The builder for the synchronizer that is primarily active. - PrimaryBuilder func() (DataSynchronizer, error) - // A fallback builder for the synchronizer if the primary fails. - SecondaryBuilder func() (DataSynchronizer, error) - // A temporarily supported FDv1 fallback builder for the synchronizer as an - // alternative fallback option. + // SynchronizerBuilders is an ordered list of synchronizer builders. + // The system starts at index 0 and moves down the list on fallback or removal. + // On recovery (when not at index 0), the system jumps back to index 0. + SynchronizerBuilders []func() (DataSynchronizer, error) + + // FDv1FallbackBuilder is a special fallback used only when a synchronizer + // returns RevertToFDv1=true. When activated, the system abandons the synchronizer list + // and switches to FDv1-only mode. FDv1FallbackBuilder func() (DataSynchronizer, error) } diff --git a/testservice/sdk_client_entity.go b/testservice/sdk_client_entity.go index f3c83a9e..95416968 100644 --- a/testservice/sdk_client_entity.go +++ b/testservice/sdk_client_entity.go @@ -395,53 +395,37 @@ func makeSDKConfig(config servicedef.SDKConfigParams, sdkLog ldlog.Loggers) (ld. dataSystemBuilder.Initializers(initializers...) } - if config.DataSystem.Synchronizers != nil { + if config.DataSystem.Synchronizers != nil && len(*config.DataSystem.Synchronizers) > 0 { var fdv1Fallback *ldcomponents.FDv1PollingDataSourceBuilderV2 - primary, err := makeSynchronizerConfig(config.DataSystem.Synchronizers.Primary, config, &ret) - if err != nil { - return ret, err - } - - if config.DataSystem.Synchronizers.Primary.Polling != nil { - fdv1Fallback = ldcomponents.FDv1PollingDataSourceV2() - if config.DataSystem.Synchronizers.Primary.Polling.PollIntervalMS != nil { - fdv1Fallback.PollInterval(time.Millisecond * time.Duration(*config.DataSystem.Synchronizers.Primary.Polling.PollIntervalMS)) - } - if config.DataSystem.Synchronizers.Primary.Polling.BaseURI != "" { - fdv1Fallback.BaseURI(config.DataSystem.Synchronizers.Primary.Polling.BaseURI) - } - if config.DataSystem.PayloadFilter != nil { - fdv1Fallback.PayloadFilter(*config.DataSystem.PayloadFilter) - } - } + synchronizers := make([]subsystems.ComponentConfigurer[subsystems.DataSynchronizer], 0, len(*config.DataSystem.Synchronizers)) - var secondary subsystems.ComponentConfigurer[subsystems.DataSynchronizer] - if config.DataSystem.Synchronizers.Secondary != nil { - secondary, err = makeSynchronizerConfig(*config.DataSystem.Synchronizers.Secondary, config, &ret) + for _, syncConfig := range *config.DataSystem.Synchronizers { + builder, err := makeSynchronizerConfig(syncConfig, config, &ret) if err != nil { return ret, err } + synchronizers = append(synchronizers, builder) - if fdv1Fallback == nil && config.DataSystem.Synchronizers.Secondary.Polling != nil { + // Set up FDv1 fallback from the last polling synchronizer we find (matches Python behavior) + if syncConfig.Polling != nil { fdv1Fallback = ldcomponents.FDv1PollingDataSourceV2() - if config.DataSystem.Synchronizers.Secondary.Polling.PollIntervalMS != nil { - fdv1Fallback.PollInterval(time.Millisecond * time.Duration(*config.DataSystem.Synchronizers.Secondary.Polling.PollIntervalMS)) + if syncConfig.Polling.PollIntervalMS != nil { + fdv1Fallback.PollInterval(time.Millisecond * time.Duration(*syncConfig.Polling.PollIntervalMS)) } - if config.DataSystem.Synchronizers.Secondary.Polling.BaseURI != "" { - fdv1Fallback.BaseURI(config.DataSystem.Synchronizers.Secondary.Polling.BaseURI) + if syncConfig.Polling.BaseURI != "" { + fdv1Fallback.BaseURI(syncConfig.Polling.BaseURI) } if config.DataSystem.PayloadFilter != nil { fdv1Fallback.PayloadFilter(*config.DataSystem.PayloadFilter) } } - } if fdv1Fallback != nil { dataSystemBuilder.FDv1CompatibleSynchronizer(fdv1Fallback) } - dataSystemBuilder.Synchronizers(primary, secondary) + dataSystemBuilder.Synchronizers(synchronizers...) } if config.DataSystem.Store != nil && config.DataSystem.Store.PersistentDataStore != nil { diff --git a/testservice/servicedef/sdk_config.go b/testservice/servicedef/sdk_config.go index 19f8ee09..97a21fbb 100644 --- a/testservice/servicedef/sdk_config.go +++ b/testservice/servicedef/sdk_config.go @@ -53,10 +53,7 @@ type DataInitializer struct { Polling *SDKConfigPollingParams `json:"polling,omitempty"` } -type Synchronizers struct { - Primary Synchronizer `json:"primary"` - Secondary *Synchronizer `json:"secondary,omitempty"` -} +type Synchronizers []Synchronizer type Synchronizer struct { Streaming *SDKConfigStreamingParams `json:"streaming,omitempty"`