From 7ce11e8567de4a24bb5015694982d5bfdbcb1337 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Thu, 1 Jun 2023 15:43:26 -0600 Subject: [PATCH 1/3] use conc --- internal/uploadstore/pool.go | 75 +++++------------------------------- 1 file changed, 9 insertions(+), 66 deletions(-) diff --git a/internal/uploadstore/pool.go b/internal/uploadstore/pool.go index 17cbeb5677b2..bae33f738cd3 100644 --- a/internal/uploadstore/pool.go +++ b/internal/uploadstore/pool.go @@ -2,81 +2,24 @@ package uploadstore import ( "runtime" - "sync" - "github.com/sourcegraph/sourcegraph/lib/errors" + "github.com/sourcegraph/conc/pool" ) -// poolWorker is a function invoked by RunWorkers that sends -// any errors that occur during execution down a shared channel. -type poolWorker func(errs chan<- error) - -// runWorkersN invokes the given worker n times and collects the -// errors from each invocation. -func runWorkersN(n int, worker poolWorker) (err error) { - errs := make(chan error, n) - - var wg sync.WaitGroup - for i := 0; i < n; i++ { - wg.Add(1) - go func() { worker(errs); wg.Done() }() - } - - go func() { - wg.Wait() - close(errs) - }() - - for e := range errs { - if err == nil { - err = e - } else { - err = errors.Append(err, e) - } - } - - return err -} - // RunWorkersOverStrings invokes the given worker once for each of the // given string values. The worker function will receive the index as well // as the string value as parameters. Workers will be invoked in a number // of concurrent routines proportional to the maximum number of CPUs that // can be executing simultaneously. func RunWorkersOverStrings(values []string, worker func(index int, value string) error) error { - return runWorkersOverStringsN(runtime.GOMAXPROCS(0), values, worker) -} - -// RunWorkersOverStrings invokes the given worker once for each of the -// given string values. The worker function will receive the index as well -// as the string value as parameters. Workers will be invoked in n concurrent -// routines. -func runWorkersOverStringsN(n int, values []string, worker func(index int, value string) error) error { - return runWorkersN(n, indexedStringWorker(loadIndexedStringChannel(values), worker)) -} - -type indexedString struct { - index int - value string -} - -func loadIndexedStringChannel(values []string) <-chan indexedString { - ch := make(chan indexedString, len(values)) - defer close(ch) - + p := pool.New(). + WithErrors(). + WithMaxGoroutines(runtime.GOMAXPROCS(0)) for i, value := range values { - ch <- indexedString{index: i, value: value} - } - - return ch -} - -func indexedStringWorker(ch <-chan indexedString, worker func(index int, value string) error) poolWorker { - return func(errs chan<- error) { - for value := range ch { - if err := worker(value.index, value.value); err != nil { - errs <- err - } - } + i, value := i, value + p.Go(func() error { + return worker(i, value) + }) } + return p.Wait() } From f70051320c590d9c5c53ef58e4e4b881777f4077 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Thu, 1 Jun 2023 15:46:01 -0600 Subject: [PATCH 2/3] bazel configure --- internal/goroutine/BUILD.bazel | 3 --- internal/uploadstore/BUILD.bazel | 2 ++ 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/goroutine/BUILD.bazel b/internal/goroutine/BUILD.bazel index 8d262987358a..42411e3d8dec 100644 --- a/internal/goroutine/BUILD.bazel +++ b/internal/goroutine/BUILD.bazel @@ -6,7 +6,6 @@ go_library( "background.go", "goroutine.go", "periodic.go", - "pool.go", ], importpath = "github.com/sourcegraph/sourcegraph/internal/goroutine", visibility = ["//:__subpackages__"], @@ -29,12 +28,10 @@ go_test( "goroutine_test.go", "mocks_test.go", "periodic_test.go", - "pool_test.go", ], embed = [":goroutine"], deps = [ "//lib/errors", "@com_github_derision_test_glock//:glock", - "@com_github_google_go_cmp//cmp", ], ) diff --git a/internal/uploadstore/BUILD.bazel b/internal/uploadstore/BUILD.bazel index 0615822813c2..b4e8fa93e5d9 100644 --- a/internal/uploadstore/BUILD.bazel +++ b/internal/uploadstore/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "gcs_client.go", "lazy_client.go", "observability.go", + "pool.go", "s3_api.go", "s3_client.go", "store.go", @@ -27,6 +28,7 @@ go_library( "@com_github_aws_aws_sdk_go_v2_service_s3//:s3", "@com_github_aws_aws_sdk_go_v2_service_s3//types", "@com_github_inconshreveable_log15//:log15", + "@com_github_sourcegraph_conc//pool", "@com_github_sourcegraph_log//:log", "@com_google_cloud_go_storage//:storage", "@io_opentelemetry_go_otel//attribute", From f4f5a211df71b18de7273f4325330014a2bbc5ea Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Thu, 1 Jun 2023 15:58:48 -0600 Subject: [PATCH 3/3] update naming --- internal/uploadstore/gcs_client.go | 2 +- internal/uploadstore/pool.go | 10 +++++----- internal/uploadstore/s3_client.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/uploadstore/gcs_client.go b/internal/uploadstore/gcs_client.go index f83942e94720..04ebefd29b58 100644 --- a/internal/uploadstore/gcs_client.go +++ b/internal/uploadstore/gcs_client.go @@ -193,7 +193,7 @@ func (s *gcsStore) create(ctx context.Context, bucket gcsBucketHandle) error { } func (s *gcsStore) deleteSources(ctx context.Context, bucket gcsBucketHandle, sources []string) error { - return RunWorkersOverStrings(sources, func(index int, source string) error { + return ForEachString(sources, func(index int, source string) error { if err := bucket.Object(source).Delete(ctx); err != nil { return errors.Wrap(err, "failed to delete source object") } diff --git a/internal/uploadstore/pool.go b/internal/uploadstore/pool.go index bae33f738cd3..9bd888293e00 100644 --- a/internal/uploadstore/pool.go +++ b/internal/uploadstore/pool.go @@ -6,19 +6,19 @@ import ( "github.com/sourcegraph/conc/pool" ) -// RunWorkersOverStrings invokes the given worker once for each of the -// given string values. The worker function will receive the index as well -// as the string value as parameters. Workers will be invoked in a number +// ForEachString invokes the given callback once for each of the +// given string values. The callback function will receive the index as well +// as the string value as parameters. Callbacks will be invoked in a number // of concurrent routines proportional to the maximum number of CPUs that // can be executing simultaneously. -func RunWorkersOverStrings(values []string, worker func(index int, value string) error) error { +func ForEachString(values []string, f func(index int, value string) error) error { p := pool.New(). WithErrors(). WithMaxGoroutines(runtime.GOMAXPROCS(0)) for i, value := range values { i, value := i, value p.Go(func() error { - return worker(i, value) + return f(i, value) }) } return p.Wait() diff --git a/internal/uploadstore/s3_client.go b/internal/uploadstore/s3_client.go index 073ab224b1d5..0d358617650f 100644 --- a/internal/uploadstore/s3_client.go +++ b/internal/uploadstore/s3_client.go @@ -200,7 +200,7 @@ func (s *s3Store) Compose(ctx context.Context, destination string, sources ...st var m sync.Mutex etags := map[int]*string{} - if err := RunWorkersOverStrings(sources, func(index int, source string) error { + if err := ForEachString(sources, func(index int, source string) error { partNumber := index + 1 copyResult, err := s.client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{ @@ -331,7 +331,7 @@ func (s *s3Store) create(ctx context.Context) error { } func (s *s3Store) deleteSources(ctx context.Context, bucket string, sources []string) error { - return RunWorkersOverStrings(sources, func(index int, source string) error { + return ForEachString(sources, func(index int, source string) error { if _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(bucket), Key: aws.String(source),