Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 46 additions & 15 deletions pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"os/exec"
"path/filepath"
Expand All @@ -40,6 +41,7 @@ import (

const defaultFallbackFileMaxAge = 14 * 24 * time.Hour // 14 days
const defaultLivenessPingIntervalSeconds = 60 * 5 * time.Second
const defaultMaxRetrySleep = 60 * time.Second

var secretsToInclude []string

Expand Down Expand Up @@ -227,8 +229,9 @@ doppler run --mount secrets.json -- cat secrets.json`,
var processMutex sync.Mutex
// used to ensure we only process one event at a time
var watchMutex sync.Mutex
// this variable has the potential to be racey, but is made safe by our use of the mutex
// these variables have the potential to be racey, but are made safe by our use of the mutex
terminatedByWatch := false
watchedValuesMayBeStale := false

startLivenessPing := func() {
ticker := time.NewTicker(defaultLivenessPingIntervalSeconds)
Expand All @@ -246,22 +249,29 @@ doppler run --mount secrets.json -- cat secrets.json`,
}
}
}()

}

startProcess := func() {
// ensure we can fetch the new secrets before restarting the process
secrets := controllers.FetchSecrets(localConfig, enableCache, fallbackOpts, metadataPath, nameTransformer, dynamicSecretsTTL, format, secretsToInclude)
secrets, fromCache := controllers.FetchSecrets(localConfig, enableCache, fallbackOpts, metadataPath, nameTransformer, dynamicSecretsTTL, format, secretsToInclude)
secretsFetchedAt := time.Now()
if secretsFetchedAt.After(lastSecretsFetch) {
lastSecretsFetch = secretsFetchedAt
}
if !fromCache {
watchedValuesMayBeStale = false
}

controllers.ValidateSecrets(secrets, secretsToInclude, exitOnMissingIncludedSecrets, mountOptions)

isRestart := c != nil
// terminate the old process
if isRestart {
// if the watched values might be stale and we didn't read a brand new copy from the network, we shouldn't restart the process
if fromCache && watchedValuesMayBeStale {
return
}

terminatedByWatch = true

// killing the process here will cause the cleanup goroutine below to run, thereby unlocking the mutex
Expand Down Expand Up @@ -351,12 +361,16 @@ doppler run --mount secrets.json -- cat secrets.json`,
}()
}

watchRetrySleep := 1 * time.Second
watchHandler := func(data []byte) {
event := controllers.ParseWatchEvent(data)
if event.Type == "" {
return
}

// when we've received a successful event, we know we're connected, and we can reset the retry sleep time
watchRetrySleep = 1 * time.Second

// don't capture analytics for the ping event; it's too noisy
if event.Type != "ping" {
controllers.CaptureEvent("WatchDataReceived", map[string]interface{}{"event": event.Type})
Expand All @@ -383,6 +397,14 @@ doppler run --mount secrets.json -- cat secrets.json`,
startProcess()
} else if event.Type == "connected" {
utils.LogDebug("Connected to secrets stream")

// if we're recovering the connection after a network failure, it's possible that we missed a secrets.update event.
// we'll call startProcess() to check the latest values against our cache and restart as necessary
if watchedValuesMayBeStale {
watchMutex.Lock()
defer watchMutex.Unlock()
startProcess()
}
} else if event.Type == "ping" {
// do nothing
} else {
Expand All @@ -398,43 +420,52 @@ doppler run --mount secrets.json -- cat secrets.json`,

// initiate watch logic after starting the process so that failing to watch just degrades to normal 'run' behavior
if watch {
maxAttempts := 10
attempt := 0
_ = utils.Retry(maxAttempts, time.Second, func() error {
attempt = attempt + 1
var watchConnectionHandler func()

watchConnectionHandler = func() {
statusCode, headers, httpErr := http.WatchSecrets(localConfig.APIHost.Value, utils.GetBool(localConfig.VerifyTLS.Value, true), localConfig.Token.Value, localConfig.EnclaveProject.Value, localConfig.EnclaveConfig.Value, watchHandler)

if !httpErr.IsNil() {
e := httpErr.Unwrap()
watchRetrySleep = 2 * watchRetrySleep

utils.LogDebug(fmt.Sprintf("Status Code %v", statusCode))

msg := "Unable to watch for secrets changes"
// a 200 is sent as soon as the connection is established, so if it dies after that the status code
// will still be 200. we should retry these requests
canRetry := http.IsRetry(statusCode, headers.Get("content-type")) || statusCode == 200
if canRetry && attempt < maxAttempts {
// if connection could not be established to the server (statusCode 0), we'll also retry
canRetry := http.IsRetry(statusCode, headers.Get("content-type")) || statusCode == 200 || statusCode == 0
if canRetry {
msg += ". Will retry"
}
if statusCode == 200 {
// this connection was likely killed due to a timeout, so we can log quietly
if statusCode == 200 || statusCode == 0 {
// this connection was likely killed due to a timeout / lost connectivity, so we can log quietly
utils.LogDebugError(errors.New(msg))
} else {
utils.LogError(errors.New(msg))
}

controllers.CaptureEvent("WatchConnectionError", map[string]interface{}{"statusCode": statusCode, "canRetry": canRetry})
watchedValuesMayBeStale = true

if statusCode != 0 {
e = fmt.Errorf("%s. Status code: %d", e, statusCode)
}
utils.LogDebugError(e)

if !canRetry {
return utils.StopRetryError(e)
if canRetry {
jitter := time.Duration(rand.Int63n(int64(watchRetrySleep))) // #nosec G404
sleep := utils.Min(watchRetrySleep, defaultMaxRetrySleep) + jitter/2
utils.LogDebug(fmt.Sprintf("restarting after %v", sleep))
time.Sleep(sleep)

watchConnectionHandler()
}
}
}

return httpErr.Unwrap()
})
watchConnectionHandler()
}
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func downloadSecrets(cmd *cobra.Command, args []string) {
ExitOnWriteFailure: exitOnWriteFailure,
Passphrase: fallbackPassphrase,
}
secrets := controllers.FetchSecrets(localConfig, enableCache, fallbackOpts, metadataPath, nameTransformer, dynamicSecretsTTL, format, nil)
secrets, _ := controllers.FetchSecrets(localConfig, enableCache, fallbackOpts, metadataPath, nameTransformer, dynamicSecretsTTL, format, nil)

var err error
body, err = json.Marshal(secrets)
Expand Down
15 changes: 8 additions & 7 deletions pkg/controllers/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,14 @@ func PrepareSecrets(dopplerSecrets map[string]string, originalEnv []string, pres
return env, onExit
}

// fetchSecrets from Doppler and handle fallback file
func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOpts FallbackOptions, metadataPath string, nameTransformer *models.SecretsNameTransformer, dynamicSecretsTTL time.Duration, format models.SecretsFormat, secretNames []string) map[string]string {
// FetchSecrets from Doppler and handle fallback file.
// It returns a tuple of the secrets and a boolean of whether the result was from a cache/fallback file
func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOpts FallbackOptions, metadataPath string, nameTransformer *models.SecretsNameTransformer, dynamicSecretsTTL time.Duration, format models.SecretsFormat, secretNames []string) (map[string]string, bool) {
if fallbackOpts.Exclusive {
if !fallbackOpts.Enable {
utils.HandleError(errors.New("Conflict: unable to specify --no-fallback with " + fallbackOpts.ExclusiveFlag))
}
return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false)
return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false), true
}

// this scenario likely isn't possible, but just to be safe, disable using cache when there's no metadata file
Expand All @@ -457,7 +458,7 @@ func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOp
if fallbackOpts.Enable && canUseFallback {
utils.Log("Unable to fetch secrets from the Doppler API")
utils.LogError(httpErr.Unwrap())
return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false)
return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false), true
}
utils.HandleError(httpErr.Unwrap(), httpErr.Message)
}
Expand All @@ -473,7 +474,7 @@ func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOp
utils.HandleError(err.Unwrap(), err.Message)
}

return cache
return cache, true
}

// ensure the response can be parsed before proceeding
Expand All @@ -484,7 +485,7 @@ func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOp
if fallbackOpts.Enable {
utils.Log("Unable to parse the Doppler API response")
utils.LogError(httpErr.Unwrap())
return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false)
return readFallbackFile(fallbackOpts.Path, fallbackOpts.LegacyPath, fallbackOpts.Passphrase, false), true
}
utils.HandleError(err, "Unable to parse API response")
}
Expand Down Expand Up @@ -521,7 +522,7 @@ func FetchSecrets(localConfig models.ScopedOptions, enableCache bool, fallbackOp
}
}

return secrets
return secrets, false
}

func Run(cmd *cobra.Command, args []string, env []string, forwardSignals bool) (*exec.Cmd, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func DeleteRequest(url *url.URL, verifyTLS bool, headers map[string]string, body
return statusCode, respHeaders, body, nil
}

func request(req *http.Request, verifyTLS bool, allowTimeout bool) (*http.Response, error) {
func request(req *http.Request, verifyTLS bool, allowTimeout bool, allowRetry bool) (*http.Response, error) {
// set headers
req.Header.Set("client-sdk", "go-cli")
req.Header.Set("client-version", version.ProgramVersion)
Expand Down Expand Up @@ -236,7 +236,7 @@ func request(req *http.Request, verifyTLS bool, allowTimeout bool) (*http.Respon

utils.LogDebug(err.Error())

if isTimeout(err) || errors.Is(err, syscall.ECONNREFUSED) {
if allowRetry && (isTimeout(err) || errors.Is(err, syscall.ECONNREFUSED)) {
// retry request
return err
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func request(req *http.Request, verifyTLS bool, allowTimeout bool) (*http.Respon

func performSSERequest(req *http.Request, verifyTLS bool, handler func([]byte)) (int, http.Header, error) {
// nosemgrep: trailofbits.go.invalid-usage-of-modified-variable.invalid-usage-of-modified-variable
response, requestErr := request(req, verifyTLS, false)
response, requestErr := request(req, verifyTLS, false, false)
if requestErr != nil {
statusCode := 0
if response != nil {
Expand Down Expand Up @@ -312,7 +312,7 @@ func performSSERequest(req *http.Request, verifyTLS bool, handler func([]byte))
}

func performRequest(req *http.Request, verifyTLS bool) (int, http.Header, []byte, error) {
response, requestErr := request(req, verifyTLS, true)
response, requestErr := request(req, verifyTLS, true, true)
if response != nil {
defer func() {
if closeErr := response.Body.Close(); closeErr != nil {
Expand Down
10 changes: 7 additions & 3 deletions pkg/utils/number.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@ limitations under the License.
*/
package utils

func Min(x, y int) int {
import (
"golang.org/x/exp/constraints"
)

func Min[T constraints.Ordered](x, y T) T {
if x < y {
return x
}
return y
}

func Max(x, y int) int {
func Max[T constraints.Ordered](x, y T) T {
if x > y {
return x
}
return y
}

func Clamp(x int, min int, max int) int {
func Clamp[T constraints.Ordered](x, min, max T) T {
if x < min {
return min
} else if x > max {
Expand Down
Loading