-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add workerStop handler, graceful exits #53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,6 @@ | ||
| { | ||
| "preset": "psr12" | ||
| "preset": "psr12", | ||
| "rules": { | ||
| "single_quote": true | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,26 +30,36 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int | |
| return $this->delegatePublish(__FUNCTION__, \func_get_args()); | ||
| } | ||
|
|
||
| public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void | ||
| { | ||
| public function consume( | ||
| Queue $queue, | ||
| callable $messageCallback, | ||
| callable $successCallback, | ||
| callable $errorCallback, | ||
| ): void { | ||
| $this->delegateConsumer(__FUNCTION__, \func_get_args()); | ||
| } | ||
|
|
||
| public function close(): void | ||
| { | ||
| $this->delegateConsumer(__FUNCTION__, \func_get_args()); | ||
| // TODO: Implement closing all connections in the pool | ||
| } | ||
|
Comment on lines
42
to
45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incomplete The 🔎 Suggested implementation public function close(): void
{
- // TODO: Implement closing all connections in the pool
+ // Close all connections in the publisher pool
+ $this->publisher?->reclaim();
+
+ // Close all connections in the consumer pool
+ $this->consumer?->reclaim();
}Note: The exact implementation depends on the
🤖 Prompt for AI Agents |
||
|
|
||
| protected function delegatePublish(string $method, array $args): mixed | ||
| { | ||
| return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) { | ||
| return $this->publisher?->use(function (Publisher $adapter) use ( | ||
| $method, | ||
| $args, | ||
| ) { | ||
| return $adapter->$method(...$args); | ||
| }); | ||
| } | ||
|
|
||
| protected function delegateConsumer(string $method, array $args): mixed | ||
| { | ||
| return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) { | ||
| return $this->consumer?->use(function (Consumer $adapter) use ( | ||
| $method, | ||
| $args, | ||
| ) { | ||
| return $adapter->$method(...$args); | ||
| }); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Worker exits immediately without processing jobs.
The worker process invokes
onWorkerStopcallbacks immediately afteronWorkerStart(lines 64-66), causing the worker to exit before the consume loop can run. TheServerwires the consumer loop viaworkerStart, but the coroutine exits before that loop ever blocks.Expected flow:
onWorkerStartcallbacks execute (including Server's consume loop)consumer->close()breaks the looponWorkerStopcallbacks executeFix: Remove lines 64-66. The
onWorkerStopcallbacks should be invoked after the worker's main work completes (e.g., in adeferor after the consume loop returns), not immediately after start.🔎 Proposed fix
protected function spawnWorker(int $workerId): void { $process = new Process(function () use ($workerId) { Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]); Coroutine\run(function () use ($workerId) { Process::signal(SIGTERM, fn () => $this->consumer->close()); + // Register deferred cleanup + defer(function () use ($workerId) { + foreach ($this->onWorkerStop as $callback) { + $callback((string)$workerId); + } + }); + foreach ($this->onWorkerStart as $callback) { $callback((string)$workerId); } - - foreach ($this->onWorkerStop as $callback) { - $callback((string)$workerId); - } }); }, false, 0, false); $pid = $process->start(); $this->workers[$pid] = $process; }🤖 Prompt for AI Agents