-
Notifications
You must be signed in to change notification settings - Fork 2.2k
fn: Remove ctx from GoroutineManager constructor #9341
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,51 +3,123 @@ package fn | |
| import ( | ||
| "context" | ||
| "sync" | ||
| "sync/atomic" | ||
| ) | ||
|
|
||
| // GoroutineManager is used to launch goroutines until context expires or the | ||
| // manager is stopped. The Stop method blocks until all started goroutines stop. | ||
| type GoroutineManager struct { | ||
| wg sync.WaitGroup | ||
| mu sync.Mutex | ||
| ctx context.Context | ||
| cancel func() | ||
| // id is used to generate unique ids for each goroutine. | ||
| id atomic.Uint32 | ||
|
|
||
| // cancelFns is a map of cancel functions that can be used to cancel the | ||
| // context of a goroutine. The mutex must be held when accessing this | ||
| // map. The key is the id of the goroutine. | ||
| cancelFns map[uint32]context.CancelFunc | ||
|
|
||
| mu sync.Mutex | ||
|
|
||
| stopped sync.Once | ||
| quit chan struct{} | ||
| wg sync.WaitGroup | ||
| } | ||
|
|
||
| // NewGoroutineManager constructs and returns a new instance of | ||
| // GoroutineManager. | ||
| func NewGoroutineManager(ctx context.Context) *GoroutineManager { | ||
| ctx, cancel := context.WithCancel(ctx) | ||
|
|
||
| func NewGoroutineManager() *GoroutineManager { | ||
ellemouton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return &GoroutineManager{ | ||
| ctx: ctx, | ||
| cancel: cancel, | ||
| cancelFns: make(map[uint32]context.CancelFunc), | ||
| quit: make(chan struct{}), | ||
| } | ||
| } | ||
|
|
||
| // addCancelFn adds a context cancel function to the manager and returns an id | ||
| // that can can be used to cancel the context later on when the goroutine is | ||
| // done. | ||
| func (g *GoroutineManager) addCancelFn(cancel context.CancelFunc) uint32 { | ||
| g.mu.Lock() | ||
| defer g.mu.Unlock() | ||
|
|
||
| id := g.id.Add(1) | ||
| g.cancelFns[id] = cancel | ||
|
|
||
| return id | ||
| } | ||
|
|
||
| // cancel cancels the context associated with the passed id. | ||
| func (g *GoroutineManager) cancel(id uint32) { | ||
| g.mu.Lock() | ||
| defer g.mu.Unlock() | ||
|
|
||
| g.cancelUnsafe(id) | ||
ellemouton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // cancelUnsafe cancels the context associated with the passed id without | ||
| // acquiring the mutex. | ||
| func (g *GoroutineManager) cancelUnsafe(id uint32) { | ||
| fn, ok := g.cancelFns[id] | ||
| if !ok { | ||
| return | ||
| } | ||
|
|
||
| fn() | ||
|
|
||
| delete(g.cancelFns, id) | ||
| } | ||
|
|
||
| // Go tries to start a new goroutine and returns a boolean indicating its | ||
| // success. It fails iff the goroutine manager is stopping or its context passed | ||
| // to NewGoroutineManager has expired. | ||
| func (g *GoroutineManager) Go(f func(ctx context.Context)) bool { | ||
| // Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race | ||
| // condition, since it is not clear should Wait() block or not. This | ||
| // success. It returns true if the goroutine was successfully created and false | ||
| // otherwise. A goroutine will fail to be created iff the goroutine manager is | ||
| // stopping or the passed context has already expired. The passed call-back | ||
| // function must exit if the passed context expires. | ||
| func (g *GoroutineManager) Go(ctx context.Context, | ||
| f func(ctx context.Context)) bool { | ||
|
|
||
| // Derive a cancellable context from the passed context and store its | ||
| // cancel function in the manager. The context will be cancelled when | ||
| // either the parent context is cancelled or the quit channel is closed | ||
| // which will call the stored cancel function. | ||
| ctx, cancel := context.WithCancel(ctx) | ||
| id := g.addCancelFn(cancel) | ||
|
|
||
| // Calling wg.Add(1) and wg.Wait() when the wg's counter is 0 is a race | ||
| // condition, since it is not clear if Wait() should block or not. This | ||
| // kind of race condition is detected by Go runtime and results in a | ||
| // crash if running with `-race`. To prevent this, whole Go method is | ||
| // protected with a mutex. The call to wg.Wait() inside Stop() can still | ||
| // run in parallel with Go, but in that case g.ctx is in expired state, | ||
| // because cancel() was called in Stop, so Go returns before wg.Add(1) | ||
| // call. | ||
| // crash if running with `-race`. To prevent this, we protect the calls | ||
| // to wg.Add(1) and wg.Wait() with a mutex. If we block here because | ||
| // Stop is running first, then Stop will close the quit channel which | ||
| // will cause the context to be cancelled, and we will exit before | ||
| // calling wg.Add(1). If we grab the mutex here before Stop does, then | ||
| // Stop will block until after we call wg.Add(1). | ||
| g.mu.Lock() | ||
| defer g.mu.Unlock() | ||
|
|
||
| if g.ctx.Err() != nil { | ||
| // Before continuing to start the goroutine, we need to check if the | ||
| // context has already expired. This could be the case if the parent | ||
| // context has already expired or if Stop has been called. | ||
| if ctx.Err() != nil { | ||
| g.cancelUnsafe(id) | ||
|
|
||
| return false | ||
| } | ||
|
|
||
| // Ensure that the goroutine is not started if the manager has stopped. | ||
ellemouton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| select { | ||
| case <-g.quit: | ||
| g.cancelUnsafe(id) | ||
ellemouton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return false | ||
| default: | ||
| } | ||
|
|
||
| g.wg.Add(1) | ||
| go func() { | ||
| defer g.wg.Done() | ||
| f(g.ctx) | ||
| defer func() { | ||
| g.cancel(id) | ||
| g.wg.Done() | ||
| }() | ||
|
|
||
| f(ctx) | ||
| }() | ||
|
|
||
| return true | ||
|
|
@@ -56,20 +128,30 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) bool { | |
| // Stop prevents new goroutines from being added and waits for all running | ||
| // goroutines to finish. | ||
| func (g *GoroutineManager) Stop() { | ||
| g.mu.Lock() | ||
| g.cancel() | ||
| g.mu.Unlock() | ||
|
|
||
| // Wait for all goroutines to finish. Note that this wg.Wait() call is | ||
| // safe, since it can't run in parallel with wg.Add(1) call in Go, since | ||
| // we just cancelled the context and even if Go call starts running here | ||
| // after acquiring the mutex, it would see that the context has expired | ||
| // and return false instead of calling wg.Add(1). | ||
| g.wg.Wait() | ||
| g.stopped.Do(func() { | ||
| // Closing the quit channel will prevent any new goroutines from | ||
| // starting. | ||
| g.mu.Lock() | ||
| close(g.quit) | ||
| for _, cancel := range g.cancelFns { | ||
| cancel() | ||
| } | ||
|
Comment on lines
+136
to
+138
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I propose to clear the map here, to free memory.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can do, although i dont think it is necessary since this is not a restartable system. This will only be called when the system is shutting down right? if we set the map to nil here, then you could argue that in every Stop method we have in LND, we should go and set each object to nil. Interested to hear what a second reviewer thinks too. But yeah, can defs add a |
||
| g.mu.Unlock() | ||
|
|
||
| // Wait for all goroutines to finish. Note that this wg.Wait() | ||
| // call is safe, since it can't run in parallel with wg.Add(1) | ||
| // call in Go, since we just cancelled the context and even if | ||
| // Go call starts running here after acquiring the mutex, it | ||
| // would see that the context has expired and return false | ||
| // instead of calling wg.Add(1). | ||
| g.wg.Wait() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This update changes the behavior of Stop(). Previously, if one goroutine called Stop() and another goroutine invoked Stop() concurrently, the second call would block, waiting for the first call to complete. Now, the second Stop() call returns immediately.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is that a problem? is there a usecase for needing to support the blocking behaviour? afaiu, this will mostly be used within other sub-systems and Stop will be called by their Stop methods which typically will also only be called once
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the previous version
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just dont think more than 1 system will ever own the GoroutineManager right? ie, what makes this Stop different from other Stop methods of other subsystems in LND which use a similar pattern to this?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think, the intuition of Stop() method is that after Stop() call (successfully) returning, all the workers are down. If the second Stop() just returns immediately, this intuition is broken. Can we panic or return error if a second Stop is detected?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i see there is a Done() method - we could always update this to return a dedicated
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bhandras - perhaps a 3rd opinion here just to break the tie would help? 🙏
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Option 3 sgtm! (Done with shutdown chan)
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cool - pushed a diff with option 3 included 🫡
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah - just realised we cannot do this since callers may wait on Done() inside a goroutine that was started within Go(). Undoing this change |
||
| }) | ||
| } | ||
|
|
||
| // Done returns a channel which is closed when either the context passed to | ||
| // NewGoroutineManager expires or when Stop is called. | ||
| // Done returns a channel which is closed once Stop has been called and the | ||
| // quit channel closed. Note that the channel closing indicates that shutdown | ||
| // of the GoroutineManager has started but not necessarily that the Stop method | ||
| // has finished. | ||
| func (g *GoroutineManager) Done() <-chan struct{} { | ||
| return g.ctx.Done() | ||
| return g.quit | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.