diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index f51a711f..c3a7fb18 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -74,16 +74,21 @@ jobs: - name: "Run WASM Tests" # There's a whole bunch of vars in the environment that aren't needed for running this test, so we clear them out. # NOTE: if you need to do this in the future, I recommend bashing into the container and running `env | sort | less` - run: |- - GOOS=js \ - GOARCH=wasm \ - cleanenv \ - -remove-prefix GITHUB_ \ - -remove-prefix ANDROID_ \ - -remove-prefix JAVA_ \ - -remove-prefix DOTNET_ \ - -remove-prefix RUNNER_ \ - -remove-prefix HOMEBREW_ \ - -remove-prefix runner_ \ - -- \ - go test ./pkg/wasm/... -exec $(go env GOPATH)/bin/wasmbrowsertest + # NOTE: wasmbrowsertest can have flaky websocket timeouts, so we retry up to 3 times + uses: "nick-fields/retry@v3" + with: + max_attempts: 3 + timeout_minutes: 5 + command: |- + GOOS=js \ + GOARCH=wasm \ + cleanenv \ + -remove-prefix GITHUB_ \ + -remove-prefix ANDROID_ \ + -remove-prefix JAVA_ \ + -remove-prefix DOTNET_ \ + -remove-prefix RUNNER_ \ + -remove-prefix HOMEBREW_ \ + -remove-prefix runner_ \ + -- \ + go test ./pkg/wasm/... -exec $(go env GOPATH)/bin/wasmbrowsertest diff --git a/internal/grpcutil/batch.go b/internal/grpcutil/batch.go index 95c0395c..dfcbc324 100644 --- a/internal/grpcutil/batch.go +++ b/internal/grpcutil/batch.go @@ -4,6 +4,7 @@ import ( "context" "errors" "runtime" + "sync/atomic" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -47,6 +48,7 @@ func ConcurrentBatch(ctx context.Context, n int, batchSize int, maxWorkers int, maxWorkers = runtime.GOMAXPROCS(0) } + var failed atomic.Bool sem := semaphore.NewWeighted(int64(maxWorkers)) g, ctx := errgroup.WithContext(ctx) numBatches := (n + batchSize - 1) / batchSize @@ -55,12 +57,24 @@ func ConcurrentBatch(ctx context.Context, n int, batchSize int, maxWorkers int, break } + // After acquiring the semaphore, check whether a previous batch + // has already failed. This handles the race where a failing batch + // releases the semaphore before the errgroup cancels the context. + if failed.Load() { + sem.Release(1) + break + } + batchNum := i g.Go(func() error { defer sem.Release(1) start := batchNum * batchSize end := minimum(start+batchSize, n) - return each(ctx, batchNum, start, end) + err := each(ctx, batchNum, start, end) + if err != nil { + failed.Store(true) + } + return err }) } return g.Wait() diff --git a/internal/mcp/mcp.go b/internal/mcp/mcp.go index aef310ab..d169e51e 100644 --- a/internal/mcp/mcp.go +++ b/internal/mcp/mcp.go @@ -1129,6 +1129,8 @@ func newSpiceDBServer(ctx context.Context) (server.RunnableServer, error) { server.WithNamespaceCacheConfig(server.CacheConfig{Enabled: false, Metrics: false}), server.WithClusterDispatchCacheConfig(server.CacheConfig{Enabled: false, Metrics: false}), server.WithDatastore(ds), + // enable expiration support for relationships + server.WithEnableRelationshipExpiration(true), } return server.NewConfigWithOptionsAndDefaults(configOpts...).Complete(ctx)