From 8d5bf91aab5e7af4663f0e0ef4d97a97dcf19f52 Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Tue, 26 Sep 2023 00:30:10 +0200 Subject: [PATCH 01/10] Added `process.Background()` and `process.Forwarded()` This PR adds higher-level wrappers for calling subprocesses --- bundle/config/artifact.go | 10 +++---- bundle/scripts/scripts.go | 1 + libs/git/clone.go | 17 +++-------- libs/process/background.go | 32 +++++++++++++++++++++ libs/process/background_test.go | 31 ++++++++++++++++++++ libs/process/forwarded.go | 48 +++++++++++++++++++++++++++++++ libs/process/forwarded_test.go | 43 +++++++++++++++++++++++++++ libs/process/opts.go | 51 +++++++++++++++++++++++++++++++++ libs/process/opts_test.go | 18 ++++++++++++ python/runner.go | 6 ++-- python/runner_test.go | 2 +- 11 files changed, 237 insertions(+), 22 deletions(-) create mode 100644 libs/process/background.go create mode 100644 libs/process/background_test.go create mode 100644 libs/process/forwarded.go create mode 100644 libs/process/forwarded_test.go create mode 100644 libs/process/opts.go create mode 100644 libs/process/opts_test.go diff --git a/bundle/config/artifact.go b/bundle/config/artifact.go index d7048a02ec..44da150552 100644 --- a/bundle/config/artifact.go +++ b/bundle/config/artifact.go @@ -4,11 +4,11 @@ import ( "bytes" "context" "fmt" - "os/exec" "path" "strings" "github.com/databricks/cli/bundle/config/paths" + "github.com/databricks/cli/libs/process" "github.com/databricks/databricks-sdk-go/service/compute" ) @@ -56,13 +56,11 @@ func (a *Artifact) Build(ctx context.Context) ([]byte, error) { commands := strings.Split(a.BuildCommand, " && ") for _, command := range commands { buildParts := strings.Split(command, " ") - cmd := exec.CommandContext(ctx, buildParts[0], buildParts[1:]...) - cmd.Dir = a.Path - res, err := cmd.CombinedOutput() + res, err := process.Background(ctx, buildParts, process.WithDir(a.Path)) if err != nil { - return res, err + return nil, err } - out = append(out, res) + out = append(out, []byte(res)) } return bytes.Join(out, []byte{}), nil } diff --git a/bundle/scripts/scripts.go b/bundle/scripts/scripts.go index 1a8a471caa..90c1914fa9 100644 --- a/bundle/scripts/scripts.go +++ b/bundle/scripts/scripts.go @@ -61,6 +61,7 @@ func executeHook(ctx context.Context, b *bundle.Bundle, hook config.ScriptHook) return nil, nil, err } + // TODO: switch to process.Background(...) cmd := exec.CommandContext(ctx, interpreter, "-c", string(command)) cmd.Dir = b.Config.Path diff --git a/libs/git/clone.go b/libs/git/clone.go index af7ffa4bbf..5d3f97e96c 100644 --- a/libs/git/clone.go +++ b/libs/git/clone.go @@ -1,13 +1,14 @@ package git import ( - "bytes" "context" "errors" "fmt" "os/exec" "regexp" "strings" + + "github.com/databricks/cli/libs/process" ) // source: https://stackoverflow.com/questions/59081778/rules-for-special-characters-in-github-repository-name @@ -42,24 +43,14 @@ func (opts cloneOptions) args() []string { } func (opts cloneOptions) clone(ctx context.Context) error { - cmd := exec.CommandContext(ctx, "git", opts.args()...) - var cmdErr bytes.Buffer - cmd.Stderr = &cmdErr - - // start git clone - err := cmd.Start() + // start and wait for git clone to complete + _, err := process.Background(ctx, append([]string{"git"}, opts.args()...)) if errors.Is(err, exec.ErrNotFound) { return fmt.Errorf("please install git CLI to clone a repository: %w", err) } if err != nil { return fmt.Errorf("git clone failed: %w", err) } - - // wait for git clone to complete - err = cmd.Wait() - if err != nil { - return fmt.Errorf("git clone failed: %w. %s", err, cmdErr.String()) - } return nil } diff --git a/libs/process/background.go b/libs/process/background.go new file mode 100644 index 0000000000..2e3675c0b8 --- /dev/null +++ b/libs/process/background.go @@ -0,0 +1,32 @@ +package process + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" + "strings" + + "github.com/databricks/cli/libs/log" +) + +func Background(ctx context.Context, args []string, opts ...execOption) (string, error) { + commandStr := strings.Join(args, " ") + log.Debugf(ctx, "running: %s", commandStr) + cmd := exec.CommandContext(ctx, args[0], args[1:]...) + stdout := &bytes.Buffer{} + cmd.Stdin = os.Stdin + cmd.Stdout = stdout + cmd.Stderr = stdout + for _, o := range opts { + err := o(cmd) + if err != nil { + return "", err + } + } + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("%s: %s %w", commandStr, stdout.String(), err) + } + return strings.TrimSpace(stdout.String()), nil +} diff --git a/libs/process/background_test.go b/libs/process/background_test.go new file mode 100644 index 0000000000..c5cc003c49 --- /dev/null +++ b/libs/process/background_test.go @@ -0,0 +1,31 @@ +package process + +import ( + "context" + "fmt" + "os/exec" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBackground(t *testing.T) { + ctx := context.Background() + res, err := Background(ctx, []string{"echo", "1"}, WithDir("/")) + assert.NoError(t, err) + assert.Equal(t, "1", res) +} + +func TestBackgroundFails(t *testing.T) { + ctx := context.Background() + _, err := Background(ctx, []string{"ls", "/dev/null/x"}) + assert.NotNil(t, err) +} + +func TestBackgroundFailsOnOption(t *testing.T) { + ctx := context.Background() + _, err := Background(ctx, []string{"ls", "/dev/null/x"}, func(c *exec.Cmd) error { + return fmt.Errorf("nope") + }) + assert.EqualError(t, err, "nope") +} diff --git a/libs/process/forwarded.go b/libs/process/forwarded.go new file mode 100644 index 0000000000..19bf875ba9 --- /dev/null +++ b/libs/process/forwarded.go @@ -0,0 +1,48 @@ +package process + +import ( + "context" + "io" + "os/exec" + "strings" + + "github.com/databricks/cli/libs/log" +) + +func Forwarded(ctx context.Context, args []string, src io.Reader, dst io.Writer, opts ...execOption) error { + commandStr := strings.Join(args, " ") + log.Debugf(ctx, "starting: %s", commandStr) + cmd := exec.CommandContext(ctx, args[0], args[1:]...) + + // make sure to sync on writing to stdout + reader, writer := io.Pipe() + go io.CopyBuffer(dst, reader, make([]byte, 128)) + defer reader.Close() + defer writer.Close() + cmd.Stdout = writer + cmd.Stderr = writer + + // apply common options + for _, o := range opts { + err := o(cmd) + if err != nil { + return err + } + } + + // pipe standard input to the child process, so that we can allow terminal UX + // see the PoC at https://github.com/databricks/cli/pull/637 + stdin, err := cmd.StdinPipe() + if err != nil { + return err + } + go io.CopyBuffer(stdin, src, make([]byte, 128)) + defer stdin.Close() + + err = cmd.Start() + if err != nil { + return err + } + + return cmd.Wait() +} diff --git a/libs/process/forwarded_test.go b/libs/process/forwarded_test.go new file mode 100644 index 0000000000..0c28923112 --- /dev/null +++ b/libs/process/forwarded_test.go @@ -0,0 +1,43 @@ +package process + +import ( + "bytes" + "context" + "os/exec" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestForwarded(t *testing.T) { + ctx := context.Background() + buf := bytes.NewBufferString("") + err := Forwarded(ctx, []string{ + "python3", "-c", "print(input('input: '))", + }, strings.NewReader("abc\n"), buf) + assert.NoError(t, err) + + assert.Equal(t, "input: abc\n", buf.String()) +} + +func TestForwardedFails(t *testing.T) { + ctx := context.Background() + buf := bytes.NewBufferString("") + err := Forwarded(ctx, []string{ + "_non_existent_", + }, strings.NewReader("abc\n"), buf) + assert.NotNil(t, err) +} + +func TestForwardedFailsOnStdinPipe(t *testing.T) { + ctx := context.Background() + buf := bytes.NewBufferString("") + err := Forwarded(ctx, []string{ + "_non_existent_", + }, strings.NewReader("abc\n"), buf, func(c *exec.Cmd) error { + c.Stdin = strings.NewReader("x") + return nil + }) + assert.NotNil(t, err) +} diff --git a/libs/process/opts.go b/libs/process/opts.go new file mode 100644 index 0000000000..5ba7808089 --- /dev/null +++ b/libs/process/opts.go @@ -0,0 +1,51 @@ +package process + +import ( + "fmt" + "io" + "os" + "os/exec" +) + +type execOption func(*exec.Cmd) error + +func WithEnv(key, value string) execOption { + return func(c *exec.Cmd) error { + if c.Env == nil { + c.Env = os.Environ() + } + v := fmt.Sprintf("%s=%s", key, value) + c.Env = append(c.Env, v) + return nil + } +} + +func WithEnvs(envs map[string]string) execOption { + return func(c *exec.Cmd) error { + for k, v := range envs { + err := WithEnv(k, v)(c) + if err != nil { + return err + } + } + return nil + } +} + +func WithDir(dir string) execOption { + return func(c *exec.Cmd) error { + c.Dir = dir + return nil + } +} + +func WithStdoutPipe(dst *io.ReadCloser) execOption { + return func(c *exec.Cmd) error { + outPipe, err := c.StdoutPipe() + if err != nil { + return err + } + *dst = outPipe + return nil + } +} diff --git a/libs/process/opts_test.go b/libs/process/opts_test.go new file mode 100644 index 0000000000..abe7a56a0b --- /dev/null +++ b/libs/process/opts_test.go @@ -0,0 +1,18 @@ +package process + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWithEnvs(t *testing.T) { + ctx := context.Background() + res, err := Background(ctx, []string{"/bin/sh", "-c", "echo $FOO $BAR"}, WithEnvs(map[string]string{ + "FOO": "foo", + "BAR": "delirium", + })) + assert.NoError(t, err) + assert.Equal(t, "foo delirium", res) +} diff --git a/python/runner.go b/python/runner.go index bdf386a0ea..ebf247172d 100644 --- a/python/runner.go +++ b/python/runner.go @@ -8,6 +8,8 @@ import ( "os/exec" "runtime" "strings" + + "github.com/databricks/cli/libs/process" ) func PyInline(ctx context.Context, inlinePy string) (string, error) { @@ -88,8 +90,8 @@ func DetectExecutable(ctx context.Context) (string, error) { func execAndPassErr(ctx context.Context, name string, args ...string) ([]byte, error) { // TODO: move out to a separate package, once we have Maven integration - out, err := exec.CommandContext(ctx, name, args...).Output() - return out, nicerErr(err) + out, err := process.Background(ctx, append([]string{name}, args...)) + return []byte(out), nicerErr(err) } func getFirstMatch(out string) string { diff --git a/python/runner_test.go b/python/runner_test.go index 3968e27a72..c2744efe77 100644 --- a/python/runner_test.go +++ b/python/runner_test.go @@ -20,7 +20,7 @@ func TestExecAndPassError(t *testing.T) { } _, err := execAndPassErr(context.Background(), "which", "__non_existing__") - assert.EqualError(t, err, "exit status 1") + assert.EqualError(t, err, "which __non_existing__: exit status 1") } func TestDetectPython(t *testing.T) { From e42b9e220e3e9b8f67ae3f176c68e9be1bf79918 Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Tue, 26 Sep 2023 00:36:47 +0200 Subject: [PATCH 02/10] fix test --- python/runner_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/runner_test.go b/python/runner_test.go index c2744efe77..f13a0d9945 100644 --- a/python/runner_test.go +++ b/python/runner_test.go @@ -90,5 +90,5 @@ func TestPyInlineStderr(t *testing.T) { DetectExecutable(context.Background()) inline := "import sys; sys.stderr.write('___msg___'); sys.exit(1)" _, err := PyInline(context.Background(), inline) - assert.EqualError(t, err, "___msg___") + assert.ErrorContains(t, err, "___msg___") } From 0fd00f2b85ddbfb8561f63d8b81167c23b9dcfac Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Tue, 26 Sep 2023 11:14:45 +0200 Subject: [PATCH 03/10] fix issues with windows --- libs/process/forwarded_test.go | 2 +- libs/process/opts_test.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/libs/process/forwarded_test.go b/libs/process/forwarded_test.go index 0c28923112..63606f8e90 100644 --- a/libs/process/forwarded_test.go +++ b/libs/process/forwarded_test.go @@ -18,7 +18,7 @@ func TestForwarded(t *testing.T) { }, strings.NewReader("abc\n"), buf) assert.NoError(t, err) - assert.Equal(t, "input: abc\n", buf.String()) + assert.Equal(t, "input: abc", strings.TrimSpace(buf.String())) } func TestForwardedFails(t *testing.T) { diff --git a/libs/process/opts_test.go b/libs/process/opts_test.go index abe7a56a0b..df99b5f8b0 100644 --- a/libs/process/opts_test.go +++ b/libs/process/opts_test.go @@ -2,12 +2,18 @@ package process import ( "context" + "runtime" "testing" "github.com/stretchr/testify/assert" ) func TestWithEnvs(t *testing.T) { + if runtime.GOOS != "windows" { + // Skipping test on windows for now because of the following error: + // /bin/sh -c echo $FOO $BAR: exec: "/bin/sh": file does not exist + t.SkipNow() + } ctx := context.Background() res, err := Background(ctx, []string{"/bin/sh", "-c", "echo $FOO $BAR"}, WithEnvs(map[string]string{ "FOO": "foo", From a1238a20ddd4486cfa13ac375a0489507c897747 Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Tue, 26 Sep 2023 13:39:24 +0200 Subject: [PATCH 04/10] fix wrong copy-paste --- libs/process/opts_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/process/opts_test.go b/libs/process/opts_test.go index df99b5f8b0..6e37c29602 100644 --- a/libs/process/opts_test.go +++ b/libs/process/opts_test.go @@ -9,7 +9,7 @@ import ( ) func TestWithEnvs(t *testing.T) { - if runtime.GOOS != "windows" { + if runtime.GOOS == "windows" { // Skipping test on windows for now because of the following error: // /bin/sh -c echo $FOO $BAR: exec: "/bin/sh": file does not exist t.SkipNow() From 2e0d41a66f2a67abe8ba188af80b19b9a4e8fb40 Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Tue, 26 Sep 2023 16:26:16 +0200 Subject: [PATCH 05/10] allow access to stderr --- libs/git/clone.go | 4 ++++ libs/process/background.go | 33 +++++++++++++++++++++++++++------ 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/libs/git/clone.go b/libs/git/clone.go index 5d3f97e96c..e7d001cd7e 100644 --- a/libs/git/clone.go +++ b/libs/git/clone.go @@ -48,6 +48,10 @@ func (opts cloneOptions) clone(ctx context.Context) error { if errors.Is(err, exec.ErrNotFound) { return fmt.Errorf("please install git CLI to clone a repository: %w", err) } + var processErr *process.ProcessError + if errors.As(err, &processErr) { + return fmt.Errorf("git clone failed: %w. %s", err, processErr.Stderr) + } if err != nil { return fmt.Errorf("git clone failed: %w", err) } diff --git a/libs/process/background.go b/libs/process/background.go index 2e3675c0b8..cd033c086c 100644 --- a/libs/process/background.go +++ b/libs/process/background.go @@ -4,21 +4,36 @@ import ( "bytes" "context" "fmt" - "os" "os/exec" "strings" "github.com/databricks/cli/libs/log" ) +type ProcessError struct { + Command string + Err error + Stdout string + Stderr string +} + +func (perr *ProcessError) Unwrap() error { + return perr.Err +} + +func (perr *ProcessError) Error() string { + return fmt.Sprintf("%s: %s %s", perr.Command, perr.Stderr, perr.Err) +} + func Background(ctx context.Context, args []string, opts ...execOption) (string, error) { commandStr := strings.Join(args, " ") log.Debugf(ctx, "running: %s", commandStr) cmd := exec.CommandContext(ctx, args[0], args[1:]...) - stdout := &bytes.Buffer{} - cmd.Stdin = os.Stdin - cmd.Stdout = stdout - cmd.Stderr = stdout + var stdout, stderr bytes.Buffer + // For background processes, there's no standard input + cmd.Stdin = nil + cmd.Stdout = &stdout + cmd.Stderr = &stderr for _, o := range opts { err := o(cmd) if err != nil { @@ -26,7 +41,13 @@ func Background(ctx context.Context, args []string, opts ...execOption) (string, } } if err := cmd.Run(); err != nil { - return "", fmt.Errorf("%s: %s %w", commandStr, stdout.String(), err) + return "", &ProcessError{ + Err: err, + Command: commandStr, + Stdout: stdout.String(), + Stderr: stderr.String(), + } } + // trim leading/trailing whitespace from the output return strings.TrimSpace(stdout.String()), nil } From f0b4477b4d1becd4c0292c05d5b7bef770263a62 Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Tue, 26 Sep 2023 17:20:19 +0200 Subject: [PATCH 06/10] robust --- bundle/config/artifact.go | 9 ++++-- libs/env/context.go | 19 ++++++++++++ libs/env/context_test.go | 7 +++++ libs/process/background.go | 4 +-- libs/process/background_test.go | 52 ++++++++++++++++++++++++++++++++- libs/process/forwarded.go | 7 ++++- libs/process/forwarded_test.go | 12 ++++---- libs/process/opts.go | 31 +++++++++++++++----- libs/process/opts_test.go | 25 ++++++++++++++++ 9 files changed, 145 insertions(+), 21 deletions(-) diff --git a/bundle/config/artifact.go b/bundle/config/artifact.go index 44da150552..755116eb39 100644 --- a/bundle/config/artifact.go +++ b/bundle/config/artifact.go @@ -56,11 +56,14 @@ func (a *Artifact) Build(ctx context.Context) ([]byte, error) { commands := strings.Split(a.BuildCommand, " && ") for _, command := range commands { buildParts := strings.Split(command, " ") - res, err := process.Background(ctx, buildParts, process.WithDir(a.Path)) + var buf bytes.Buffer + _, err := process.Background(ctx, buildParts, + process.WithCombinedOutput(&buf), + process.WithDir(a.Path)) if err != nil { - return nil, err + return buf.Bytes(), err } - out = append(out, []byte(res)) + out = append(out, buf.Bytes()) } return bytes.Join(out, []byte{}), nil } diff --git a/libs/env/context.go b/libs/env/context.go index cf04c1ece9..92f2a0415a 100644 --- a/libs/env/context.go +++ b/libs/env/context.go @@ -3,6 +3,7 @@ package env import ( "context" "os" + "strings" ) var envContextKey int @@ -61,3 +62,21 @@ func Set(ctx context.Context, key, value string) context.Context { m[key] = value return setMap(ctx, m) } + +// All returns environment variables that are defined in both os.Environ +// and this package. `env.Set(ctx, x, y)` will override x from os.Environ. +func All(ctx context.Context) map[string]string { + m := map[string]string{} + for _, line := range os.Environ() { + split := strings.SplitN(line, "=", 2) + if len(split) != 2 { + continue + } + m[split[0]] = split[1] + } + // override existing environment variables with the ones we set + for k, v := range copyMap(getMap(ctx)) { + m[k] = v + } + return m +} diff --git a/libs/env/context_test.go b/libs/env/context_test.go index 9ff1945971..23dc886eb8 100644 --- a/libs/env/context_test.go +++ b/libs/env/context_test.go @@ -38,4 +38,11 @@ func TestContext(t *testing.T) { assert.Equal(t, "qux", Get(ctx2, "FOO")) assert.Equal(t, "baz", Get(ctx1, "FOO")) assert.Equal(t, "bar", Get(ctx0, "FOO")) + + ctx3 := Set(ctx2, "BAR", "x=y") + + all := All(ctx3) + assert.NotNil(t, all) + assert.Equal(t, "qux", all["FOO"]) + assert.Equal(t, "x=y", all["BAR"]) } diff --git a/libs/process/background.go b/libs/process/background.go index cd033c086c..55d1b0ba84 100644 --- a/libs/process/background.go +++ b/libs/process/background.go @@ -35,13 +35,13 @@ func Background(ctx context.Context, args []string, opts ...execOption) (string, cmd.Stdout = &stdout cmd.Stderr = &stderr for _, o := range opts { - err := o(cmd) + err := o(ctx, cmd) if err != nil { return "", err } } if err := cmd.Run(); err != nil { - return "", &ProcessError{ + return stdout.String(), &ProcessError{ Err: err, Command: commandStr, Stdout: stdout.String(), diff --git a/libs/process/background_test.go b/libs/process/background_test.go index c5cc003c49..b9d872bb90 100644 --- a/libs/process/background_test.go +++ b/libs/process/background_test.go @@ -1,14 +1,22 @@ package process import ( + "bytes" "context" "fmt" + "os" "os/exec" "testing" "github.com/stretchr/testify/assert" ) +func TestBackgroundUnwrapsNotFound(t *testing.T) { + ctx := context.Background() + _, err := Background(ctx, []string{"/bin/meeecho", "1"}) + assert.ErrorIs(t, err, os.ErrNotExist) +} + func TestBackground(t *testing.T) { ctx := context.Background() res, err := Background(ctx, []string{"echo", "1"}, WithDir("/")) @@ -16,6 +24,48 @@ func TestBackground(t *testing.T) { assert.Equal(t, "1", res) } +func TestBackgroundOnlyStdoutGetsoutOnSuccess(t *testing.T) { + ctx := context.Background() + res, err := Background(ctx, []string{ + "python3", "-c", "import sys; sys.stderr.write('1'); sys.stdout.write('2')", + }) + assert.NoError(t, err) + assert.Equal(t, "2", res) +} + +func TestBackgroundCombinedOutput(t *testing.T) { + ctx := context.Background() + var buf bytes.Buffer + res, err := Background(ctx, []string{ + "python3", "-c", "import sys; sys.stderr.write('1'); sys.stdout.write('2')", + }, WithCombinedOutput(&buf)) + assert.NoError(t, err) + assert.Equal(t, "2", res) + assert.Equal(t, "12", buf.String()) +} + +func TestBackgroundCombinedOutputFailure(t *testing.T) { + ctx := context.Background() + var buf bytes.Buffer + res, err := Background(ctx, []string{ + "python3", "-c", "import sys; sys.stderr.write('1'); sys.stdout.write('2'); sys.exit(42)", + }, WithCombinedOutput(&buf)) + var processErr *ProcessError + if assert.ErrorAs(t, err, &processErr) { + assert.Equal(t, "1", processErr.Stderr) + assert.Equal(t, "2", processErr.Stdout) + } + assert.Equal(t, "2", res) + assert.Equal(t, "12", buf.String()) +} + +func TestBackgroundNoStdin(t *testing.T) { + ctx := context.Background() + res, err := Background(ctx, []string{"cat"}) + assert.NoError(t, err) + assert.Equal(t, "", res) +} + func TestBackgroundFails(t *testing.T) { ctx := context.Background() _, err := Background(ctx, []string{"ls", "/dev/null/x"}) @@ -24,7 +74,7 @@ func TestBackgroundFails(t *testing.T) { func TestBackgroundFailsOnOption(t *testing.T) { ctx := context.Background() - _, err := Background(ctx, []string{"ls", "/dev/null/x"}, func(c *exec.Cmd) error { + _, err := Background(ctx, []string{"ls", "/dev/null/x"}, func(_ context.Context, c *exec.Cmd) error { return fmt.Errorf("nope") }) assert.EqualError(t, err, "nope") diff --git a/libs/process/forwarded.go b/libs/process/forwarded.go index 19bf875ba9..0388da9683 100644 --- a/libs/process/forwarded.go +++ b/libs/process/forwarded.go @@ -16,6 +16,8 @@ func Forwarded(ctx context.Context, args []string, src io.Reader, dst io.Writer, // make sure to sync on writing to stdout reader, writer := io.Pipe() + + // empirical tests showed buffered copies being more responsive go io.CopyBuffer(dst, reader, make([]byte, 128)) defer reader.Close() defer writer.Close() @@ -24,7 +26,7 @@ func Forwarded(ctx context.Context, args []string, src io.Reader, dst io.Writer, // apply common options for _, o := range opts { - err := o(cmd) + err := o(ctx, cmd) if err != nil { return err } @@ -37,6 +39,9 @@ func Forwarded(ctx context.Context, args []string, src io.Reader, dst io.Writer, return err } go io.CopyBuffer(stdin, src, make([]byte, 128)) + + // This is the place where terminal detection methods in the child processes might break, + // but we'll fix that once there's such a problem. defer stdin.Close() err = cmd.Start() diff --git a/libs/process/forwarded_test.go b/libs/process/forwarded_test.go index 63606f8e90..be715cb829 100644 --- a/libs/process/forwarded_test.go +++ b/libs/process/forwarded_test.go @@ -12,10 +12,10 @@ import ( func TestForwarded(t *testing.T) { ctx := context.Background() - buf := bytes.NewBufferString("") + var buf bytes.Buffer err := Forwarded(ctx, []string{ "python3", "-c", "print(input('input: '))", - }, strings.NewReader("abc\n"), buf) + }, strings.NewReader("abc\n"), &buf) assert.NoError(t, err) assert.Equal(t, "input: abc", strings.TrimSpace(buf.String())) @@ -23,19 +23,19 @@ func TestForwarded(t *testing.T) { func TestForwardedFails(t *testing.T) { ctx := context.Background() - buf := bytes.NewBufferString("") + var buf bytes.Buffer err := Forwarded(ctx, []string{ "_non_existent_", - }, strings.NewReader("abc\n"), buf) + }, strings.NewReader("abc\n"), &buf) assert.NotNil(t, err) } func TestForwardedFailsOnStdinPipe(t *testing.T) { ctx := context.Background() - buf := bytes.NewBufferString("") + var buf bytes.Buffer err := Forwarded(ctx, []string{ "_non_existent_", - }, strings.NewReader("abc\n"), buf, func(c *exec.Cmd) error { + }, strings.NewReader("abc\n"), &buf, func(_ context.Context, c *exec.Cmd) error { c.Stdin = strings.NewReader("x") return nil }) diff --git a/libs/process/opts.go b/libs/process/opts.go index 5ba7808089..ce56aa51f4 100644 --- a/libs/process/opts.go +++ b/libs/process/opts.go @@ -1,18 +1,25 @@ package process import ( + "bytes" + "context" "fmt" "io" - "os" "os/exec" + + "github.com/databricks/cli/libs/env" ) -type execOption func(*exec.Cmd) error +type execOption func(context.Context, *exec.Cmd) error func WithEnv(key, value string) execOption { - return func(c *exec.Cmd) error { + return func(ctx context.Context, c *exec.Cmd) error { + // we pull the env through lib/env such that we can run + // parallel tests with anything using libs/process. if c.Env == nil { - c.Env = os.Environ() + for k, v := range env.All(ctx) { + c.Env = append(c.Env, fmt.Sprintf("%s=%s", k, v)) + } } v := fmt.Sprintf("%s=%s", key, value) c.Env = append(c.Env, v) @@ -21,9 +28,9 @@ func WithEnv(key, value string) execOption { } func WithEnvs(envs map[string]string) execOption { - return func(c *exec.Cmd) error { + return func(ctx context.Context, c *exec.Cmd) error { for k, v := range envs { - err := WithEnv(k, v)(c) + err := WithEnv(k, v)(ctx, c) if err != nil { return err } @@ -33,14 +40,14 @@ func WithEnvs(envs map[string]string) execOption { } func WithDir(dir string) execOption { - return func(c *exec.Cmd) error { + return func(_ context.Context, c *exec.Cmd) error { c.Dir = dir return nil } } func WithStdoutPipe(dst *io.ReadCloser) execOption { - return func(c *exec.Cmd) error { + return func(_ context.Context, c *exec.Cmd) error { outPipe, err := c.StdoutPipe() if err != nil { return err @@ -49,3 +56,11 @@ func WithStdoutPipe(dst *io.ReadCloser) execOption { return nil } } + +func WithCombinedOutput(buf *bytes.Buffer) execOption { + return func(_ context.Context, c *exec.Cmd) error { + c.Stdout = io.MultiWriter(c.Stdout, buf) + c.Stderr = io.MultiWriter(c.Stderr, buf) + return nil + } +} diff --git a/libs/process/opts_test.go b/libs/process/opts_test.go index 6e37c29602..436f4f9fc0 100644 --- a/libs/process/opts_test.go +++ b/libs/process/opts_test.go @@ -2,9 +2,13 @@ package process import ( "context" + "os/exec" "runtime" + "sort" "testing" + "github.com/databricks/cli/internal/testutil" + "github.com/databricks/cli/libs/env" "github.com/stretchr/testify/assert" ) @@ -22,3 +26,24 @@ func TestWithEnvs(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "foo delirium", res) } + +func TestWorksWithLibsEnv(t *testing.T) { + testutil.CleanupEnvironment(t) + ctx := context.Background() + ctx2 := env.Set(ctx, "AAA", "BBB") + + cmd := &exec.Cmd{} + err := WithEnvs(map[string]string{ + "CCC": "DDD", + "EEE": "FFF", + })(ctx2, cmd) + assert.NoError(t, err) + + vars := cmd.Environ() + sort.Strings(vars) + + assert.Len(t, vars, 5) + assert.Equal(t, "AAA=BBB", vars[0]) + assert.Equal(t, "CCC=DDD", vars[1]) + assert.Equal(t, "EEE=FFF", vars[2]) +} From feafc5f7ce4f31b1be253bc4d498054a479c724b Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Tue, 26 Sep 2023 17:26:51 +0200 Subject: [PATCH 07/10] will it make it less flaky? --- libs/process/opts.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/process/opts.go b/libs/process/opts.go index ce56aa51f4..f52ac77332 100644 --- a/libs/process/opts.go +++ b/libs/process/opts.go @@ -59,8 +59,8 @@ func WithStdoutPipe(dst *io.ReadCloser) execOption { func WithCombinedOutput(buf *bytes.Buffer) execOption { return func(_ context.Context, c *exec.Cmd) error { - c.Stdout = io.MultiWriter(c.Stdout, buf) - c.Stderr = io.MultiWriter(c.Stderr, buf) + c.Stdout = io.MultiWriter(buf, c.Stdout) + c.Stderr = io.MultiWriter(buf, c.Stderr) return nil } } From cf068d063987684d3c42160e494a867667f8d38e Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Tue, 26 Sep 2023 18:48:03 +0200 Subject: [PATCH 08/10] address comments --- libs/process/background.go | 3 +-- libs/process/forwarded.go | 15 ++------------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/libs/process/background.go b/libs/process/background.go index 55d1b0ba84..7b87a86e2a 100644 --- a/libs/process/background.go +++ b/libs/process/background.go @@ -48,6 +48,5 @@ func Background(ctx context.Context, args []string, opts ...execOption) (string, Stderr: stderr.String(), } } - // trim leading/trailing whitespace from the output - return strings.TrimSpace(stdout.String()), nil + return stdout.String(), nil } diff --git a/libs/process/forwarded.go b/libs/process/forwarded.go index 0388da9683..00e1f10995 100644 --- a/libs/process/forwarded.go +++ b/libs/process/forwarded.go @@ -23,6 +23,7 @@ func Forwarded(ctx context.Context, args []string, src io.Reader, dst io.Writer, defer writer.Close() cmd.Stdout = writer cmd.Stderr = writer + cmd.Stdin = src // apply common options for _, o := range opts { @@ -32,19 +33,7 @@ func Forwarded(ctx context.Context, args []string, src io.Reader, dst io.Writer, } } - // pipe standard input to the child process, so that we can allow terminal UX - // see the PoC at https://github.com/databricks/cli/pull/637 - stdin, err := cmd.StdinPipe() - if err != nil { - return err - } - go io.CopyBuffer(stdin, src, make([]byte, 128)) - - // This is the place where terminal detection methods in the child processes might break, - // but we'll fix that once there's such a problem. - defer stdin.Close() - - err = cmd.Start() + err := cmd.Start() if err != nil { return err } From 5467165efbaf1560c6934d3544b0a4f91253d07d Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Tue, 26 Sep 2023 20:36:14 +0200 Subject: [PATCH 09/10] more tests --- libs/env/context.go | 2 +- libs/env/context_test.go | 1 + libs/process/background.go | 11 +++++++++-- libs/process/background_test.go | 31 ++++++++++++++++++++----------- libs/process/forwarded.go | 19 ++++++++++--------- libs/process/forwarded_test.go | 6 +++--- libs/process/opts.go | 9 --------- libs/process/opts_test.go | 16 +++++++--------- python/runner_test.go | 4 ++-- 9 files changed, 53 insertions(+), 46 deletions(-) diff --git a/libs/env/context.go b/libs/env/context.go index 92f2a0415a..bbe294d7b4 100644 --- a/libs/env/context.go +++ b/libs/env/context.go @@ -75,7 +75,7 @@ func All(ctx context.Context) map[string]string { m[split[0]] = split[1] } // override existing environment variables with the ones we set - for k, v := range copyMap(getMap(ctx)) { + for k, v := range getMap(ctx) { m[k] = v } return m diff --git a/libs/env/context_test.go b/libs/env/context_test.go index 23dc886eb8..39553448cc 100644 --- a/libs/env/context_test.go +++ b/libs/env/context_test.go @@ -45,4 +45,5 @@ func TestContext(t *testing.T) { assert.NotNil(t, all) assert.Equal(t, "qux", all["FOO"]) assert.Equal(t, "x=y", all["BAR"]) + assert.NotEmpty(t, all["PATH"]) } diff --git a/libs/process/background.go b/libs/process/background.go index 7b87a86e2a..26178a1dcf 100644 --- a/libs/process/background.go +++ b/libs/process/background.go @@ -7,6 +7,7 @@ import ( "os/exec" "strings" + "github.com/databricks/cli/libs/env" "github.com/databricks/cli/libs/log" ) @@ -22,18 +23,24 @@ func (perr *ProcessError) Unwrap() error { } func (perr *ProcessError) Error() string { - return fmt.Sprintf("%s: %s %s", perr.Command, perr.Stderr, perr.Err) + return fmt.Sprintf("%s: %s", perr.Command, perr.Err) } func Background(ctx context.Context, args []string, opts ...execOption) (string, error) { commandStr := strings.Join(args, " ") log.Debugf(ctx, "running: %s", commandStr) cmd := exec.CommandContext(ctx, args[0], args[1:]...) - var stdout, stderr bytes.Buffer + stdout := bytes.Buffer{} + stderr := bytes.Buffer{} // For background processes, there's no standard input cmd.Stdin = nil cmd.Stdout = &stdout cmd.Stderr = &stderr + // we pull the env through lib/env such that we can run + // parallel tests with anything using libs/process. + for k, v := range env.All(ctx) { + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v)) + } for _, o := range opts { err := o(ctx, cmd) if err != nil { diff --git a/libs/process/background_test.go b/libs/process/background_test.go index b9d872bb90..5434a9e2b7 100644 --- a/libs/process/background_test.go +++ b/libs/process/background_test.go @@ -21,7 +21,7 @@ func TestBackground(t *testing.T) { ctx := context.Background() res, err := Background(ctx, []string{"echo", "1"}, WithDir("/")) assert.NoError(t, err) - assert.Equal(t, "1", res) + assert.Equal(t, "1\n", res) } func TestBackgroundOnlyStdoutGetsoutOnSuccess(t *testing.T) { @@ -35,28 +35,37 @@ func TestBackgroundOnlyStdoutGetsoutOnSuccess(t *testing.T) { func TestBackgroundCombinedOutput(t *testing.T) { ctx := context.Background() - var buf bytes.Buffer + buf := bytes.Buffer{} res, err := Background(ctx, []string{ - "python3", "-c", "import sys; sys.stderr.write('1'); sys.stdout.write('2')", + "python3", "-c", "import sys, time; " + + `sys.stderr.write("1\n"); sys.stderr.flush(); ` + + "time.sleep(0.001); " + + "print('2', flush=True); sys.stdout.flush(); " + + "time.sleep(0.001)", }, WithCombinedOutput(&buf)) assert.NoError(t, err) - assert.Equal(t, "2", res) - assert.Equal(t, "12", buf.String()) + assert.Equal(t, "2\n", res) + assert.Equal(t, "1\n2\n", buf.String()) } func TestBackgroundCombinedOutputFailure(t *testing.T) { ctx := context.Background() - var buf bytes.Buffer + buf := bytes.Buffer{} res, err := Background(ctx, []string{ - "python3", "-c", "import sys; sys.stderr.write('1'); sys.stdout.write('2'); sys.exit(42)", + "python3", "-c", "import sys, time; " + + `sys.stderr.write("1\n"); sys.stderr.flush(); ` + + "time.sleep(0.001); " + + "print('2', flush=True); sys.stdout.flush(); " + + "time.sleep(0.001); " + + "sys.exit(42)", }, WithCombinedOutput(&buf)) var processErr *ProcessError if assert.ErrorAs(t, err, &processErr) { - assert.Equal(t, "1", processErr.Stderr) - assert.Equal(t, "2", processErr.Stdout) + assert.Equal(t, "1\n", processErr.Stderr) + assert.Equal(t, "2\n", processErr.Stdout) } - assert.Equal(t, "2", res) - assert.Equal(t, "12", buf.String()) + assert.Equal(t, "2\n", res) + assert.Equal(t, "1\n2\n", buf.String()) } func TestBackgroundNoStdin(t *testing.T) { diff --git a/libs/process/forwarded.go b/libs/process/forwarded.go index 00e1f10995..df3c2dbd7d 100644 --- a/libs/process/forwarded.go +++ b/libs/process/forwarded.go @@ -2,28 +2,29 @@ package process import ( "context" + "fmt" "io" "os/exec" "strings" + "github.com/databricks/cli/libs/env" "github.com/databricks/cli/libs/log" ) -func Forwarded(ctx context.Context, args []string, src io.Reader, dst io.Writer, opts ...execOption) error { +func Forwarded(ctx context.Context, args []string, src io.Reader, outWriter, errWriter io.Writer, opts ...execOption) error { commandStr := strings.Join(args, " ") log.Debugf(ctx, "starting: %s", commandStr) cmd := exec.CommandContext(ctx, args[0], args[1:]...) - // make sure to sync on writing to stdout - reader, writer := io.Pipe() - // empirical tests showed buffered copies being more responsive - go io.CopyBuffer(dst, reader, make([]byte, 128)) - defer reader.Close() - defer writer.Close() - cmd.Stdout = writer - cmd.Stderr = writer + cmd.Stdout = outWriter + cmd.Stderr = errWriter cmd.Stdin = src + // we pull the env through lib/env such that we can run + // parallel tests with anything using libs/process. + for k, v := range env.All(ctx) { + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v)) + } // apply common options for _, o := range opts { diff --git a/libs/process/forwarded_test.go b/libs/process/forwarded_test.go index be715cb829..ddb79818f4 100644 --- a/libs/process/forwarded_test.go +++ b/libs/process/forwarded_test.go @@ -15,7 +15,7 @@ func TestForwarded(t *testing.T) { var buf bytes.Buffer err := Forwarded(ctx, []string{ "python3", "-c", "print(input('input: '))", - }, strings.NewReader("abc\n"), &buf) + }, strings.NewReader("abc\n"), &buf, &buf) assert.NoError(t, err) assert.Equal(t, "input: abc", strings.TrimSpace(buf.String())) @@ -26,7 +26,7 @@ func TestForwardedFails(t *testing.T) { var buf bytes.Buffer err := Forwarded(ctx, []string{ "_non_existent_", - }, strings.NewReader("abc\n"), &buf) + }, strings.NewReader("abc\n"), &buf, &buf) assert.NotNil(t, err) } @@ -35,7 +35,7 @@ func TestForwardedFailsOnStdinPipe(t *testing.T) { var buf bytes.Buffer err := Forwarded(ctx, []string{ "_non_existent_", - }, strings.NewReader("abc\n"), &buf, func(_ context.Context, c *exec.Cmd) error { + }, strings.NewReader("abc\n"), &buf, &buf, func(_ context.Context, c *exec.Cmd) error { c.Stdin = strings.NewReader("x") return nil }) diff --git a/libs/process/opts.go b/libs/process/opts.go index f52ac77332..e201c66684 100644 --- a/libs/process/opts.go +++ b/libs/process/opts.go @@ -6,21 +6,12 @@ import ( "fmt" "io" "os/exec" - - "github.com/databricks/cli/libs/env" ) type execOption func(context.Context, *exec.Cmd) error func WithEnv(key, value string) execOption { return func(ctx context.Context, c *exec.Cmd) error { - // we pull the env through lib/env such that we can run - // parallel tests with anything using libs/process. - if c.Env == nil { - for k, v := range env.All(ctx) { - c.Env = append(c.Env, fmt.Sprintf("%s=%s", k, v)) - } - } v := fmt.Sprintf("%s=%s", key, value) c.Env = append(c.Env, v) return nil diff --git a/libs/process/opts_test.go b/libs/process/opts_test.go index 436f4f9fc0..80540203df 100644 --- a/libs/process/opts_test.go +++ b/libs/process/opts_test.go @@ -19,31 +19,29 @@ func TestWithEnvs(t *testing.T) { t.SkipNow() } ctx := context.Background() - res, err := Background(ctx, []string{"/bin/sh", "-c", "echo $FOO $BAR"}, WithEnvs(map[string]string{ - "FOO": "foo", + ctx2 := env.Set(ctx, "FOO", "foo") + res, err := Background(ctx2, []string{"/bin/sh", "-c", "echo $FOO $BAR"}, WithEnvs(map[string]string{ "BAR": "delirium", })) assert.NoError(t, err) - assert.Equal(t, "foo delirium", res) + assert.Equal(t, "foo delirium\n", res) } func TestWorksWithLibsEnv(t *testing.T) { testutil.CleanupEnvironment(t) ctx := context.Background() - ctx2 := env.Set(ctx, "AAA", "BBB") cmd := &exec.Cmd{} err := WithEnvs(map[string]string{ "CCC": "DDD", "EEE": "FFF", - })(ctx2, cmd) + })(ctx, cmd) assert.NoError(t, err) vars := cmd.Environ() sort.Strings(vars) - assert.Len(t, vars, 5) - assert.Equal(t, "AAA=BBB", vars[0]) - assert.Equal(t, "CCC=DDD", vars[1]) - assert.Equal(t, "EEE=FFF", vars[2]) + assert.Len(t, vars, 2) + assert.Equal(t, "CCC=DDD", vars[0]) + assert.Equal(t, "EEE=FFF", vars[1]) } diff --git a/python/runner_test.go b/python/runner_test.go index f13a0d9945..fc8f2508db 100644 --- a/python/runner_test.go +++ b/python/runner_test.go @@ -20,7 +20,7 @@ func TestExecAndPassError(t *testing.T) { } _, err := execAndPassErr(context.Background(), "which", "__non_existing__") - assert.EqualError(t, err, "which __non_existing__: exit status 1") + assert.EqualError(t, err, "which __non_existing__: exit status 1") } func TestDetectPython(t *testing.T) { @@ -77,7 +77,7 @@ func testTempdir(t *testing.T, dir *string) func() { func TestPyError(t *testing.T) { _, err := Py(context.Background(), "__non_existing__.py") - assert.Contains(t, err.Error(), "can't open file") + assert.Contains(t, err.Error(), "exit status 2") } func TestPyInline(t *testing.T) { From ed030351ac847c96962ea9cd13b6dd7f45289a53 Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Wed, 27 Sep 2023 10:54:07 +0200 Subject: [PATCH 10/10] make it work on windows --- libs/process/background_test.go | 15 ++++++++------- libs/process/opts_test.go | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/libs/process/background_test.go b/libs/process/background_test.go index 5434a9e2b7..94f7e881ee 100644 --- a/libs/process/background_test.go +++ b/libs/process/background_test.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "os/exec" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -21,7 +22,7 @@ func TestBackground(t *testing.T) { ctx := context.Background() res, err := Background(ctx, []string{"echo", "1"}, WithDir("/")) assert.NoError(t, err) - assert.Equal(t, "1\n", res) + assert.Equal(t, "1", strings.TrimSpace(res)) } func TestBackgroundOnlyStdoutGetsoutOnSuccess(t *testing.T) { @@ -44,8 +45,8 @@ func TestBackgroundCombinedOutput(t *testing.T) { "time.sleep(0.001)", }, WithCombinedOutput(&buf)) assert.NoError(t, err) - assert.Equal(t, "2\n", res) - assert.Equal(t, "1\n2\n", buf.String()) + assert.Equal(t, "2", strings.TrimSpace(res)) + assert.Equal(t, "1\n2\n", strings.ReplaceAll(buf.String(), "\r", "")) } func TestBackgroundCombinedOutputFailure(t *testing.T) { @@ -61,11 +62,11 @@ func TestBackgroundCombinedOutputFailure(t *testing.T) { }, WithCombinedOutput(&buf)) var processErr *ProcessError if assert.ErrorAs(t, err, &processErr) { - assert.Equal(t, "1\n", processErr.Stderr) - assert.Equal(t, "2\n", processErr.Stdout) + assert.Equal(t, "1", strings.TrimSpace(processErr.Stderr)) + assert.Equal(t, "2", strings.TrimSpace(processErr.Stdout)) } - assert.Equal(t, "2\n", res) - assert.Equal(t, "1\n2\n", buf.String()) + assert.Equal(t, "2", strings.TrimSpace(res)) + assert.Equal(t, "1\n2\n", strings.ReplaceAll(buf.String(), "\r", "")) } func TestBackgroundNoStdin(t *testing.T) { diff --git a/libs/process/opts_test.go b/libs/process/opts_test.go index 80540203df..3a819fbb96 100644 --- a/libs/process/opts_test.go +++ b/libs/process/opts_test.go @@ -41,7 +41,7 @@ func TestWorksWithLibsEnv(t *testing.T) { vars := cmd.Environ() sort.Strings(vars) - assert.Len(t, vars, 2) + assert.True(t, len(vars) >= 2) assert.Equal(t, "CCC=DDD", vars[0]) assert.Equal(t, "EEE=FFF", vars[1]) }