Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions fn/goroutine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ package fn

import (
"context"
"errors"
"sync"
)

// ErrStopping is returned when trying to add a new goroutine while stopping.
var ErrStopping = errors.New("can not add goroutine, stopping")

// GoroutineManager is used to launch goroutines until context expires or the
// manager is stopped. The Stop method blocks until all started goroutines stop.
type GoroutineManager struct {
Expand All @@ -29,8 +25,10 @@ func NewGoroutineManager(ctx context.Context) *GoroutineManager {
}
}

// Go starts a new goroutine if the manager is not stopping.
func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
// Go tries to start a new goroutine and returns a boolean indicating its
// success. It fails iff the goroutine manager is stopping or its context passed
// to NewGoroutineManager has expired.
func (g *GoroutineManager) Go(f func(ctx context.Context)) bool {
// Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race
// condition, since it is not clear should Wait() block or not. This
// kind of race condition is detected by Go runtime and results in a
Expand All @@ -43,7 +41,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
defer g.mu.Unlock()

if g.ctx.Err() != nil {
return ErrStopping
return false
}

g.wg.Add(1)
Expand All @@ -52,7 +50,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
f(g.ctx)
}()

return nil
return true
}

// Stop prevents new goroutines from being added and waits for all running
Expand All @@ -66,7 +64,7 @@ func (g *GoroutineManager) Stop() {
// safe, since it can't run in parallel with wg.Add(1) call in Go, since
// we just cancelled the context and even if Go call starts running here
// after acquiring the mutex, it would see that the context has expired
// and return ErrStopping instead of calling wg.Add(1).
// and return false instead of calling wg.Add(1).
g.wg.Wait()
}

Expand Down
48 changes: 42 additions & 6 deletions fn/goroutine_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fn

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -19,7 +20,7 @@ func TestGoroutineManager(t *testing.T) {

taskChan := make(chan struct{})

require.NoError(t, m.Go(func(ctx context.Context) {
require.True(t, m.Go(func(ctx context.Context) {
<-taskChan
}))

Expand All @@ -37,7 +38,7 @@ func TestGoroutineManager(t *testing.T) {
require.Greater(t, stopDelay, time.Second)

// Make sure new goroutines do not start after Stop.
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)
require.False(t, m.Go(func(ctx context.Context) {}))

// When Stop() is called, the internal context expires and m.Done() is
// closed. Test this.
Expand All @@ -56,7 +57,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) {

m := NewGoroutineManager(ctx)

require.NoError(t, m.Go(func(ctx context.Context) {
require.True(t, m.Go(func(ctx context.Context) {
<-ctx.Done()
}))

Expand All @@ -79,7 +80,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) {
}

// Make sure new goroutines do not start after context expiry.
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)
require.False(t, m.Go(func(ctx context.Context) {}))

// Stop will wait for all goroutines to stop.
m.Stop()
Expand Down Expand Up @@ -107,15 +108,50 @@ func TestGoroutineManagerStress(t *testing.T) {
// implementation, this test crashes under `-race`.
for i := 0; i < 100; i++ {
taskChan := make(chan struct{})
err := m.Go(func(ctx context.Context) {
ok := m.Go(func(ctx context.Context) {
close(taskChan)
})
// If goroutine was started, wait for its completion.
if err == nil {
if ok {
<-taskChan
}
}

// Wait for Stop to complete.
<-stopChan
}

// TestGoroutineManagerStopsStress launches many Stop() calls in parallel with a
// task exiting. It attempts to catch a race condition between wg.Done() and
// wg.Wait() calls. According to documentation of wg.Wait() this is acceptable,
// therefore this test passes even with -race.
Comment on lines +124 to +127
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the implementation of WaitGroups I think we don't need this Stresstest anymore or ? There is no problem in calling wg.Done()and potentially reaching a counter of 0. Only adding new Goroutines are a problem and cause potential race conditions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep it as a safeguard in case we change the implementation of GoroutineManager in the future, and the new implementation is not resilient to this scenario.

func TestGoroutineManagerStopsStress(t *testing.T) {
t.Parallel()

m := NewGoroutineManager(context.Background())

// jobChan is used to make the task to finish.
jobChan := make(chan struct{})

// Start a task and wait inside it until we start calling Stop() method.
ok := m.Go(func(ctx context.Context) {
<-jobChan
})
require.True(t, ok)

// Now launch many gorotines calling Stop() method in parallel.
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m.Stop()
}()
}

// Exit the task in parallel with Stop() calls.
close(jobChan)

// Wait until all the Stop() calls complete.
wg.Wait()
}