diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index bb2760c9..fa1531a8 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io/ioutil" + "math/rand" "os" "os/exec" "path/filepath" @@ -40,6 +41,7 @@ import ( const defaultFallbackFileMaxAge = 14 * 24 * time.Hour // 14 days const defaultLivenessPingIntervalSeconds = 60 * 5 * time.Second +const defaultMaxRetrySleep = 60 * time.Second var secretsToInclude []string @@ -227,8 +229,9 @@ doppler run --mount secrets.json -- cat secrets.json`, var processMutex sync.Mutex // used to ensure we only process one event at a time var watchMutex sync.Mutex - // this variable has the potential to be racey, but is made safe by our use of the mutex + // these variables have the potential to be racey, but are made safe by our use of the mutex terminatedByWatch := false + watchedValuesMayBeStale := false startLivenessPing := func() { ticker := time.NewTicker(defaultLivenessPingIntervalSeconds) @@ -246,22 +249,29 @@ doppler run --mount secrets.json -- cat secrets.json`, } } }() - } startProcess := func() { // ensure we can fetch the new secrets before restarting the process - secrets := controllers.FetchSecrets(localConfig, enableCache, fallbackOpts, metadataPath, nameTransformer, dynamicSecretsTTL, format, secretsToInclude) + secrets, fromCache := controllers.FetchSecrets(localConfig, enableCache, fallbackOpts, metadataPath, nameTransformer, dynamicSecretsTTL, format, secretsToInclude) secretsFetchedAt := time.Now() if secretsFetchedAt.After(lastSecretsFetch) { lastSecretsFetch = secretsFetchedAt } + if !fromCache { + watchedValuesMayBeStale = false + } controllers.ValidateSecrets(secrets, secretsToInclude, exitOnMissingIncludedSecrets, mountOptions) isRestart := c != nil // terminate the old process if isRestart { + // if the watched values might be stale and we didn't read a brand new copy from the network, we shouldn't restart the process + if fromCache && watchedValuesMayBeStale { + return + } + terminatedByWatch = true // killing the process here will cause the cleanup goroutine below to run, thereby unlocking the mutex @@ -351,12 +361,16 @@ doppler run --mount secrets.json -- cat secrets.json`, }() } + watchRetrySleep := 1 * time.Second watchHandler := func(data []byte) { event := controllers.ParseWatchEvent(data) if event.Type == "" { return } + // when we've received a successful event, we know we're connected, and we can reset the retry sleep time + watchRetrySleep = 1 * time.Second + // don't capture analytics for the ping event; it's too noisy if event.Type != "ping" { controllers.CaptureEvent("WatchDataReceived", map[string]interface{}{"event": event.Type}) @@ -383,6 +397,14 @@ doppler run --mount secrets.json -- cat secrets.json`, startProcess() } else if event.Type == "connected" { utils.LogDebug("Connected to secrets stream") + + // if we're recovering the connection after a network failure, it's possible that we missed a secrets.update event. + // we'll call startProcess() to check the latest values against our cache and restart as necessary + if watchedValuesMayBeStale { + watchMutex.Lock() + defer watchMutex.Unlock() + startProcess() + } } else if event.Type == "ping" { // do nothing } else { @@ -398,43 +420,52 @@ doppler run --mount secrets.json -- cat secrets.json`, // initiate watch logic after starting the process so that failing to watch just degrades to normal 'run' behavior if watch { - maxAttempts := 10 - attempt := 0 - _ = utils.Retry(maxAttempts, time.Second, func() error { - attempt = attempt + 1 + var watchConnectionHandler func() + watchConnectionHandler = func() { statusCode, headers, httpErr := http.WatchSecrets(localConfig.APIHost.Value, utils.GetBool(localConfig.VerifyTLS.Value, true), localConfig.Token.Value, localConfig.EnclaveProject.Value, localConfig.EnclaveConfig.Value, watchHandler) + if !httpErr.IsNil() { e := httpErr.Unwrap() + watchRetrySleep = 2 * watchRetrySleep + + utils.LogDebug(fmt.Sprintf("Status Code %v", statusCode)) msg := "Unable to watch for secrets changes" // a 200 is sent as soon as the connection is established, so if it dies after that the status code // will still be 200. we should retry these requests - canRetry := http.IsRetry(statusCode, headers.Get("content-type")) || statusCode == 200 - if canRetry && attempt < maxAttempts { + // if connection could not be established to the server (statusCode 0), we'll also retry + canRetry := http.IsRetry(statusCode, headers.Get("content-type")) || statusCode == 200 || statusCode == 0 + if canRetry { msg += ". Will retry" } - if statusCode == 200 { - // this connection was likely killed due to a timeout, so we can log quietly + if statusCode == 200 || statusCode == 0 { + // this connection was likely killed due to a timeout / lost connectivity, so we can log quietly utils.LogDebugError(errors.New(msg)) } else { utils.LogError(errors.New(msg)) } controllers.CaptureEvent("WatchConnectionError", map[string]interface{}{"statusCode": statusCode, "canRetry": canRetry}) + watchedValuesMayBeStale = true if statusCode != 0 { e = fmt.Errorf("%s. Status code: %d", e, statusCode) } utils.LogDebugError(e) - if !canRetry { - return utils.StopRetryError(e) + if canRetry { + jitter := time.Duration(rand.Int63n(int64(watchRetrySleep))) // #nosec G404 + sleep := utils.Min(watchRetrySleep, defaultMaxRetrySleep) + jitter/2 + utils.LogDebug(fmt.Sprintf("restarting after %v", sleep)) + time.Sleep(sleep) + + watchConnectionHandler() } } + } - return httpErr.Unwrap() - }) + watchConnectionHandler() } }, } diff --git a/pkg/cmd/secrets.go b/pkg/cmd/secrets.go index 1ab9b05e..869412ab 100644 --- a/pkg/cmd/secrets.go +++ b/pkg/cmd/secrets.go @@ -530,7 +530,7 @@ func downloadSecrets(cmd *cobra.Command, args []string) { ExitOnWriteFailure: exitOnWriteFailure, Passphrase: fallbackPassphrase, } - secrets := controllers.FetchSecrets(localConfig, enableCache, fallbackOpts, metadataPath, nameTransformer, dynamicSecretsTTL, format, nil) + secrets, _ := controllers.FetchSecrets(localConfig, enableCache, fallbackOpts, metadataPath, nameTransformer, dynamicSecretsTTL, format, nil) var err error body, err = json.Marshal(secrets) diff --git a/pkg/controllers/secrets.go b/pkg/controllers/secrets.go index 64982176..2dd0428a 100644 --- a/pkg/controllers/secrets.go +++ b/pkg/controllers/secrets.go @@ -426,13 +426,14 @@ func PrepareSecrets(dopplerSecrets map[string]string, originalEnv []string, pres return env, onExit } -// fetchSecrets from Doppler and handle fallback file -func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOpts FallbackOptions, metadataPath string, nameTransformer *models.SecretsNameTransformer, dynamicSecretsTTL time.Duration, format models.SecretsFormat, secretNames []string) map[string]string { +// FetchSecrets from Doppler and handle fallback file. +// It returns a tuple of the secrets and a boolean of whether the result was from a cache/fallback file +func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOpts FallbackOptions, metadataPath string, nameTransformer *models.SecretsNameTransformer, dynamicSecretsTTL time.Duration, format models.SecretsFormat, secretNames []string) (map[string]string, bool) { if fallbackOpts.Exclusive { if !fallbackOpts.Enable { utils.HandleError(errors.New("Conflict: unable to specify --no-fallback with " + fallbackOpts.ExclusiveFlag)) } - return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false) + return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false), true } // this scenario likely isn't possible, but just to be safe, disable using cache when there's no metadata file @@ -457,7 +458,7 @@ func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOp if fallbackOpts.Enable && canUseFallback { utils.Log("Unable to fetch secrets from the Doppler API") utils.LogError(httpErr.Unwrap()) - return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false) + return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false), true } utils.HandleError(httpErr.Unwrap(), httpErr.Message) } @@ -473,7 +474,7 @@ func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOp utils.HandleError(err.Unwrap(), err.Message) } - return cache + return cache, true } // ensure the response can be parsed before proceeding @@ -484,7 +485,7 @@ func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOp if fallbackOpts.Enable { utils.Log("Unable to parse the Doppler API response") utils.LogError(httpErr.Unwrap()) - return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false) + return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false), true } utils.HandleError(err, "Unable to parse API response") } @@ -521,7 +522,7 @@ func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOp } } - return secrets + return secrets, false } func Run(cmd *cobra.Command, args []string, env []string, forwardSignals bool) (*exec.Cmd, error) { diff --git a/pkg/http/http.go b/pkg/http/http.go index 5337dcb9..085d098f 100644 --- a/pkg/http/http.go +++ b/pkg/http/http.go @@ -146,7 +146,7 @@ func DeleteRequest(url *url.URL, verifyTLS bool, headers map[string]string, body return statusCode, respHeaders, body, nil } -func request(req *http.Request, verifyTLS bool, allowTimeout bool) (*http.Response, error) { +func request(req *http.Request, verifyTLS bool, allowTimeout bool, allowRetry bool) (*http.Response, error) { // set headers req.Header.Set("client-sdk", "go-cli") req.Header.Set("client-version", version.ProgramVersion) @@ -236,7 +236,7 @@ func request(req *http.Request, verifyTLS bool, allowTimeout bool) (*http.Respon utils.LogDebug(err.Error()) - if isTimeout(err) || errors.Is(err, syscall.ECONNREFUSED) { + if allowRetry && (isTimeout(err) || errors.Is(err, syscall.ECONNREFUSED)) { // retry request return err } @@ -273,7 +273,7 @@ func request(req *http.Request, verifyTLS bool, allowTimeout bool) (*http.Respon func performSSERequest(req *http.Request, verifyTLS bool, handler func([]byte)) (int, http.Header, error) { // nosemgrep: trailofbits.go.invalid-usage-of-modified-variable.invalid-usage-of-modified-variable - response, requestErr := request(req, verifyTLS, false) + response, requestErr := request(req, verifyTLS, false, false) if requestErr != nil { statusCode := 0 if response != nil { @@ -312,7 +312,7 @@ func performSSERequest(req *http.Request, verifyTLS bool, handler func([]byte)) } func performRequest(req *http.Request, verifyTLS bool) (int, http.Header, []byte, error) { - response, requestErr := request(req, verifyTLS, true) + response, requestErr := request(req, verifyTLS, true, true) if response != nil { defer func() { if closeErr := response.Body.Close(); closeErr != nil { diff --git a/pkg/utils/number.go b/pkg/utils/number.go index 89185d3f..fa606845 100644 --- a/pkg/utils/number.go +++ b/pkg/utils/number.go @@ -15,21 +15,25 @@ limitations under the License. */ package utils -func Min(x, y int) int { +import ( + "golang.org/x/exp/constraints" +) + +func Min[T constraints.Ordered](x, y T) T { if x < y { return x } return y } -func Max(x, y int) int { +func Max[T constraints.Ordered](x, y T) T { if x > y { return x } return y } -func Clamp(x int, min int, max int) int { +func Clamp[T constraints.Ordered](x, min, max T) T { if x < min { return min } else if x > max {