2 changes: 1 addition & 1 deletion .github/workflows/common_ci.yml
@@ -66,7 +66,7 @@ jobs:
enable_persistence_tests: 'true'
test_service_port: ${{ env.TEST_SERVICE_PORT }}
token: ${{ secrets.GITHUB_TOKEN }}
version: v3.0.0-alpha.1
version: v3.0.0-alpha.3

- name: Upload test service logs
uses: actions/upload-artifact@v4
180 changes: 98 additions & 82 deletions internal/datasystem/fdv2_datasystem.go
@@ -43,8 +43,13 @@ type FDv2 struct {
// List of initializers that are capable of obtaining an initial payload of data.
initializers []subsystems.DataInitializer

// The primary synchronizer responsible for keeping data up-to-date.
primarySyncBuilder func() (subsystems.DataSynchronizer, error)
// Mutable list of synchronizer builders. Items are removed when they permanently fail.
// When reverting to FDv1, this list is replaced with a single FDv1 synchronizer.
synchronizerBuilders []func() (subsystems.DataSynchronizer, error)
currentSyncIndex int

// FDv1 fallback builder, used only when a synchronizer requests a revert to FDv1.
fdv1FallbackBuilder func() (subsystems.DataSynchronizer, error)

// Boolean used to track whether the datasystem was originally configured
// with some sort of valid data source.
@@ -53,12 +53,6 @@ type FDv2 struct {
// they permanently fail.
configuredWithDataSources bool

// The secondary synchronizer, in case the primary is unavailable.
secondarySyncBuilder func() (subsystems.DataSynchronizer, error)

// The fdv1 fallback synchronizer, in case we have to fall back to fdv1.
fdv1SyncBuilder func() (subsystems.DataSynchronizer, error)

// Whether the SDK should make use of persistent store/initializers/synchronizers or not.
disabled bool

@@ -139,10 +138,11 @@ func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems
}

fdv2.initializers = cfg.Initializers
fdv2.primarySyncBuilder = cfg.Synchronizers.PrimaryBuilder
fdv2.secondarySyncBuilder = cfg.Synchronizers.SecondaryBuilder
fdv2.fdv1SyncBuilder = cfg.Synchronizers.FDv1FallbackBuilder
fdv2.synchronizerBuilders = cfg.Synchronizers.SynchronizerBuilders
fdv2.currentSyncIndex = 0
fdv2.fdv1FallbackBuilder = cfg.Synchronizers.FDv1FallbackBuilder
fdv2.disabled = disabled

fdv2.fallbackCond = func(status interfaces.DataSourceStatus) bool {
interruptedAtRuntime := status.State == interfaces.DataSourceStateInterrupted &&
time.Since(status.StateSince) > 1*time.Minute
@@ -162,7 +162,7 @@ func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems
return interruptedAtRuntime || healthyForTooLong || cannotInitialize
}

fdv2.configuredWithDataSources = len(fdv2.initializers) > 0 || fdv2.primarySyncBuilder != nil
fdv2.configuredWithDataSources = len(fdv2.initializers) > 0 || len(fdv2.synchronizerBuilders) > 0

if cfg.Store != nil && !disabled {
// If there's a persistent Store, we should provide a status monitor and inform Store that it's present.
@@ -263,9 +263,8 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}
}

func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) {
// If the SDK was configured with no synchronizer, then (assuming no initializer succeeded, which would have
// already closed the channel), we should close it now so that MakeClient unblocks.
if f.primarySyncBuilder == nil {
// If no synchronizers are configured, close the ready channel and return so MakeClient unblocks
if len(f.synchronizerBuilders) == 0 {
f.readyOnce.Do(func() {
close(closeWhenReady)
})
@@ -279,82 +278,73 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{
})

for {
primarySync, err := f.primarySyncBuilder()
if err != nil {
f.loggers.Errorf("Failed to build the primary synchronizer: %v", err)
return
}

f.loggers.Debugf("Primary synchronizer %s is starting", primarySync.Name())
resultChan := primarySync.Sync(f.store)
removeSync, fallbackv1, err := f.consumeSynchronizerResults(ctx, resultChan, f.fallbackCond, closeWhenReady)

if err := primarySync.Close(); err != nil {
f.loggers.Errorf("Primary synchronizer %s failed to gracefully close: %v", primarySync.Name(), err)
}
if errors.Is(err, context.Canceled) {
// Check if we've run out of synchronizers
if len(f.synchronizerBuilders) == 0 {
f.loggers.Warn("No more synchronizers available")
f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError)
return
}

if removeSync {
f.primarySyncBuilder = f.secondarySyncBuilder
f.secondarySyncBuilder = nil

if fallbackv1 {
f.primarySyncBuilder = f.fdv1SyncBuilder
}

if f.primarySyncBuilder == nil {
f.loggers.Debugf("No more synchronizers available, closing the channel")
f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError)
f.readyOnce.Do(func() {
close(closeWhenReady)
})
return
}
} else {
f.loggers.Debugf("Fallback condition met")
}

if f.secondarySyncBuilder == nil {
continue
// Defensively wrap currentSyncIndex in case it ran past the end of the list (shouldn't happen)
if f.currentSyncIndex >= len(f.synchronizerBuilders) {
f.currentSyncIndex = 0
}

secondarySync, err := f.secondarySyncBuilder()
// Build synchronizer
sync, err := f.synchronizerBuilders[f.currentSyncIndex]()
if err != nil {
f.loggers.Errorf("Failed to build the secondary synchronizer: %v", err)
return
f.loggers.Errorf("Failed to build synchronizer at index %d: %v", f.currentSyncIndex, err)
// Remove the failed builder from the list
f.synchronizerBuilders = append(
f.synchronizerBuilders[:f.currentSyncIndex],
f.synchronizerBuilders[f.currentSyncIndex+1:]...)
// Don't increment currentSyncIndex; the removal shifted the next builder into this slot
continue
}
f.loggers.Debugf("Secondary synchronizer %s is starting", secondarySync.Name())

resultChan = secondarySync.Sync(f.store)
removeSync, fallbackv1, err = f.consumeSynchronizerResults(ctx, resultChan, f.recoveryCond, closeWhenReady)
f.loggers.Infof("Synchronizer at index %d (%s) is starting", f.currentSyncIndex, sync.Name())
resultChan := sync.Sync(f.store)
action, err := f.consumeSynchronizerResults(ctx, resultChan, closeWhenReady)

if err := secondarySync.Close(); err != nil {
f.loggers.Errorf("Secondary synchronizer %s failed to gracefully close: %v", secondarySync.Name(), err)
if err := sync.Close(); err != nil {
f.loggers.Errorf("Synchronizer %s failed to close: %v", sync.Name(), err)
}

if errors.Is(err, context.Canceled) {
return
}

if removeSync {
f.secondarySyncBuilder = nil

if fallbackv1 {
f.primarySyncBuilder = f.fdv1SyncBuilder

if f.primarySyncBuilder == nil {
f.loggers.Debugf("No more synchronizers available, closing the channel")
f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError)
f.readyOnce.Do(func() {
close(closeWhenReady)
})
return
}
// Act on the result reported by consumeSynchronizerResults
switch action {
case syncFDv1:
if f.fdv1FallbackBuilder != nil {
f.loggers.Warn("Reverting to FDv1 protocol")
// Replace entire list with single FDv1 synchronizer
f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder}
f.currentSyncIndex = 0
continue
}
f.loggers.Warn("Synchronizer requested FDv1 fallback but none configured")
f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError)
return
case syncRemove:
f.loggers.Warnf("Permanently removing synchronizer at index %d", f.currentSyncIndex)
f.synchronizerBuilders = append(
f.synchronizerBuilders[:f.currentSyncIndex],
f.synchronizerBuilders[f.currentSyncIndex+1:]...)
// Don't increment currentSyncIndex; the removal shifted the next builder into this slot
continue
case syncRecover:
// Recovery: jump back to index 0
f.loggers.Info("Recovery condition met, returning to first synchronizer")
f.currentSyncIndex = 0
case syncFallback:
// Fallback: move to next index
f.loggers.Info("Fallback condition met, trying next synchronizer")
f.currentSyncIndex++
}

f.loggers.Debugf("Recovery condition met")
// Check for cancellation before next iteration
select {
case <-ctx.Done():
return
@@ -364,25 +354,33 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{
})
}

type syncAction int

const (
// syncFallback: try the next synchronizer in the list.
syncFallback syncAction = iota
// syncRecover: return to the first (most preferred) synchronizer.
syncRecover
// syncRemove: permanently drop the current synchronizer.
syncRemove
// syncFDv1: replace the synchronizer list with the FDv1 fallback.
syncFDv1
)

func (f *FDv2) consumeSynchronizerResults(
ctx context.Context,
resultChan <-chan subsystems.DataSynchronizerResult,
cond func(status interfaces.DataSourceStatus) bool,
closeWhenReady chan<- struct{},
) (bool, bool, error) {
) (action syncAction, err error) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return false, false, ctx.Err()
return syncFallback, ctx.Err()
case result, ok := <-resultChan:
// The status channel being closed means that we won't be receiving
// any more information from that synchronizer and we should
// probably fall back.
if !ok {
return false, false, nil
return syncFallback, nil
}

if result.EnvironmentID.IsDefined() {
@@ -404,15 +402,33 @@ func (f *FDv2) consumeSynchronizerResults(
f.UpdateStatus(result.State, result.Error)
case interfaces.DataSourceStateOff:
f.UpdateStatus(interfaces.DataSourceStateInterrupted, result.Error)
return true, result.RevertToFDv1, nil
if result.RevertToFDv1 {
return syncFDv1, nil
}
return syncRemove, nil
}
case <-ticker.C:
// With only one synchronizer there is nothing to fall back to or recover toward, so skip the checks
if len(f.synchronizerBuilders) == 1 {
continue
}

status := f.getStatus()
f.loggers.Debugf("Data source status used to evaluate condition: %s", status.String())
if cond(status) {
return false, false, nil

// Check fallback condition first (things are bad)
if f.fallbackCond(status) {
f.loggers.Debugf("Fallback condition met")
return syncFallback, nil
}

// If not at index 0, also check recovery condition (things are good)
if f.currentSyncIndex > 0 && f.recoveryCond(status) {
f.loggers.Debugf("Recovery condition met")
return syncRecover, nil
}
f.loggers.Debugf("Condition check succeeded, continue with current synchronizer")

f.loggers.Debugf("No condition met, continue with current synchronizer")
}
}
}
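Note on the design: this change replaces the fixed primary/secondary/FDv1 builder fields with an ordered builder list plus an index. The following minimal standalone sketch (hypothetical names and types, not the SDK's actual API) illustrates the index semantics: fallback advances the index and wraps, recovery resets it to zero, permanent failure removes the current entry without advancing, and an FDv1 revert replaces the whole list.

package main

import "fmt"

type action int

const (
	actFallback   action = iota // advance to the next synchronizer
	actRecover                  // jump back to the first synchronizer
	actRemove                   // drop the current synchronizer permanently
	actRevertFDv1               // replace the whole list with the FDv1 fallback
)

// chain models the synchronizerBuilders list and currentSyncIndex.
type chain struct {
	builders []string // stand-ins for the builder functions
	idx      int
	fdv1     string
}

// step applies one action and reports which synchronizer runs next.
func (c *chain) step(a action) (next string, ok bool) {
	switch a {
	case actRevertFDv1:
		c.builders = []string{c.fdv1}
		c.idx = 0
	case actRemove:
		// Removing the current entry shifts the next one into this slot,
		// so the index is intentionally not incremented.
		c.builders = append(c.builders[:c.idx], c.builders[c.idx+1:]...)
	case actRecover:
		c.idx = 0
	case actFallback:
		c.idx++
	}
	if len(c.builders) == 0 {
		return "", false // nothing left to run: the data source goes Off
	}
	if c.idx >= len(c.builders) {
		c.idx = 0 // defensive wrap, mirroring the bounds check in the loop
	}
	return c.builders[c.idx], true
}

func main() {
	c := &chain{builders: []string{"streaming", "polling"}, fdv1: "fdv1-streaming"}
	fmt.Println(c.step(actFallback))   // polling true
	fmt.Println(c.step(actRecover))    // streaming true
	fmt.Println(c.step(actRemove))     // polling true
	fmt.Println(c.step(actRevertFDv1)) // fdv1-streaming true
}

This removal-without-advancing behavior is also why the tests below now expect a "Permanently removing synchronizer at index 0" warning for each failed synchronizer before "No more synchronizers available".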
14 changes: 11 additions & 3 deletions ldclient_end_to_end_fdv2_test.go
@@ -200,7 +200,12 @@ func TestFDV2ShutdownDownIfBothSynchronizersFail(t *testing.T) {
expectedStreamError := "Error in stream connection (giving up permanently): HTTP error 401 (invalid SDK key)"
expectedPollError := "Error on polling request (giving up permanently): HTTP error 401 (invalid SDK key)"
assert.Equal(t, []string{expectedStreamError, expectedPollError}, logCapture.GetOutput(ldlog.Error))
assert.Equal(t, []string{initializationFailedErrorMessage}, logCapture.GetOutput(ldlog.Warn))
assert.Equal(t, []string{
"Permanently removing synchronizer at index 0",
"Permanently removing synchronizer at index 0",
"No more synchronizers available",
initializationFailedErrorMessage,
}, logCapture.GetOutput(ldlog.Warn))
})
}

@@ -280,7 +285,11 @@ func TestFDV2PollingSynchronizerFailsToStartWith401Error(t *testing.T) {

expectedError := "Error on polling request (giving up permanently): HTTP error 401 (invalid SDK key)"
assert.Equal(t, []string{expectedError}, logCapture.GetOutput(ldlog.Error))
assert.Equal(t, []string{initializationFailedErrorMessage}, logCapture.GetOutput(ldlog.Warn))
assert.Equal(t, []string{
"Permanently removing synchronizer at index 0",
"No more synchronizers available",
initializationFailedErrorMessage,
}, logCapture.GetOutput(ldlog.Warn))
})
}

@@ -383,7 +392,6 @@ func TestFDV2FileInitializerWillDeferToFirstSynchronizer(t *testing.T) {
).
Synchronizers(
ldcomponents.StreamingDataSourceV2().BaseURI(server.URL),
nil,
),
}
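The deleted nil above was the old placeholder for an absent secondary synchronizer; Synchronizers now takes a preference-ordered list. Purely as an illustrative sketch (the second polling entry and the URL variables are assumptions, not part of this diff), a two-synchronizer configuration would read:

Synchronizers(
	ldcomponents.StreamingDataSourceV2().BaseURI(primaryURL), // tried first
	ldcomponents.PollingDataSourceV2().BaseURI(backupURL),    // hypothetical fallback entry
),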
