-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add workerStop handler, graceful exits #53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This comment was marked as spam.
This comment was marked as spam.
a8a21e0 to
f51092a
Compare
f51092a to
6a7d673
Compare
6a7d673 to
e078822
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Graceful shutdown and lifecycle hooks for queue workers across adapters, plus test container updates for signal support.
- Add per-worker stop hooks and worker stop event handling.
- Introduce signal-based graceful shutdown for Swoole and ensure AMQP consumers stop before closing.
- Standardize test container volumes and enable pcntl in images.
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/Queue/servers/Workerman/worker.php | Adds workerStop hook usage for test worker lifecycle. |
| tests/Queue/servers/SwooleRedisCluster/worker.php | Adds workerStop hook usage for cluster worker lifecycle. |
| tests/Queue/servers/Swoole/worker.php | Adds workerStop hook usage for Swoole worker lifecycle. |
| tests/Queue/servers/AMQP/worker.php | Adds workerStop hook usage for AMQP worker lifecycle. |
| tests/Queue/servers/SwooleRedisCluster/Dockerfile | Enables pcntl for proper signal handling in tests. |
| tests/Queue/servers/Swoole/Dockerfile | Enables pcntl for proper signal handling in tests. |
| tests/Queue/servers/AMQP/Dockerfile | Enables pcntl for proper signal handling in tests. |
| src/Queue/Server.php | Adds workerStop Hook API and integrates stop handling into worker lifecycle. |
| src/Queue/Broker/AMQP.php | Attempts to stop consumption before closing channel/connection. |
| src/Queue/Adapter/Swoole.php | Adds signal handlers and proper EVENT_* usage; introduces main-process stop path. |
| docker-compose.yml | Standardizes container code paths and volumes for tests. |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/Queue/Adapter/Swoole.php
Outdated
| /** @var callable */ | ||
| private $onStop; |
Copilot
AI
Dec 2, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The property $onStop is declared without a nullable type or default value. This should be declared as private ?callable $onStop = null; to be explicit about its nullable nature and avoid potential issues.
| /** @var callable */ | |
| private $onStop; | |
| private ?callable $onStop = null; |
6efa651 to
8dba579
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (5)
src/Queue/Adapter/Swoole.php (4)
93-104: Mirror the parameter type fixes inworkerStop().Apply the same parameter corrections as in
workerStart(): rename$poolto$_pooland type$workerIdasint.Apply this diff:
- $this->pool->on(Constant::EVENT_WORKER_STOP, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_STOP, function ( + Pool $_pool, + int $workerId, + ) use ($callback) { \call_user_func($callback, $workerId); });
54-64: Remove the directonStopinvocation fromstop().Calling
$this->onStop()without a$workerIdparameter will cause anArgumentCountError, as the callback registered viaworkerStop()expects a$workerIdargument (see line 100 andServer.phpline 377). TheEVENT_WORKER_STOPhandler already invokes the callback correctly for each worker.Apply this diff:
public function stop(): self { - if ($this->onStop) { - \call_user_func($this->onStop); - } - Console::info("[Swoole] Shutting down process pool..."); $this->pool->shutdown(); Console::success("[Swoole] Process pool stopped."); return $this; }
66-91: Fix parameter types and silence PHPMD warning.Two issues with the event handler signature:
- The
$poolparameter is unused and should be renamed to$_poolto silence PHPMD.- Swoole passes
$workerIdasint, notstring.Apply this diff:
- $this->pool->on(Constant::EVENT_WORKER_START, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function ( + Pool $_pool, + int $workerId, + ) use ($callback) { // Register signal handlers in worker Process::signal(SIGTERM, function () {
16-17: Type the$onStopproperty explicitly.The property should use a typed declaration instead of a docblock to prevent accidental non-callable assignments and clarify intent.
Apply this diff:
- /** @var callable */ - private $onStop; + private ?callable $onStop = null;src/Queue/Server.php (1)
106-110: Guard against undefined index when accessing resource callbacks.Line 109 accesses
self::$resourcesCallbacks[$name]["reset"]before line 111 checks if$nameexists in the array. This can trigger an undefined index warning.Apply this diff to reorder the checks:
if ( !\array_key_exists($name, $this->resources) || $fresh || - self::$resourcesCallbacks[$name]["reset"] + (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]["reset"]) ) { if (!\array_key_exists($name, self::$resourcesCallbacks)) {
🧹 Nitpick comments (1)
src/Queue/Server.php (1)
428-443: LGTM!The
workerStop()andgetWorkerStop()methods mirror theworkerStart()pattern correctly, enabling registration and retrieval of multiple stop hooks.Optional: The docblock on line 437 could be refined to "Returns worker stop hooks." for better grammar, but this is a minor nitpick.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
docker-compose.yml(5 hunks)src/Queue/Adapter/Swoole.php(2 hunks)src/Queue/Broker/AMQP.php(1 hunks)src/Queue/Broker/Pool.php(2 hunks)src/Queue/Server.php(11 hunks)tests/Queue/servers/AMQP/Dockerfile(1 hunks)tests/Queue/servers/AMQP/worker.php(1 hunks)tests/Queue/servers/Swoole/Dockerfile(1 hunks)tests/Queue/servers/Swoole/worker.php(1 hunks)tests/Queue/servers/SwooleRedisCluster/Dockerfile(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php(1 hunks)tests/Queue/servers/Workerman/worker.php(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/Queue/Broker/Pool.php
🚧 Files skipped from review as they are similar to previous changes (5)
- tests/Queue/servers/SwooleRedisCluster/Dockerfile
- tests/Queue/servers/Swoole/Dockerfile
- src/Queue/Broker/AMQP.php
- tests/Queue/servers/Workerman/worker.php
- tests/Queue/servers/AMQP/worker.php
🧰 Additional context used
🧬 Code graph analysis (2)
src/Queue/Adapter/Swoole.php (2)
src/Queue/Adapter.php (6)
Adapter(5-49)__construct(12-16)stop(28-28)start(22-22)workerStart(35-35)workerStop(42-42)src/Queue/Server.php (5)
__construct(84-88)stop(236-247)start(266-401)workerStart(407-413)workerStop(428-434)
tests/Queue/servers/Swoole/worker.php (5)
src/Queue/Server.php (4)
Server(14-530)job(90-94)workerStart(407-413)workerStop(428-434)src/Queue/Adapter.php (3)
Adapter(5-49)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Swoole.php (3)
Swoole(12-110)workerStart(66-91)workerStop(93-104)src/Queue/Connection/Redis.php (1)
Redis(7-184)tests/Queue/servers/tests.php (1)
handleRequest(5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
69-69: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
97-97: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
🔇 Additional comments (14)
tests/Queue/servers/AMQP/Dockerfile (1)
5-5: LGTM!Installing the PCNTL extension is necessary for the signal-handling features introduced in this PR.
docker-compose.yml (1)
6-8: LGTM!Standardizing volume mount paths to
/usr/src/codeacross all services improves consistency and maintainability.Also applies to: 20-22, 31-33, 43-45, 55-57
tests/Queue/servers/Swoole/worker.php (3)
3-9: LGTM!The updated require paths and explicit imports improve code clarity and align with the refactored queue infrastructure.
11-26: LGTM!The refactored server setup is clean and follows the new chained API pattern. The use of
handleRequest(...)with the spread operator is appropriate.
28-30: LGTM!The new
workerStop()hook correctly registers a shutdown action, mirroring theworkerStart()pattern. This enables graceful cleanup when workers terminate.tests/Queue/servers/SwooleRedisCluster/worker.php (2)
3-21: LGTM!The refactored setup correctly instantiates a Redis cluster-backed consumer and follows the same clean API pattern as the other test workers.
30-36: LGTM!Both
workerStart()andworkerStop()hooks are correctly implemented and consistent with the other test worker implementations.src/Queue/Adapter/Swoole.php (1)
31-52: LGTM!The signal handling using
Process::signalfor graceful shutdown is appropriate for Swoole. The implementation correctly registers handlers for SIGTERM and SIGINT in the master process.src/Queue/Server.php (6)
33-63: LGTM!The transition from single hooks to arrays (
workerStartHooks,workerStopHooks) enables registration of multiple lifecycle hooks, improving extensibility.
156-166: LGTM!The formatting improvements and explicit string keys for the resource callback array enhance code clarity.
269-275: LGTM!The
workerStartcallback correctly iterates over all registered hooks and invokes them with the proper arguments. The typed$workerIdparameter is appropriate.
277-374: LGTM!The consumer callback correctly handles the job lifecycle with init hooks, job execution, shutdown hooks, and error handling. The telemetry recording for wait time and process duration is appropriate.
377-391: LGTM!The
workerStopcallback correctly iterates over registered stop hooks and closes the consumer connection gracefully. This is a key addition for enabling clean worker shutdown.
407-422: LGTM!The
workerStart()method correctly creates and registers hooks in the array, andgetWorkerStart()provides access to the registered hooks. The pattern enables multiple start hooks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/Queue/Adapter/Swoole.php
Outdated
| { | ||
| $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { | ||
| call_user_func($callback, $workerId); | ||
| $this->onStop = $callback; |
Copilot
AI
Dec 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The workerStop callback is being stored in both the $onStop property and registered with the pool's EVENT_WORKER_STOP event. However, in the stop() method, $onStop is called which will invoke the callback with no arguments, while the EVENT_WORKER_STOP expects the callback to receive $workerId. This creates inconsistent invocation - the callback will be called once without arguments in stop(), and once with $workerId from the pool event. Consider either removing line 95 or restructuring to avoid double invocation with different signatures.
src/Queue/Adapter/Swoole.php
Outdated
| // Register signal handlers in worker | ||
| Process::signal(SIGTERM, function () { | ||
| Console::info( | ||
| "[Swoole] Received SIGTERM, initiating graceful shutdown...", | ||
| ); | ||
| $this->stop(); | ||
| }); | ||
|
|
||
| Process::signal(SIGINT, function () { | ||
| Console::info( | ||
| "[Swoole] Received SIGINT, initiating graceful shutdown...", | ||
| ); | ||
| $this->stop(); | ||
| }); | ||
|
|
Copilot
AI
Dec 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signal handlers registered within the workerStart callback will override the signal handlers registered in the start() method (lines 36-48). In Swoole, signal handlers registered in worker processes override those in the master process. This means the master process signals won't work as expected. Consider removing the duplicate signal registration in workerStart since the master process should handle shutdown coordination.
| // Register signal handlers in worker | |
| Process::signal(SIGTERM, function () { | |
| Console::info( | |
| "[Swoole] Received SIGTERM, initiating graceful shutdown...", | |
| ); | |
| $this->stop(); | |
| }); | |
| Process::signal(SIGINT, function () { | |
| Console::info( | |
| "[Swoole] Received SIGINT, initiating graceful shutdown...", | |
| ); | |
| $this->stop(); | |
| }); | |
| // Worker initialization logic |
8dba579 to
d62ecb0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
tests/Queue/servers/Workerman/worker.php (1)
12-12: Typo in queue name: "wokerman" should be "workerman".Apply this diff:
-$adapter = new Workerman($consumer, 12, "wokerman"); +$adapter = new Workerman($consumer, 12, "workerman");src/Queue/Adapter/Swoole.php (2)
61-64: Fix parameter types: rename unused$pooland type$workerIdas int.The
Poolparameter is unused (triggers PHPMD warning) and should be renamed to$_pool. Additionally, Swoole'sPoolevent callbacks pass$workerIdas anint, not astring.Apply this diff:
- $this->pool->on(Constant::EVENT_WORKER_START, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function ( + Pool $_pool, + int $workerId, + ) use ($callback) {
88-91: Fix parameter types: rename unused$pooland type$workerIdas int.Same issue as in
workerStart: the$poolparameter is unused and should be prefixed with an underscore, and$workerIdshould be typed asintto match Swoole's signature.Apply this diff:
- $this->pool->on(Constant::EVENT_WORKER_STOP, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_STOP, function ( + Pool $_pool, + int $workerId, + ) use ($callback) {
🧹 Nitpick comments (2)
src/Queue/Server.php (2)
333-333: Fix misleading comment.The comment says "Global init hooks" but this code is in the shutdown (success) callback, so it should say "Global shutdown hooks".
Apply this diff:
- // Global init hooks + // Global shutdown hooks
346-346: Fix misleading comment.The comment says "Group init hooks" but this code is in the shutdown (success) callback, so it should say "Group shutdown hooks".
Apply this diff:
- // Group init hooks + // Group shutdown hooks
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
src/Queue/Adapter/Swoole.php(2 hunks)src/Queue/Broker/Pool.php(2 hunks)src/Queue/Server.php(11 hunks)tests/Queue/servers/AMQP/worker.php(1 hunks)tests/Queue/servers/Swoole/worker.php(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php(1 hunks)tests/Queue/servers/Workerman/worker.php(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/Queue/Broker/Pool.php
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (3)
tests/Queue/servers/AMQP/worker.php (4)
src/Queue/Broker/AMQP.php (1)
AMQP(20-226)src/Queue/Adapter/Swoole.php (3)
Swoole(12-102)workerStart(59-84)workerStop(86-96)src/Queue/Server.php (4)
Server(14-538)job(90-94)workerStart(415-421)workerStop(436-442)tests/Queue/servers/tests.php (1)
handleRequest(5-47)
src/Queue/Server.php (4)
src/Queue/Job.php (1)
hook(24-29)src/Queue/Consumer.php (1)
consume(17-22)src/Queue/Message.php (4)
Message(5-81)getPid(52-55)getTimestamp(62-65)getPayload(67-70)src/Queue/Adapter/Workerman.php (1)
workerStop(42-49)
tests/Queue/servers/Workerman/worker.php (5)
src/Queue/Adapter/Workerman.php (3)
Workerman(9-55)workerStart(34-41)workerStop(42-49)src/Queue/Broker/Redis.php (1)
Redis(11-167)src/Queue/Connection/Redis.php (1)
Redis(7-184)src/Queue/Server.php (5)
Server(14-538)job(90-94)error(531-537)workerStart(415-421)workerStop(436-442)tests/Queue/servers/tests.php (1)
handleRequest(5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
62-62: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
89-89: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Adapter Tests (Workerman)
- GitHub Check: Adapter Tests (SwooleRedisCluster)
92916aa to
957765c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
tests/Queue/servers/Workerman/worker.php (1)
12-12: Typo in queue name: "wokerman" should be "workerman".src/Queue/Server.php (1)
332-346: Fix misleading comments in shutdown callback.The comments say "init hooks" but this code is in the shutdown callback context.
foreach ($this->shutdownHooks as $hook) { - // Global init hooks + // Global shutdown hooks if (\in_array("*", $hook->getGroups())) {foreach ($this->shutdownHooks as $hook) { - // Group init hooks + // Group shutdown hooks if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (4)
src/Queue/Broker/Pool.php (1)
42-45: Incompleteclose()implementation may lead to resource leaks.The Pool's
close()method is a no-op with a TODO. WhenPool::close()is called (e.g., during graceful shutdown), the underlying connections in the pool won't be closed, potentially causing resource leaks.Do you want me to open an issue to track implementing proper connection pool cleanup?
src/Queue/Adapter/Swoole.php (2)
50-53: Rename unused$poolparameter to suppress PHPMD warning.The
$poolparameter is required by Swoole's callback signature but isn't used. Prefix it with underscore to signal intentional non-use.$this->pool->on(Constant::EVENT_WORKER_START, function ( - Pool $pool, + Pool $_pool, string $workerId, ) use ($callback) {
81-84: Same fix: rename unused$poolto$_pool.$this->pool->on(Constant::EVENT_WORKER_STOP, function ( - Pool $pool, + Pool $_pool, string $workerId, ) use ($callback) {src/Queue/Server.php (1)
423-451: Consider adding specific return type hints in docblocks.The methods return arrays of Hook objects. The docblocks could be more specific for IDE support:
/** * Returns Worker starts hooks. - * @return array + * @return array<Hook> */ public function getWorkerStart(): array/** * Returns Worker stops hooks. - * @return array + * @return array<Hook> */ public function getWorkerStop(): array
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
src/Queue/Adapter/Swoole.php(2 hunks)src/Queue/Broker/Pool.php(1 hunks)src/Queue/Broker/Redis.php(1 hunks)src/Queue/Connection.php(1 hunks)src/Queue/Connection/Redis.php(1 hunks)src/Queue/Connection/RedisCluster.php(1 hunks)src/Queue/Server.php(11 hunks)tests/Queue/servers/AMQP/worker.php(1 hunks)tests/Queue/servers/Swoole/worker.php(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php(1 hunks)tests/Queue/servers/Workerman/worker.php(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
src/Queue/Connection/Redis.php (4)
src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(92-96)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(92-96)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Consumer.php (1)
close(27-27)src/Queue/Broker/AMQP.php (1)
close(132-136)
src/Queue/Connection.php (4)
src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(92-96)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Broker/Pool.php (5)
src/Queue/Broker/Redis.php (2)
consume(19-90)close(92-96)src/Queue/Queue.php (1)
Queue(5-15)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Broker/Redis.php (6)
src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)src/Queue/Consumer.php (1)
close(27-27)src/Queue/Broker/AMQP.php (1)
close(132-136)
tests/Queue/servers/Swoole/worker.php (3)
src/Queue/Adapter/Swoole.php (3)
Swoole(15-95)workerStart(48-77)workerStop(79-89)src/Queue/Broker/Redis.php (1)
Redis(11-168)src/Queue/Connection/Redis.php (1)
Redis(7-190)
src/Queue/Adapter/Swoole.php (1)
src/Queue/Server.php (5)
start(266-409)stop(236-247)shutdown(224-230)workerStart(415-421)workerStop(436-442)
tests/Queue/servers/Workerman/worker.php (3)
src/Queue/Adapter.php (3)
Adapter(5-49)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Workerman.php (3)
Workerman(9-55)workerStart(34-41)workerStop(42-49)tests/Queue/servers/tests.php (1)
handleRequest(5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
51-51: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
82-82: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
🔇 Additional comments (16)
tests/Queue/servers/Workerman/worker.php (1)
24-30: LGTM!The workerStart and workerStop hooks are correctly implemented and align with the new Server API for lifecycle management.
src/Queue/Broker/Redis.php (1)
92-96: LGTM!The close() method correctly sets the closed flag first to stop the consume loop, then closes the underlying connection. This ensures graceful shutdown without resource leaks.
src/Queue/Connection.php (1)
28-28: LGTM!Adding
close()to the Connection interface establishes a consistent cleanup contract. BothRedisandRedisClusterimplementations correctly implement this method.src/Queue/Connection/RedisCluster.php (1)
169-173: LGTM!The implementation correctly uses the null-safe operator and nullifies the reference after closing, which is consistent with the Redis connection implementation and allows proper garbage collection.
src/Queue/Connection/Redis.php (1)
172-176: LGTM!The close() implementation follows the same pattern as RedisCluster, providing consistent behavior across connection types.
src/Queue/Broker/Pool.php (1)
33-38: LGTM!The multi-line formatting for
consume()parameters and the delegation closures improves readability without changing behavior.Also applies to: 47-54, 57-64
tests/Queue/servers/SwooleRedisCluster/worker.php (2)
11-19: LGTM!The Redis broker construction with RedisCluster connection and Swoole adapter setup is correct and follows the pattern established in other test workers.
30-36: LGTM!The workerStart and workerStop hooks are correctly wired up, consistent with the new Server lifecycle API.
tests/Queue/servers/Swoole/worker.php (2)
11-13: LGTM!The Redis broker and Swoole adapter construction follows the correct pattern.
24-30: LGTM!The workerStart and workerStop lifecycle hooks are correctly implemented, matching the pattern in other test workers.
tests/Queue/servers/AMQP/worker.php (1)
1-31: LGTM!The test worker script cleanly demonstrates the new lifecycle hooks. The use of first-class callable syntax for the job handler and the addition of
workerStartandworkerStophooks are well-structured.src/Queue/Adapter/Swoole.php (2)
54-74: Signal handling and coroutine approach look solid.Using
Process::signalfor Swoole-native signal handling and wrapping the callback ingo()allows the event loop to process signals while the consume loop runs. Closing the consumer on signal receipt enables graceful shutdown.
40-46: LGTM!Good observability by logging shutdown progress.
src/Queue/Server.php (3)
51-63: Clean migration to array-based hook management.The shift from single hook properties to arrays (
workerStartHooks,workerStopHooks) enables multiple lifecycle hooks per event, which is more flexible and aligns with how other hooks (errorHooks,initHooks,shutdownHooks) are already managed.
377-399: Robust workerStop implementation with proper error isolation.Wrapping each hook in its own try-catch ensures one failing hook doesn't prevent others from running. The
finallyblock guarantees the consumer connection is closed regardless of hook execution outcomes—exactly the right pattern for graceful shutdown.
266-275: LGTM!The
workerStartflow correctly iterates over the hooks array and invokes each hook with proper argument resolution.
tests/Queue/servers/AMQP/worker.php
Outdated
| $consumer = new AMQP(host: "amqp", port: 5672, user: "amqp", password: "amqp"); | ||
| $adapter = new Swoole($consumer, 12, "amqp"); | ||
| $server = new Server($adapter); | ||
|
|
||
| $server->job() | ||
| ->inject('message') | ||
| ->action(function (Message $message) { | ||
| handleRequest($message); | ||
| }); | ||
| $server->job()->inject("message")->action(handleRequest(...)); | ||
|
|
||
| $server | ||
| ->error() | ||
| ->inject('error') | ||
| ->inject("error") | ||
| ->action(function ($th) { | ||
| echo $th->getMessage() . PHP_EOL; | ||
| }); | ||
|
|
||
| $server | ||
| ->workerStart() | ||
| ->action(function () { | ||
| echo "Worker Started" . PHP_EOL; | ||
| }); | ||
| $server->workerStart()->action(function () { | ||
| echo "Worker Started" . PHP_EOL; | ||
| }); | ||
|
|
||
| $server->workerStop()->action(function () { | ||
| echo "Worker Stopped" . PHP_EOL; | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single quotes
tests/Queue/servers/AMQP/Dockerfile
Outdated
| RUN apk add autoconf build-base | ||
| RUN apk add autoconf build-base | ||
|
|
||
| RUN docker-php-ext-install pcntl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need it? I can't see it being used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
required in older implementation, removing
957765c to
1939710
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (4)
tests/Queue/servers/Workerman/worker.php (1)
12-12: Fix the typo in the queue name.The queue name is 'wokerman' but should be 'workerman'. This was previously flagged in a past review but remains unfixed.
Apply this diff:
-$adapter = new Workerman($consumer, 12, 'wokerman'); +$adapter = new Workerman($consumer, 12, 'workerman');src/Queue/Adapter/Swoole.php (1)
50-53: Fix$workerIdtype and silence PHPMD unused$poolwarning.Swoole's Pool events pass
int $workerId, notstring. The$poolparameter is unused and should be prefixed with_to indicate intentional non-use.- $this->pool->on(Constant::EVENT_WORKER_START, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function ( + Pool $_pool, + int $workerId, + ) use ($callback) {- $this->pool->on(Constant::EVENT_WORKER_STOP, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_STOP, function ( + Pool $_pool, + int $workerId, + ) use ($callback) {Also applies to: 81-84
src/Queue/Server.php (2)
331-341: Fix misleading comment: "Global init hooks" → "Global shutdown hooks".This block is inside the shutdown callback, not init.
if ($this->job->getHook()) { foreach ($this->shutdownHooks as $hook) { - // Global init hooks + // Global shutdown hooks if (\in_array('*', $hook->getGroups())) {
344-355: Fix misleading comment: "Group init hooks" → "Group shutdown hooks".Same issue — this is in the shutdown callback.
foreach ($this->job->getGroups() as $group) { foreach ($this->shutdownHooks as $hook) { - // Group init hooks + // Group shutdown hooks if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (2)
src/Queue/Broker/Pool.php (1)
42-45: Implement pool connection cleanup for graceful shutdown.The TODO indicates that closing pooled connections is not yet implemented. Given this PR's focus on graceful exits and worker-stop handlers, this gap means pooled publisher and consumer connections won't be properly released during shutdown, potentially causing resource leaks.
Would you like me to implement the pool cleanup logic? It should iterate through active connections in both publisher and consumer pools and call their
close()methods.src/Queue/Server.php (1)
427-430: Consider addingarray<Hook>return type hints for getters.The docblocks specify
@return array, but the typed return could be more specific./** * Returns Worker starts hooks. - * @return array + * @return array<Hook> */ public function getWorkerStart(): array /** * Returns Worker stops hooks. - * @return array + * @return array<Hook> */ public function getWorkerStop(): arrayAlso applies to: 448-451
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
pint.json(1 hunks)src/Queue/Adapter/Swoole.php(2 hunks)src/Queue/Broker/AMQP.php(3 hunks)src/Queue/Broker/Pool.php(1 hunks)src/Queue/Broker/Redis.php(1 hunks)src/Queue/Connection.php(1 hunks)src/Queue/Connection/Redis.php(1 hunks)src/Queue/Connection/RedisCluster.php(1 hunks)src/Queue/Queue.php(1 hunks)src/Queue/Server.php(10 hunks)tests/Queue/servers/AMQP/Dockerfile(1 hunks)tests/Queue/servers/AMQP/worker.php(2 hunks)tests/Queue/servers/Swoole/Dockerfile(1 hunks)tests/Queue/servers/Swoole/worker.php(2 hunks)tests/Queue/servers/SwooleRedisCluster/Dockerfile(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php(2 hunks)tests/Queue/servers/Workerman/Dockerfile(1 hunks)tests/Queue/servers/Workerman/worker.php(2 hunks)
✅ Files skipped from review due to trivial changes (1)
- tests/Queue/servers/Workerman/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (5)
- src/Queue/Connection/Redis.php
- tests/Queue/servers/Swoole/worker.php
- src/Queue/Connection.php
- tests/Queue/servers/Swoole/Dockerfile
- tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (5)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(92-96)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Consumer.php (1)
close(27-27)
src/Queue/Broker/Redis.php (6)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)src/Queue/Consumer.php (1)
close(27-27)
src/Queue/Server.php (5)
src/Queue/Consumer.php (2)
consume(17-22)close(27-27)src/Queue/Adapter.php (1)
workerStop(42-42)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Adapter/Swoole.php (2)
src/Queue/Server.php (4)
start(266-409)stop(236-247)shutdown(224-230)workerStart(415-421)src/Queue/Consumer.php (1)
close(27-27)
tests/Queue/servers/Workerman/worker.php (2)
src/Queue/Adapter.php (3)
Adapter(5-49)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Workerman.php (3)
Workerman(9-55)workerStart(34-41)workerStop(42-49)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
51-51: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
82-82: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
🔇 Additional comments (19)
src/Queue/Queue.php (1)
12-12: LGTM!The string quote change aligns with the new
single_quoterule in pint.json.tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)
5-5: LGTM!Formatting-only change (trailing newline).
pint.json (1)
2-5: LGTM!Adding the
single_quoterule enforces consistent string quoting across the codebase.src/Queue/Broker/Redis.php (1)
92-96: LGTM!Properly closes the underlying connection to ensure resource cleanup, consistent with the Connection interface's close() method.
tests/Queue/servers/AMQP/Dockerfile (1)
3-3: LGTM!Formatting-only change (trailing newline).
src/Queue/Connection/RedisCluster.php (1)
169-173: LGTM!Properly implements the Connection interface's close() method, ensuring the underlying RedisCluster connection is closed and resources are released.
src/Queue/Broker/AMQP.php (2)
114-114: LGTM!String quote changes align with the new
single_quoterule.Also applies to: 165-165
132-136: The code is safe—stopConsume() exists in php-amqplib 3.7.0.AMQPChannel::stopConsume() is a documented public method designed to stop the consume() loop, and has been available since version 3.2.0. Using this method in the close() function will not result in a fatal error; it is the appropriate way to safely interrupt consumer loops before closing the connection.
tests/Queue/servers/Workerman/worker.php (1)
15-15: LGTM!The refactored API usage with
inject('message')->action(handleRequest(...))and the newworkerStart()andworkerStop()lifecycle hooks align with the updated Server API.Also applies to: 24-30
src/Queue/Adapter/Swoole.php (4)
5-13: LGTM on new imports.The additions of
Swoole\Constant,Swoole\Process,Utopia\Console, and thegocoroutine function are appropriate for the signal handling and coroutine-based execution introduced in this file.
40-46: LGTM on graceful shutdown logging.Adding console output before and after
shutdown()improves observability during pool termination.
54-67: LGTM on per-worker signal handling.Registering SIGTERM/SIGINT handlers that call
$this->consumer->close()is the correct approach for graceful worker shutdown. Workers close their own consumer and let the pool manage process lifecycle, addressing earlier feedback about not callingstop()from workers.
69-73: LGTM on coroutine wrapping.Wrapping the callback in
go()allows the Swoole event loop to process signals while the consume loop runs.src/Queue/Server.php (3)
51-63: LGTM on array-based hook properties.Converting
workerStartHookandworkerStopHookfrom single hooks to arrays (workerStartHooks,workerStopHooks) enables multiple hook registration, which is a sensible enhancement.
377-399: LGTM on workerStop lifecycle handling.The try/finally structure ensures the consumer connection is always closed even if a hook throws. Per-hook exception handling with logging is a good resilience pattern.
415-421: LGTM on hook registration methods.
workerStart()andworkerStop()now returnHookinstances that are appended to their respective arrays, aligning with the pattern used byinit(),shutdown(), anderror().Also applies to: 436-442
tests/Queue/servers/AMQP/worker.php (3)
6-12: LGTM on streamlined imports and instantiation.Direct class imports and simplified instantiation improve readability.
14-14: LGTM on first-class callable syntax.Using
handleRequest(...)is cleaner than wrapping in a closure.
23-29: LGTM on workerStart/workerStop hooks.This demonstrates the new array-based hook API with the fluent
->action()pattern.
1939710 to
1db31ba
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (4)
tests/Queue/servers/Workerman/worker.php (1)
12-12: Typo in queue name: 'wokerman' should be 'workerman'.This was previously flagged and remains unaddressed.
src/Queue/Server.php (3)
106-120: Guard resource callback access before reading 'reset' key.Line 109 accesses
self::$resourcesCallbacks[$name]['reset']before verifying the key exists on line 111. This can trigger an undefined index error. The condition should check for key existence before accessing the['reset']value.🔎 Proposed fix
public function getResource(string $name, bool $fresh = false): mixed { if ( !\array_key_exists($name, $this->resources) || $fresh || - self::$resourcesCallbacks[$name]['reset'] + (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset']) ) { if (!\array_key_exists($name, self::$resourcesCallbacks)) { throw new Exception("Failed to find resource: $name"); }
331-342: Fix misleading comment: should be "shutdown hooks".Line 333 says "Global init hooks" but this code is inside the success callback, which runs shutdown hooks. The comment should read "Global shutdown hooks" for accuracy.
🔎 Proposed fix
function (Message $message) { if ($this->job->getHook()) { foreach ($this->shutdownHooks as $hook) { - // Global init hooks + // Global shutdown hooks if (\in_array('*', $hook->getGroups())) {
344-355: Fix misleading comment: should be "shutdown hooks".Line 346 says "Group init hooks" but this code is inside the success callback, which runs shutdown hooks. The comment should read "Group shutdown hooks" for accuracy.
🔎 Proposed fix
foreach ($this->job->getGroups() as $group) { foreach ($this->shutdownHooks as $hook) { - // Group init hooks + // Group shutdown hooks if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (1)
src/Queue/Server.php (1)
414-450: LGTM: Worker lifecycle API properly implemented.The workerStart/workerStop methods correctly return Hook instances for fluent registration, and the getter methods return the full arrays. The API is consistent and well-designed.
Note: Line 444 says "Returns Worker stops hooks" — consider "Returns worker stop hooks" for slightly better grammar, though this is purely cosmetic.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
pint.jsonsrc/Queue/Adapter.phpsrc/Queue/Adapter/Swoole.phpsrc/Queue/Adapter/Workerman.phpsrc/Queue/Broker/AMQP.phpsrc/Queue/Broker/Pool.phpsrc/Queue/Broker/Redis.phpsrc/Queue/Connection.phpsrc/Queue/Connection/Redis.phpsrc/Queue/Connection/RedisCluster.phpsrc/Queue/Queue.phpsrc/Queue/Server.phptests/Queue/servers/AMQP/Dockerfiletests/Queue/servers/AMQP/worker.phptests/Queue/servers/Swoole/Dockerfiletests/Queue/servers/Swoole/worker.phptests/Queue/servers/SwooleRedisCluster/Dockerfiletests/Queue/servers/SwooleRedisCluster/worker.phptests/Queue/servers/Workerman/Dockerfiletests/Queue/servers/Workerman/worker.php
💤 Files with no reviewable changes (2)
- src/Queue/Adapter/Workerman.php
- src/Queue/Adapter.php
✅ Files skipped from review due to trivial changes (2)
- tests/Queue/servers/SwooleRedisCluster/Dockerfile
- tests/Queue/servers/Swoole/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (4)
- src/Queue/Broker/AMQP.php
- tests/Queue/servers/SwooleRedisCluster/worker.php
- pint.json
- tests/Queue/servers/Swoole/worker.php
🧰 Additional context used
🧬 Code graph analysis (8)
src/Queue/Connection/Redis.php (5)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(103-107)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Connection.php (5)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(103-107)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Broker/Pool.php (5)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Queue.php (1)
Queue(5-15)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Broker/Redis.php (3)
src/Queue/Connection.php (3)
rightPopArray(8-8)close(28-28)rightPop(13-13)src/Queue/Connection/Redis.php (3)
rightPopArray(63-72)close(172-176)rightPop(74-83)src/Queue/Connection/RedisCluster.php (3)
rightPopArray(57-66)close(169-173)rightPop(68-77)
tests/Queue/servers/Workerman/worker.php (2)
src/Queue/Adapter/Workerman.php (3)
Workerman(9-50)workerStart(34-41)workerStop(42-49)src/Queue/Connection/Redis.php (1)
Redis(7-190)
src/Queue/Connection/RedisCluster.php (5)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Redis.php (1)
close(103-107)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Consumer.php (1)
close(27-27)
src/Queue/Server.php (6)
src/Queue/Job.php (2)
hook(24-29)getHook(36-39)src/Queue/Consumer.php (2)
consume(17-22)close(27-27)src/Queue/Adapter/Swoole.php (1)
workerStop(100-104)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Adapter/Swoole.php (3)
src/Queue/Adapter.php (6)
Adapter(5-43)__construct(12-16)start(22-22)stop(28-28)workerStart(35-35)workerStop(42-42)src/Queue/Server.php (5)
__construct(84-88)start(266-408)stop(236-247)workerStart(414-420)workerStop(435-441)src/Queue/Consumer.php (1)
close(27-27)
🪛 GitHub Actions: Linter
src/Queue/Broker/Redis.php
[error] 1-1: PSR-12 braces style violation detected by Pint. Please fix braces in this file. Run 'vendor/bin/pint' to auto-fix if available.
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
88-88: Avoid unused local variables such as '$process'. (undefined)
(UnusedLocalVariable)
🪛 PHPStan (2.1.33)
src/Queue/Broker/Redis.php
13-13: Class constants with native types are supported only on PHP 8.3 and later.
(classConstant.nativeTypeNotSupported)
🔇 Additional comments (30)
src/Queue/Queue.php (1)
11-13: LGTM!The quote style change is consistent and the validation logic remains correct.
tests/Queue/servers/AMQP/Dockerfile (1)
1-3: LGTM!The Dockerfile is correctly structured with required build dependencies for PHP extensions.
tests/Queue/servers/Workerman/Dockerfile (1)
1-11: LGTM!The Dockerfile properly configures the pcntl extension required for Workerman process control.
src/Queue/Connection/RedisCluster.php (1)
169-173: LGTM!The
close()implementation correctly handles the nullable Redis instance and resets the state to allow potential reconnection throughgetRedis().src/Queue/Connection/Redis.php (1)
172-176: LGTM!Clean implementation of
close()that properly releases the Redis connection and resets state for potential reconnection.src/Queue/Broker/Redis.php (4)
27-36: Good error handling for graceful shutdown.The try/catch properly distinguishes between shutdown-induced exceptions (break the loop) and unexpected Redis failures (rethrow). This ensures the worker can exit cleanly when
close()is called while blocked onrightPopArray.
103-107: LGTM!Calling
$this->connection->close()after setting theclosedflag ensures proper resource cleanup and complements the graceful shutdown logic in the consume loop.
130-130: LGTM!Using the
POP_TIMEOUTconstant here maintains consistency with the consume method and eliminates the magic number.
13-14: No issue to address. The project's minimum PHP version is 8.3, which fully supports native types on class constants. The implementation is compatible with the declared requirements.src/Queue/Broker/Pool.php (2)
33-40: LGTM!The multiline formatting improves readability of the method signature.
47-65: LGTM!The reformatted closures with trailing commas follow modern PHP style conventions.
src/Queue/Connection.php (1)
28-28: Good addition for graceful shutdown support.Adding
close()to the interface enables consistent resource cleanup across all connection types. All known implementations (Redis, RedisCluster) properly implement this method. Note this is a breaking change for any external implementations of this interface.tests/Queue/servers/Workerman/worker.php (4)
7-9: LGTM: Import updates align with refactored API.The imports correctly reflect the new class structure with Redis connection, Redis broker, and Workerman adapter.
15-15: LGTM: Job handler updated to fluent API.The use of
inject('message')->action(handleRequest(...))is cleaner and consistent with the new Server API.
24-26: LGTM: Worker start hook properly registered.The inline format is clean and the lifecycle hook is correctly wired.
28-30: LGTM: Worker stop hook added for lifecycle management.This properly demonstrates the new workerStop API and aligns with the broader worker lifecycle refactoring.
tests/Queue/servers/AMQP/worker.php (5)
6-8: LGTM: Import statements updated correctly.The direct imports for AMQP, Swoole, and Server are clean and follow standard conventions.
10-12: LGTM: Component instantiation follows updated API.The AMQP broker, Swoole adapter, and Server are correctly instantiated with appropriate parameters.
14-14: LGTM: Job handler uses fluent API.The handler correctly uses the inject/action pattern consistent with other test files.
23-25: LGTM: Worker start hook correctly implemented.The inline format and single-quoted string are consistent with coding standards.
27-29: LGTM: Worker stop hook added.This correctly demonstrates the new workerStop lifecycle hook for the AMQP/Swoole configuration.
src/Queue/Server.php (4)
33-63: LGTM: Worker lifecycle hooks refactored to arrays.Converting from single typed properties to arrays with default initialization (
= []) properly resolves the previous concerns about uninitialized properties and enables multiple hook registration.
156-165: LGTM: setResource signature updated to multi-line format.The formatting improves readability while maintaining the same functionality.
174-217: LGTM: Telemetry histogram buckets reformatted.The multi-line format for ExplicitBucketBoundaries improves readability without changing functionality.
377-398: LGTM: Worker stop handling is robust.The implementation correctly:
- Iterates over all registered workerStop hooks
- Isolates hook failures with try/catch to prevent one hook from breaking others
- Guarantees consumer connection closure via finally block
- Provides clear error logging
src/Queue/Adapter/Swoole.php (5)
5-20: LGTM: Properties support manual worker management.The new properties correctly support the architectural shift from pool-based to manual Process management:
$workerstracks spawned Process instances$runningcontrols the supervisor loop$onWorkerStartand$onWorkerStophold lifecycle callbacks
32-53: LGTM: Supervisor loop properly implements worker orchestration.The implementation correctly:
- Spawns workers before entering the supervisor loop
- Enables Swoole coroutine hooks for I/O
- Registers signal handlers in the master process context
- Maintains a supervisor loop that monitors worker state
- Uses SIGCHLD to trigger reaping of finished workers
55-76: LGTM: Worker lifecycle correctly managed.The spawnWorker implementation properly:
- Runs worker callbacks inside a coroutine context for async support
- Registers SIGTERM handler in the worker process to gracefully close the consumer
- Invokes onWorkerStart hooks within the coroutine (line 63-65)
- Invokes onWorkerStop hooks after the coroutine exits (line 69-71), ensuring proper cleanup order
- Tracks the spawned process for supervisor management
78-92: LGTM: Shutdown handling is correct.The reap() method properly cleans up finished workers using
Process::wait(), and stop() correctly signals all workers to terminate gracefully via SIGTERM.Note: The static analysis warning about unused
$processon line 88 is a false positive — the iteration is necessary to obtain$pidkeys for signaling.
94-104: LGTM: Lifecycle callback registration follows consistent pattern.The workerStart and workerStop methods correctly append callbacks to their respective arrays and return
$thisfor fluent chaining, consistent with the adapter interface.
| public function close(): void | ||
| { | ||
| $this->delegateConsumer(__FUNCTION__, \func_get_args()); | ||
| // TODO: Implement closing all connections in the pool | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incomplete close() implementation may cause resource leaks.
The close() method currently does nothing, while other broker implementations (Redis, AMQP) properly clean up resources. If Pool is actively used, connections may not be released on shutdown.
🔎 Suggested implementation
public function close(): void
{
- // TODO: Implement closing all connections in the pool
+ // Close all connections in the publisher pool
+ $this->publisher?->reclaim();
+
+ // Close all connections in the consumer pool
+ $this->consumer?->reclaim();
}Note: The exact implementation depends on the UtopiaPool API. Would you like me to investigate how connections should be closed in a pool context?
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/Queue/Broker/Pool.php around lines 42 to 45, the close() method is
currently empty and must explicitly release all pooled connections to avoid
leaks; implement it to iterate over the underlying UtopiaPool entries (or use
its provided drain/clear/destroy API), call the appropriate connection
close/destroy method or release function for each resource, catch and log any
exceptions to avoid halting shutdown, and then mark the pool as closed/cleared
so subsequent calls are no-ops; ensure the implementation respects the
UtopiaPool lifecycle API (drain/clear/destroy) and updates any internal state
flags.
1db31ba to
f0982fc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/Queue/Broker/Redis.php (1)
1-1: Fix PSR-12 formatting issue.The pipeline reports a braces style violation. Run Pint to automatically fix the formatting:
#!/bin/bash vendor/bin/pint src/Queue/Broker/Redis.php
♻️ Duplicate comments (4)
src/Queue/Broker/AMQP.php (1)
134-134: Verify thatAMQPChannel::stopConsume()exists in php-amqplib.A previous review flagged that
stopConsume()may not exist onAMQPChannelin php-amqplib, which would cause a fatal error. If the method doesn't exist, you should either callbasic_cancel()for each active consumer tag (iterating$this->channel->callbacks) or simply close the channel/connection directly without callingstopConsume().Please search the web to confirm whether
AMQPChannel::stopConsume()exists in php-amqplib:php-amqplib AMQPChannel stopConsume method documentationsrc/Queue/Server.php (3)
106-113: Undefined index access on'reset'when resource callback doesn't exist.Line 109 accesses
self::$resourcesCallbacks[$name]['reset']before verifying that$nameexists inresourcesCallbacks(checked on line 111). This will throw an undefined index notice.🔎 Proposed fix
- if ( - !\array_key_exists($name, $this->resources) || - $fresh || - self::$resourcesCallbacks[$name]['reset'] - ) { + if ( + !\array_key_exists($name, $this->resources) + || $fresh + || (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset']) + ) {
304-315: Group init hooks run unconditionally while global hooks are gated bygetHook().Global init hooks (lines 291-302) are gated by
$this->job->getHook(), but group init hooks (lines 304-315) run unconditionally. This inconsistency may lead to unexpected behavior.🔎 Proposed fix for consistency
- foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { - // Group init hooks - if (\in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments( - $hook, - $message->getPayload(), - ); - $hook->getAction()(...$arguments); - } - } - } + if ($this->job->getHook()) { + foreach ($this->job->getGroups() as $group) { + foreach ($this->initHooks as $hook) { + // Group init hooks + if (\in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); + } + } + } + }
333-355: Comments incorrectly reference "init hooks" in shutdown callback; group shutdown hooks not gated.Line 333 says "Global init hooks" and line 346 says "Group init hooks", but both are in the success callback which runs shutdown hooks. Additionally, group shutdown hooks (lines 344-355) are not gated by
getHook()while global hooks are.🔎 Proposed fix
if ($this->job->getHook()) { foreach ($this->shutdownHooks as $hook) { - // Global init hooks + // Global shutdown hooks if (\in_array('*', $hook->getGroups())) { $arguments = $this->getArguments( $hook, $message->getPayload(), ); $hook->getAction()(...$arguments); } } } - foreach ($this->job->getGroups() as $group) { - foreach ($this->shutdownHooks as $hook) { - // Group init hooks - if (\in_array($group, $hook->getGroups())) { + if ($this->job->getHook()) { + foreach ($this->job->getGroups() as $group) { + foreach ($this->shutdownHooks as $hook) { + // Group shutdown hooks + if (\in_array($group, $hook->getGroups())) { $arguments = $this->getArguments( $hook, $message->getPayload(), ); $hook->getAction()(...$arguments); + } } } }
🧹 Nitpick comments (2)
src/Queue/Adapter/Swoole.php (1)
81-87: Silence unused variable warning in stop method.The
$processvariable on line 83 is unused. Prefix it with an underscore to indicate it's intentionally unused.🔎 Proposed fix
public function stop(): self { - foreach ($this->workers as $pid => $process) { + foreach ($this->workers as $pid => $_process) { Process::kill($pid, SIGTERM); } return $this; }src/Queue/Server.php (1)
422-450: Minor documentation wording.The PHPDoc comments have minor grammatical issues:
- Line 423: "Worker starts hooks" → "worker start hooks"
- Line 444: "Worker stops hooks" → "worker stop hooks"
🔎 Proposed fix
/** - * Returns Worker starts hooks. + * Returns the worker start hooks. * @return array */ public function getWorkerStart(): array { return $this->workerStartHooks; } ... - /** - * Returns Worker stops hooks. - * @return array - */ + /** + * Returns the worker stop hooks. + * @return array + */ public function getWorkerStop(): array
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
pint.jsonsrc/Queue/Adapter.phpsrc/Queue/Adapter/Swoole.phpsrc/Queue/Adapter/Workerman.phpsrc/Queue/Broker/AMQP.phpsrc/Queue/Broker/Pool.phpsrc/Queue/Broker/Redis.phpsrc/Queue/Connection.phpsrc/Queue/Connection/Redis.phpsrc/Queue/Connection/RedisCluster.phpsrc/Queue/Queue.phpsrc/Queue/Server.phptests/Queue/servers/AMQP/Dockerfiletests/Queue/servers/AMQP/worker.phptests/Queue/servers/Swoole/Dockerfiletests/Queue/servers/Swoole/worker.phptests/Queue/servers/SwooleRedisCluster/Dockerfiletests/Queue/servers/SwooleRedisCluster/worker.phptests/Queue/servers/Workerman/Dockerfiletests/Queue/servers/Workerman/worker.php
💤 Files with no reviewable changes (2)
- src/Queue/Adapter.php
- src/Queue/Adapter/Workerman.php
✅ Files skipped from review due to trivial changes (3)
- src/Queue/Queue.php
- tests/Queue/servers/SwooleRedisCluster/Dockerfile
- tests/Queue/servers/AMQP/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (5)
- pint.json
- tests/Queue/servers/Workerman/Dockerfile
- tests/Queue/servers/Workerman/worker.php
- tests/Queue/servers/Swoole/Dockerfile
- tests/Queue/servers/Swoole/worker.php
🧰 Additional context used
🧬 Code graph analysis (8)
src/Queue/Connection.php (5)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(103-107)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Adapter/Swoole.php (3)
src/Queue/Adapter.php (6)
Adapter(5-43)__construct(12-16)start(22-22)stop(28-28)workerStart(35-35)workerStop(42-42)src/Queue/Server.php (5)
__construct(84-88)start(266-408)stop(236-247)workerStart(414-420)workerStop(435-441)src/Queue/Consumer.php (1)
close(27-27)
src/Queue/Broker/Pool.php (4)
src/Queue/Queue.php (1)
Queue(5-15)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(103-107)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Consumer.php (1)
close(27-27)
src/Queue/Connection/Redis.php (5)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(103-107)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
tests/Queue/servers/AMQP/worker.php (2)
src/Queue/Adapter/Swoole.php (3)
Swoole(10-100)workerStart(89-93)workerStop(95-99)src/Queue/Server.php (3)
Server(14-537)workerStart(414-420)workerStop(435-441)
src/Queue/Server.php (10)
src/Queue/Job.php (2)
hook(24-29)getHook(36-39)src/Queue/Broker/AMQP.php (2)
consume(75-130)close(132-136)src/Queue/Broker/Pool.php (2)
consume(33-40)close(42-45)src/Queue/Broker/Redis.php (2)
consume(21-101)close(103-107)src/Queue/Consumer.php (2)
consume(17-22)close(27-27)src/Queue/Message.php (1)
Message(5-81)src/Queue/Adapter/Swoole.php (1)
workerStop(95-99)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Broker/Redis.php (3)
src/Queue/Connection.php (3)
rightPopArray(8-8)close(28-28)rightPop(13-13)src/Queue/Connection/Redis.php (3)
rightPopArray(63-72)close(172-176)rightPop(74-83)src/Queue/Connection/RedisCluster.php (3)
rightPopArray(57-66)close(169-173)rightPop(68-77)
🪛 GitHub Actions: Linter
src/Queue/Broker/Redis.php
[error] 1-1: PSR-12: braces style issue detected by Pint lint. Braces formatting violation found in this file.
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
83-83: Avoid unused local variables such as '$process'. (undefined)
(UnusedLocalVariable)
🪛 PHPStan (2.1.33)
src/Queue/Broker/Redis.php
13-13: Class constants with native types are supported only on PHP 8.3 and later.
(classConstant.nativeTypeNotSupported)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Adapter Tests (AMQP)
- GitHub Check: Adapter Tests (SwooleRedisCluster)
- GitHub Check: Adapter Tests (Workerman)
🔇 Additional comments (12)
src/Queue/Connection.php (1)
28-28: LGTM! Essential interface addition for resource cleanup.The
close()method enables explicit connection teardown across all connection implementations, supporting graceful shutdown patterns throughout the queue system.src/Queue/Connection/RedisCluster.php (1)
169-173: LGTM! Clean implementation of connection cleanup.The implementation correctly uses the null-safe operator and nullifies the internal reference, mirroring the pattern in
Redis.php.src/Queue/Connection/Redis.php (1)
172-176: LGTM! Clean and consistent implementation.The implementation correctly handles connection cleanup using the null-safe operator and reference nullification, consistent with the
RedisClusterimplementation.src/Queue/Broker/Redis.php (4)
27-36: LGTM! Excellent error handling for graceful shutdown.The try/catch block correctly distinguishes between shutdown-related exceptions (which should break the loop) and other Redis errors (which should propagate), enabling clean worker termination.
105-106: LGTM! Proper resource cleanup on shutdown.The close implementation correctly signals shutdown and delegates connection cleanup to the underlying connection.
130-130: Verify the timeout reduction from 5 to 2 seconds.The retry operation now uses
POP_TIMEOUT(2 seconds) instead of the previous hardcoded value of 5 seconds. This reduces the wait time for failed jobs but may impact retry behavior under certain Redis loads.
13-13: No action required. The typed class constant is compatible with the project's minimum PHP version of 8.3, as specified in composer.json.Likely an incorrect or invalid review comment.
src/Queue/Adapter/Swoole.php (2)
31-50: LGTM! Well-structured master process orchestration.The start method correctly:
- Spawns all workers upfront
- Sets up master-level signal handlers for SIGTERM, SIGINT, and SIGCHLD
- Waits for all workers to exit before returning
The architecture properly separates master-level concerns (signal handling, worker lifecycle) from worker-level concerns.
74-79: LGTM! Proper zombie process cleanup.The
reapmethod correctly uses non-blockingProcess::waitto clean up exited workers and remove them from the tracking array.src/Queue/Server.php (3)
51-63: LGTM! Clean transition from single hooks to hook arrays.The introduction of
workerStartHooksandworkerStopHooksarrays with proper initialization and PHPDoc annotations enables multiple lifecycle hooks per event.
377-398: Well-structured workerStop with robust error handling.Good implementation:
- Individual hook exceptions are caught and logged without preventing other hooks from running
finallyblock ensuresconsumer->close()is always called, even if hooks throw- Proper cleanup flow for graceful shutdown
360-373: Proper null-safety in error callback.Good use of null-safe operator
?->for$message?->getPid()to handle cases where the message might be null during error conditions.
| protected function spawnWorker(int $workerId): void | ||
| { | ||
| $this->pool->shutdown(); | ||
| return $this; | ||
| $process = new Process(function () use ($workerId) { | ||
| Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]); | ||
|
|
||
| Coroutine\run(function () use ($workerId) { | ||
| Process::signal(SIGTERM, fn () => $this->consumer->close()); | ||
|
|
||
| foreach ($this->onWorkerStart as $callback) { | ||
| $callback((string)$workerId); | ||
| } | ||
|
|
||
| foreach ($this->onWorkerStop as $callback) { | ||
| $callback((string)$workerId); | ||
| } | ||
| }); | ||
| }, false, 0, false); | ||
|
|
||
| $pid = $process->start(); | ||
| $this->workers[$pid] = $process; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Worker exits immediately without processing jobs.
The worker process invokes onWorkerStop callbacks immediately after onWorkerStart (lines 64-66), causing the worker to exit before the consume loop can run. The Server wires the consumer loop via workerStart, but the coroutine exits before that loop ever blocks.
Expected flow:
onWorkerStartcallbacks execute (including Server's consume loop)- Consume loop blocks, processing jobs
- On SIGTERM,
consumer->close()breaks the loop onWorkerStopcallbacks execute- Worker exits
Fix: Remove lines 64-66. The onWorkerStop callbacks should be invoked after the worker's main work completes (e.g., in a defer or after the consume loop returns), not immediately after start.
🔎 Proposed fix
protected function spawnWorker(int $workerId): void
{
$process = new Process(function () use ($workerId) {
Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
Coroutine\run(function () use ($workerId) {
Process::signal(SIGTERM, fn () => $this->consumer->close());
+ // Register deferred cleanup
+ defer(function () use ($workerId) {
+ foreach ($this->onWorkerStop as $callback) {
+ $callback((string)$workerId);
+ }
+ });
+
foreach ($this->onWorkerStart as $callback) {
$callback((string)$workerId);
}
-
- foreach ($this->onWorkerStop as $callback) {
- $callback((string)$workerId);
- }
});
}, false, 0, false);
$pid = $process->start();
$this->workers[$pid] = $process;
}🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 52 to 72, the onWorkerStop
callbacks are being invoked immediately after onWorkerStart (lines 64–66),
causing the worker coroutine to exit before the server's consume loop runs;
remove the immediate invocation of onWorkerStop there and instead call those
callbacks only after the worker's main work completes (for example in a deferred
cleanup block or right after the consume loop returns), keeping the SIGTERM
handler that calls $this->consumer->close() and ensuring callbacks are invoked
with the same (string)$workerId.
f0982fc to
5e70837
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (6)
src/Queue/Broker/AMQP.php (1)
134-134:stopConsume()method does not exist in php-amqplib.The project uses php-amqplib version 3.7.0. The
AMQPChannelclass in php-amqplib does not provide astopConsume()method. This call will result in a fatal error at runtime.To stop consumption, you need to cancel active consumer tags using
basic_cancel()for each registered consumer, or simply close the channel directly if consumer tags are not tracked.Does php-amqplib AMQPChannel class have a stopConsume method in version 3.7.0?🔎 Suggested fix
If you don't track consumer tags, simply remove the
stopConsume()call and let the channel close handle cleanup:public function close(): void { - $this->channel?->stopConsume(); $this->channel?->getConnection()?->close(); }Alternatively, if you need to explicitly cancel consumers, track consumer tags and cancel them:
public function close(): void { + // Cancel all active consumers if tags are tracked + foreach ($this->channel?->callbacks ?? [] as $consumerTag => $_) { + $this->channel->basic_cancel($consumerTag); + } $this->channel?->getConnection()?->close(); }tests/Queue/servers/Workerman/worker.php (1)
12-12: Fix queue name typo: 'wokerman' should be 'workerman'.The queue name contains a typo and should be
'workerman'for consistency with the adapter name.🔎 Proposed fix
-$adapter = new Workerman($consumer, 12, 'wokerman'); +$adapter = new Workerman($consumer, 12, 'workerman');src/Queue/Broker/Pool.php (1)
42-44: Implementclose()to prevent resource leaks.The
close()method remains unimplemented with a TODO. Unlike other broker implementations (Redis, AMQP) that properly clean up resources, this may cause connection leaks when the pool is not explicitly drained.Based on the
UtopiaPoolAPI, you likely need to call a drain/reclaim method on both publisher and consumer pools. Would you like me to generate a script to verify theUtopiaPoolAPI for proper cleanup methods?src/Queue/Adapter/Swoole.php (1)
52-72: Critical: Worker coroutine exits immediately, consume loop never runs.Lines 64-66 invoke
onWorkerStopcallbacks immediately afteronWorkerStartcompletes, causing the worker coroutine to exit before the consume loop (registered viaworkerStartin Server.php line 269) can block and process jobs.Expected flow:
onWorkerStartcallbacks execute (Server registers consume loop here)- Consume loop blocks, processing messages
- On SIGTERM,
consumer->close()breaks the looponWorkerStopcallbacks execute- Coroutine exits
Current flow:
onWorkerStartcallbacks executeonWorkerStopcallbacks execute immediately- Coroutine exits (consume loop never blocks)
Fix: Remove lines 64-66. The
onWorkerStopcallbacks should execute after the consume loop naturally returns (when the consumer is closed), not immediately after start. Usedefer()or move the invocation to after the consume loop completes.🔎 Proposed fix using defer
protected function spawnWorker(int $workerId): void { $process = new Process(function () use ($workerId) { Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]); Coroutine\run(function () use ($workerId) { Process::signal(SIGTERM, fn () => $this->consumer->close()); + // Ensure stop callbacks run after consume loop exits + defer(function () use ($workerId) { + foreach ($this->onWorkerStop as $callback) { + $callback((string)$workerId); + } + }); + foreach ($this->onWorkerStart as $callback) { $callback((string)$workerId); } - - foreach ($this->onWorkerStop as $callback) { - $callback((string)$workerId); - } }); }, false, 0, false); $pid = $process->start(); $this->workers[$pid] = $process; }src/Queue/Server.php (2)
304-315: Group hooks ignore the hook selector flag.Global hooks respect
$this->job->getHook()(lines 291, 331), but group hooks (lines 304-315 for init, 344-355 for shutdown) run unconditionally when the group matches. This creates inconsistent behavior where settinghook(false)disables global hooks but not group hooks.For consistency, gate group hooks the same way:
🔎 Proposed fix
- foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { - // Group init hooks - if (\in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments( - $hook, - $message->getPayload(), - ); - $hook->getAction()(...$arguments); + if ($this->job->getHook()) { + foreach ($this->job->getGroups() as $group) { + foreach ($this->initHooks as $hook) { + // Group init hooks + if (\in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); + } } } }Apply the same pattern to shutdown group hooks (lines 344-355).
Also applies to: 344-355
106-110: Guard against undefined index when accessing callback reset flag.Line 109 accesses
self::$resourcesCallbacks[$name]['reset']before verifying the callback exists (checked at line 111). If a resource exists in$this->resourcesbut has no registered callback (e.g.,'error'initialized at line 69), this will trigger an undefined index error.Short-circuit evaluation doesn't prevent this: When the first two conditions are false, PHP evaluates the third condition before entering the block that checks callback existence.
🔎 Proposed fix
if ( !\array_key_exists($name, $this->resources) || $fresh || - self::$resourcesCallbacks[$name]['reset'] + (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset']) ) { if (!\array_key_exists($name, self::$resourcesCallbacks)) { throw new Exception("Failed to find resource: $name"); }
🧹 Nitpick comments (1)
src/Queue/Adapter/Swoole.php (1)
81-87: Unused$processvariable is a false positive.PHPMD flags
$processas unused on line 83, but it's required for theforeachloop to iterate over worker PIDs. The actualProcessinstance isn't needed, only the$pidkeys. This is a common pattern and can be safely ignored.If you prefer to silence the linter, you can use only the keys:
public function stop(): self { - foreach ($this->workers as $pid => $process) { + foreach (array_keys($this->workers) as $pid) { Process::kill($pid, SIGTERM); } return $this; }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
pint.jsonsrc/Queue/Adapter.phpsrc/Queue/Adapter/Swoole.phpsrc/Queue/Adapter/Workerman.phpsrc/Queue/Broker/AMQP.phpsrc/Queue/Broker/Pool.phpsrc/Queue/Broker/Redis.phpsrc/Queue/Connection.phpsrc/Queue/Connection/Redis.phpsrc/Queue/Connection/RedisCluster.phpsrc/Queue/Queue.phpsrc/Queue/Server.phptests/Queue/servers/AMQP/Dockerfiletests/Queue/servers/AMQP/worker.phptests/Queue/servers/Swoole/Dockerfiletests/Queue/servers/Swoole/worker.phptests/Queue/servers/SwooleRedisCluster/Dockerfiletests/Queue/servers/SwooleRedisCluster/worker.phptests/Queue/servers/Workerman/Dockerfiletests/Queue/servers/Workerman/worker.php
💤 Files with no reviewable changes (2)
- src/Queue/Adapter/Workerman.php
- src/Queue/Adapter.php
🚧 Files skipped from review as they are similar to previous changes (7)
- tests/Queue/servers/Workerman/Dockerfile
- tests/Queue/servers/AMQP/Dockerfile
- tests/Queue/servers/SwooleRedisCluster/Dockerfile
- tests/Queue/servers/SwooleRedisCluster/worker.php
- tests/Queue/servers/Swoole/worker.php
- src/Queue/Connection/Redis.php
- src/Queue/Queue.php
🧰 Additional context used
🧬 Code graph analysis (6)
src/Queue/Broker/Redis.php (3)
src/Queue/Connection.php (2)
rightPopArray(8-8)rightPop(13-13)src/Queue/Connection/Redis.php (2)
rightPopArray(63-72)rightPop(74-83)src/Queue/Connection/RedisCluster.php (2)
rightPopArray(57-66)rightPop(68-77)
src/Queue/Server.php (7)
src/Queue/Job.php (2)
hook(24-29)getHook(36-39)src/Queue/Broker/AMQP.php (2)
consume(75-130)close(132-136)src/Queue/Broker/Pool.php (2)
consume(33-40)close(42-45)src/Queue/Broker/Redis.php (2)
consume(21-101)close(103-106)src/Queue/Consumer.php (2)
consume(17-22)close(27-27)src/Queue/Adapter/Workerman.php (1)
workerStop(42-49)src/Queue/Connection.php (1)
close(28-28)
tests/Queue/servers/Workerman/worker.php (3)
src/Queue/Adapter.php (3)
Adapter(5-43)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Workerman.php (3)
Workerman(9-50)workerStart(34-41)workerStop(42-49)src/Queue/Server.php (3)
Server(14-537)workerStart(414-420)workerStop(435-441)
tests/Queue/servers/AMQP/worker.php (3)
src/Queue/Adapter.php (3)
Adapter(5-43)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Swoole.php (3)
Swoole(10-100)workerStart(89-93)workerStop(95-99)src/Queue/Server.php (3)
Server(14-537)workerStart(414-420)workerStop(435-441)
src/Queue/Connection/RedisCluster.php (5)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Consumer.php (1)
close(27-27)
src/Queue/Connection.php (5)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(103-106)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
🪛 GitHub Actions: Linter
src/Queue/Broker/Redis.php
[error] 1-1: Pint lint failed due to a PSR-12 braces issue. Command 'vendor/bin/pint --test' returned exit code 1.
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
83-83: Avoid unused local variables such as '$process'. (undefined)
(UnusedLocalVariable)
🪛 PHPStan (2.1.33)
src/Queue/Broker/Redis.php
13-13: Class constants with native types are supported only on PHP 8.3 and later.
(classConstant.nativeTypeNotSupported)
🔇 Additional comments (12)
pint.json (1)
2-5: LGTM! Good addition of single quote enforcement.The Pint configuration correctly adds the
single_quoterule while maintaining the PSR-12 preset. The trailing comma after"psr12"follows best practices for easier future additions. The codebase properly follows the rule, using single quotes for literal strings while preserving double quotes for strings with variable interpolation.tests/Queue/servers/Swoole/Dockerfile (1)
5-5: LGTM! EOF newline added.This formatting change adds a trailing newline, aligning with standard file conventions.
src/Queue/Connection.php (1)
28-28: LGTM! Explicit resource cleanup added to interface.The addition of
close(): voidto the Connection interface enables explicit resource teardown across implementations, supporting the graceful shutdown lifecycle introduced in this PR.src/Queue/Connection/RedisCluster.php (1)
169-173: LGTM! Clean resource teardown implementation.The
close()implementation correctly closes the underlying RedisCluster connection and nullifies the reference, consistent with the pattern used in the Redis connection class.src/Queue/Broker/AMQP.php (1)
114-114: Minor: String literal style changes are cosmetic.The changes from double quotes to single quotes for static strings are stylistic improvements with no functional impact.
Also applies to: 165-165
tests/Queue/servers/AMQP/worker.php (2)
6-8: LGTM! Cleaner imports and initialization.The refactoring to use explicit imports and direct class references improves readability.
Also applies to: 10-12
14-14: LGTM! Properly adopts new lifecycle API.The worker now correctly uses the new
workerStart()andworkerStop()hooks introduced in this PR, along with the updated job action syntax.Also applies to: 23-29
tests/Queue/servers/Workerman/worker.php (1)
7-11: LGTM! Adopts new lifecycle API.The worker correctly uses the new Redis broker wiring and lifecycle hooks (
workerStart()andworkerStop()).Also applies to: 15-15, 24-30
src/Queue/Broker/Pool.php (1)
33-38: LGTM! Formatting improvements enhance readability.The multiline parameter lists and closure formatting improve code readability with no functional changes.
Also applies to: 49-52, 59-62
src/Queue/Broker/Redis.php (2)
13-13: No action required. The project'scomposer.jsonspecifiesphp: >=8.3, which fully supports typed constants. The code is compatible with all supported PHP versions in this project.Likely an incorrect or invalid review comment.
21-40: API signature change verified and correctly propagated across codebase.The addition of
successCallbackanderrorCallbackparameters to theconsume()method has been consistently implemented across all broker implementations (Redis, AMQP, and Pool) and the Consumer interface. The call site in Server.php line 277 correctly passes all four parameters. Exception handling in the Redis broker appropriately breaks on close and rethrows otherwise.src/Queue/Server.php (1)
377-398: Excellent error handling in worker stop sequence.The
try/catch/finallystructure ensures:
- Individual hook failures are logged but don't prevent other hooks from running
- Consumer cleanup (
consumer->close()) always executes, even if hooks throw- Worker shutdown completes gracefully in all scenarios
This defensive pattern prevents hook errors from leaving connections open or preventing graceful shutdown.
5e70837 to
83d6ac3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
src/Queue/Server.php (2)
330-355: Misleading comments in success callback.Lines 333 and 346 have comments referring to "init hooks" but this is the success callback which runs shutdown hooks. These should say "Global shutdown hooks" and "Group shutdown hooks" respectively.
🔎 Proposed fix
if ($this->job->getHook()) { foreach ($this->shutdownHooks as $hook) { - // Global init hooks + // Global shutdown hooks if (\in_array('*', $hook->getGroups())) { $arguments = $this->getArguments( $hook, $message->getPayload(), ); $hook->getAction()(...$arguments); } } } foreach ($this->job->getGroups() as $group) { foreach ($this->shutdownHooks as $hook) { - // Group init hooks + // Group shutdown hooks if (\in_array($group, $hook->getGroups())) {
106-110: Potential undefined index on'reset'access.The condition accesses
self::$resourcesCallbacks[$name]['reset']without first verifying thatself::$resourcesCallbacks[$name]exists. If a resource is set directly in$this->resourcesbut no callback is registered, this will throw an undefined index notice.🔎 Proposed fix
public function getResource(string $name, bool $fresh = false): mixed { if ( !\array_key_exists($name, $this->resources) || $fresh || - self::$resourcesCallbacks[$name]['reset'] + (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset']) ) {src/Queue/Adapter/Swoole.php (1)
52-72: Critical: Worker exits immediately without processing jobs.The
onWorkerStopcallbacks are invoked immediately afteronWorkerStart(lines 64-66), causing the worker to exit before the consume loop ever runs. TheServerwires the consumer loop viaworkerStart, but the coroutine exits before that loop blocks.Expected flow:
onWorkerStartcallbacks execute (including Server's consume loop)- Consume loop blocks, processing jobs
- On SIGTERM,
consumer->close()breaks the looponWorkerStopcallbacks execute- Worker exits
The
onWorkerStopcallbacks should only run after the worker's main work completes, not immediately after start.🔎 Proposed fix using defer
protected function spawnWorker(int $workerId): void { $process = new Process(function () use ($workerId) { Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]); Coroutine\run(function () use ($workerId) { Process::signal(SIGTERM, fn () => $this->consumer->close()); + // Register deferred cleanup - runs when coroutine completes + defer(function () use ($workerId) { + foreach ($this->onWorkerStop as $callback) { + $callback((string)$workerId); + } + }); + foreach ($this->onWorkerStart as $callback) { $callback((string)$workerId); } - - foreach ($this->onWorkerStop as $callback) { - $callback((string)$workerId); - } }); }, false, 0, false); $pid = $process->start(); $this->workers[$pid] = $process; }
🧹 Nitpick comments (1)
src/Queue/Adapter/Swoole.php (1)
81-87: Unused$processvariable in loop.The
$processvariable is assigned but never used. Consider iterating over keys only.🔎 Proposed fix
public function stop(): self { - foreach ($this->workers as $pid => $process) { + foreach (array_keys($this->workers) as $pid) { Process::kill($pid, SIGTERM); } return $this; }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
pint.jsonsrc/Queue/Adapter.phpsrc/Queue/Adapter/Swoole.phpsrc/Queue/Adapter/Workerman.phpsrc/Queue/Broker/AMQP.phpsrc/Queue/Broker/Pool.phpsrc/Queue/Broker/Redis.phpsrc/Queue/Connection.phpsrc/Queue/Connection/Redis.phpsrc/Queue/Connection/RedisCluster.phpsrc/Queue/Queue.phpsrc/Queue/Server.phptests/Queue/servers/AMQP/Dockerfiletests/Queue/servers/AMQP/worker.phptests/Queue/servers/Swoole/Dockerfiletests/Queue/servers/Swoole/worker.phptests/Queue/servers/SwooleRedisCluster/Dockerfiletests/Queue/servers/SwooleRedisCluster/worker.phptests/Queue/servers/Workerman/Dockerfiletests/Queue/servers/Workerman/worker.php
💤 Files with no reviewable changes (2)
- src/Queue/Adapter.php
- src/Queue/Adapter/Workerman.php
✅ Files skipped from review due to trivial changes (2)
- tests/Queue/servers/Swoole/Dockerfile
- tests/Queue/servers/SwooleRedisCluster/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (9)
- src/Queue/Connection.php
- src/Queue/Queue.php
- pint.json
- src/Queue/Connection/RedisCluster.php
- src/Queue/Broker/AMQP.php
- tests/Queue/servers/Workerman/worker.php
- src/Queue/Broker/Pool.php
- src/Queue/Connection/Redis.php
- tests/Queue/servers/AMQP/Dockerfile
🧰 Additional context used
🧬 Code graph analysis (4)
tests/Queue/servers/SwooleRedisCluster/worker.php (4)
src/Queue/Connection/RedisCluster.php (1)
RedisCluster(7-184)src/Queue/Adapter.php (3)
Adapter(5-43)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Swoole.php (3)
Swoole(10-100)workerStart(89-93)workerStop(95-99)src/Queue/Server.php (3)
Server(14-537)workerStart(414-420)workerStop(435-441)
src/Queue/Adapter/Swoole.php (3)
src/Queue/Adapter.php (5)
Adapter(5-43)start(22-22)stop(28-28)workerStart(35-35)workerStop(42-42)src/Queue/Server.php (4)
start(266-408)stop(236-247)workerStart(414-420)workerStop(435-441)src/Queue/Consumer.php (1)
close(27-27)
tests/Queue/servers/AMQP/worker.php (4)
src/Queue/Broker/AMQP.php (1)
AMQP(20-226)src/Queue/Adapter.php (3)
Adapter(5-43)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Swoole.php (3)
Swoole(10-100)workerStart(89-93)workerStop(95-99)src/Queue/Server.php (3)
Server(14-537)workerStart(414-420)workerStop(435-441)
src/Queue/Server.php (3)
src/Queue/Consumer.php (2)
consume(17-22)close(27-27)src/Queue/Adapter.php (1)
workerStop(42-42)src/Queue/Connection.php (1)
close(28-28)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
83-83: Avoid unused local variables such as '$process'. (undefined)
(UnusedLocalVariable)
🪛 PHPStan (2.1.33)
src/Queue/Broker/Redis.php
13-13: Class constants with native types are supported only on PHP 8.3 and later.
(classConstant.nativeTypeNotSupported)
🔇 Additional comments (14)
tests/Queue/servers/Workerman/Dockerfile (1)
11-11: LGTM! EOF normalization.The trailing newline addition standardizes the file format with no functional impact.
src/Queue/Broker/Redis.php (3)
27-35: LGTM!The exception handling correctly distinguishes between shutdown-triggered exceptions (which should break the loop gracefully) and genuine Redis errors (which should propagate). This enables clean graceful shutdown behavior.
128-128: LGTM!Consistent use of
POP_TIMEOUTin retry aligns with the consume method and improves maintainability.
13-14: Theprivate const int POP_TIMEOUT = 2;syntax is appropriate for this codebase. The utopia-php/queue library explicitly requires PHP 8.3+ in composer.json, making typed class constants fully supported and compatible. No changes needed.tests/Queue/servers/SwooleRedisCluster/worker.php (2)
6-19: LGTM!The inline construction pattern with RedisCluster connection, Swoole adapter, and Server is clean and follows the new public API design. The 12-worker configuration matches other test files.
21-36: LGTM!The job wiring and lifecycle hooks correctly use the new fluent API pattern. The
workerStart()andworkerStop()hooks demonstrate the intended usage of the new lifecycle management.tests/Queue/servers/Swoole/worker.php (1)
6-32: LGTM!The test file correctly demonstrates the new API with single-node Redis connection. The structure is consistent with other test worker files and properly exercises the
workerStart()andworkerStop()lifecycle hooks.tests/Queue/servers/AMQP/worker.php (1)
6-31: LGTM!The AMQP test file follows the same pattern as other test workers. The named parameters for the AMQP constructor improve readability. The lifecycle hooks are wired correctly.
src/Queue/Adapter/Swoole.php (2)
31-50: LGTM!The supervisor loop correctly manages worker lifecycle with signal handlers for graceful shutdown (SIGTERM/SIGINT) and worker reaping (SIGCHLD). The polling approach with
Coroutine::sleep(1)is appropriate for a supervisor process.
89-99: LGTM!The callback registration methods correctly append to their respective arrays and return
$thisfor method chaining, matching the abstractAdapterinterface contract.src/Queue/Server.php (4)
51-63: LGTM!The multi-hook arrays for
workerStartHooksandworkerStopHooksare properly declared and documented, enabling multiple lifecycle callbacks to be registered.
377-398: LGTM!Excellent defensive coding. Each hook is wrapped in its own try/catch, preventing one failing hook from blocking others. The
finallyblock ensures the consumer connection is always closed regardless of hook failures. This is the correct pattern for cleanup operations.
414-450: LGTM!The multi-hook registration pattern is consistent across
workerStart()andworkerStop(). Both methods create a newHook, add it to their respective arrays, and return the hook for fluent configuration. The getter methods correctly return the arrays for inspection.Note: This is a breaking change from the previous single-hook API, but it aligns with the PR's objective to support multiple lifecycle hooks.
360-373: LGTM!The error callback correctly handles the nullable
Messageparameter with null-safe operators ($message?->getPid()). The error resource is properly set and error hooks are invoked with their arguments.
Summary by CodeRabbit
New Features
Bug Fixes
Chores / Refactor
Breaking Changes
✏️ Tip: You can customize this high-level summary in your review settings.