Skip to content

Conversation

@loks0n
Copy link
Contributor

@loks0n loks0n commented Sep 5, 2025

Summary by CodeRabbit

  • New Features

    • Per-worker lifecycle hooks added; workers now emit observable "started" and "stopped" events and support explicit spawn/reap supervision.
    • Server API exposes worker start/stop registration and improved lifecycle chaining.
  • Bug Fixes

    • AMQP stops consumption before shutdown to avoid hanging consumers.
    • Redis consume loop more resilient to broker errors and uses a configurable pop timeout.
  • Chores / Refactor

    • Standardized test/service container mounts and formatting/tooling updates.
  • Breaking Changes

    • Native-adapter accessor removed; adapter API and worker lifecycle signatures changed.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai

This comment was marked as spam.

coderabbitai[bot]

This comment was marked as outdated.

coderabbitai[bot]

This comment was marked as outdated.

@loks0n loks0n requested a review from abnegate September 10, 2025 14:47
@loks0n loks0n changed the title feat: add workerStop handling feat: add workerStop handler, graceful exits Sep 10, 2025
coderabbitai[bot]

This comment was marked as outdated.

@loks0n loks0n requested review from Meldiron and Copilot October 7, 2025 12:25
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Graceful shutdown and lifecycle hooks for queue workers across adapters, plus test container updates for signal support.

  • Add per-worker stop hooks and worker stop event handling.
  • Introduce signal-based graceful shutdown for Swoole and ensure AMQP consumers stop before closing.
  • Standardize test container volumes and enable pcntl in images.

Reviewed Changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
tests/Queue/servers/Workerman/worker.php Adds workerStop hook usage for test worker lifecycle.
tests/Queue/servers/SwooleRedisCluster/worker.php Adds workerStop hook usage for cluster worker lifecycle.
tests/Queue/servers/Swoole/worker.php Adds workerStop hook usage for Swoole worker lifecycle.
tests/Queue/servers/AMQP/worker.php Adds workerStop hook usage for AMQP worker lifecycle.
tests/Queue/servers/SwooleRedisCluster/Dockerfile Enables pcntl for proper signal handling in tests.
tests/Queue/servers/Swoole/Dockerfile Enables pcntl for proper signal handling in tests.
tests/Queue/servers/AMQP/Dockerfile Enables pcntl for proper signal handling in tests.
src/Queue/Server.php Adds workerStop Hook API and integrates stop handling into worker lifecycle.
src/Queue/Broker/AMQP.php Attempts to stop consumption before closing channel/connection.
src/Queue/Adapter/Swoole.php Adds signal handlers and proper EVENT_* usage; introduces main-process stop path.
docker-compose.yml Standardizes container code paths and volumes for tests.

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 15 to 17
/** @var callable */
private $onStop;
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The property $onStop is declared without a nullable type or default value. This should be declared as private ?callable $onStop = null; to be explicit about its nullable nature and avoid potential issues.

Suggested change
/** @var callable */
private $onStop;
private ?callable $onStop = null;

Copilot uses AI. Check for mistakes.
@loks0n loks0n force-pushed the feat-workerstop branch 2 times, most recently from 6efa651 to 8dba579 Compare December 16, 2025 11:58
@loks0n loks0n requested a review from Copilot December 16, 2025 11:58
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (5)
src/Queue/Adapter/Swoole.php (4)

93-104: Mirror the parameter type fixes in workerStop().

Apply the same parameter corrections as in workerStart(): rename $pool to $_pool and type $workerId as int.

Apply this diff:

-        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {
             \call_user_func($callback, $workerId);
         });

54-64: Remove the direct onStop invocation from stop().

Calling $this->onStop() without a $workerId parameter will cause an ArgumentCountError, as the callback registered via workerStop() expects a $workerId argument (see line 100 and Server.php line 377). The EVENT_WORKER_STOP handler already invokes the callback correctly for each worker.

Apply this diff:

 public function stop(): self
 {
-    if ($this->onStop) {
-        \call_user_func($this->onStop);
-    }
-
     Console::info("[Swoole] Shutting down process pool...");
     $this->pool->shutdown();
     Console::success("[Swoole] Process pool stopped.");
     return $this;
 }

66-91: Fix parameter types and silence PHPMD warning.

Two issues with the event handler signature:

  1. The $pool parameter is unused and should be renamed to $_pool to silence PHPMD.
  2. Swoole passes $workerId as int, not string.

Apply this diff:

-        $this->pool->on(Constant::EVENT_WORKER_START, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_START, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {
             // Register signal handlers in worker
             Process::signal(SIGTERM, function () {

16-17: Type the $onStop property explicitly.

The property should use a typed declaration instead of a docblock to prevent accidental non-callable assignments and clarify intent.

Apply this diff:

-    /** @var callable */
-    private $onStop;
+    private ?callable $onStop = null;
src/Queue/Server.php (1)

106-110: Guard against undefined index when accessing resource callbacks.

Line 109 accesses self::$resourcesCallbacks[$name]["reset"] before line 111 checks if $name exists in the array. This can trigger an undefined index warning.

Apply this diff to reorder the checks:

         if (
             !\array_key_exists($name, $this->resources) ||
             $fresh ||
-            self::$resourcesCallbacks[$name]["reset"]
+            (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]["reset"])
         ) {
             if (!\array_key_exists($name, self::$resourcesCallbacks)) {
🧹 Nitpick comments (1)
src/Queue/Server.php (1)

428-443: LGTM!

The workerStop() and getWorkerStop() methods mirror the workerStart() pattern correctly, enabling registration and retrieval of multiple stop hooks.

Optional: The docblock on line 437 could be refined to "Returns worker stop hooks." for better grammar, but this is a minor nitpick.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e078822 and 8dba579.

📒 Files selected for processing (12)
  • docker-compose.yml (5 hunks)
  • src/Queue/Adapter/Swoole.php (2 hunks)
  • src/Queue/Broker/AMQP.php (1 hunks)
  • src/Queue/Broker/Pool.php (2 hunks)
  • src/Queue/Server.php (11 hunks)
  • tests/Queue/servers/AMQP/Dockerfile (1 hunks)
  • tests/Queue/servers/AMQP/worker.php (1 hunks)
  • tests/Queue/servers/Swoole/Dockerfile (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/Queue/Broker/Pool.php
🚧 Files skipped from review as they are similar to previous changes (5)
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
  • tests/Queue/servers/Swoole/Dockerfile
  • src/Queue/Broker/AMQP.php
  • tests/Queue/servers/Workerman/worker.php
  • tests/Queue/servers/AMQP/worker.php
🧰 Additional context used
🧬 Code graph analysis (2)
src/Queue/Adapter/Swoole.php (2)
src/Queue/Adapter.php (6)
  • Adapter (5-49)
  • __construct (12-16)
  • stop (28-28)
  • start (22-22)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Server.php (5)
  • __construct (84-88)
  • stop (236-247)
  • start (266-401)
  • workerStart (407-413)
  • workerStop (428-434)
tests/Queue/servers/Swoole/worker.php (5)
src/Queue/Server.php (4)
  • Server (14-530)
  • job (90-94)
  • workerStart (407-413)
  • workerStop (428-434)
src/Queue/Adapter.php (3)
  • Adapter (5-49)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (12-110)
  • workerStart (66-91)
  • workerStop (93-104)
src/Queue/Connection/Redis.php (1)
  • Redis (7-184)
tests/Queue/servers/tests.php (1)
  • handleRequest (5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

69-69: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)


97-97: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)

🔇 Additional comments (14)
tests/Queue/servers/AMQP/Dockerfile (1)

5-5: LGTM!

Installing the PCNTL extension is necessary for the signal-handling features introduced in this PR.

docker-compose.yml (1)

6-8: LGTM!

Standardizing volume mount paths to /usr/src/code across all services improves consistency and maintainability.

Also applies to: 20-22, 31-33, 43-45, 55-57

tests/Queue/servers/Swoole/worker.php (3)

3-9: LGTM!

The updated require paths and explicit imports improve code clarity and align with the refactored queue infrastructure.


11-26: LGTM!

The refactored server setup is clean and follows the new chained API pattern. The use of handleRequest(...) with the spread operator is appropriate.


28-30: LGTM!

The new workerStop() hook correctly registers a shutdown action, mirroring the workerStart() pattern. This enables graceful cleanup when workers terminate.

tests/Queue/servers/SwooleRedisCluster/worker.php (2)

3-21: LGTM!

The refactored setup correctly instantiates a Redis cluster-backed consumer and follows the same clean API pattern as the other test workers.


30-36: LGTM!

Both workerStart() and workerStop() hooks are correctly implemented and consistent with the other test worker implementations.

src/Queue/Adapter/Swoole.php (1)

31-52: LGTM!

The signal handling using Process::signal for graceful shutdown is appropriate for Swoole. The implementation correctly registers handlers for SIGTERM and SIGINT in the master process.

src/Queue/Server.php (6)

33-63: LGTM!

The transition from single hooks to arrays (workerStartHooks, workerStopHooks) enables registration of multiple lifecycle hooks, improving extensibility.


156-166: LGTM!

The formatting improvements and explicit string keys for the resource callback array enhance code clarity.


269-275: LGTM!

The workerStart callback correctly iterates over all registered hooks and invokes them with the proper arguments. The typed $workerId parameter is appropriate.


277-374: LGTM!

The consumer callback correctly handles the job lifecycle with init hooks, job execution, shutdown hooks, and error handling. The telemetry recording for wait time and process duration is appropriate.


377-391: LGTM!

The workerStop callback correctly iterates over registered stop hooks and closes the consumer connection gracefully. This is a key addition for enabling clean worker shutdown.


407-422: LGTM!

The workerStart() method correctly creates and registers hooks in the array, and getWorkerStart() provides access to the registered hooks. The pattern enables multiple start hooks.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

{
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
call_user_func($callback, $workerId);
$this->onStop = $callback;
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workerStop callback is being stored in both the $onStop property and registered with the pool's EVENT_WORKER_STOP event. However, in the stop() method, $onStop is called which will invoke the callback with no arguments, while the EVENT_WORKER_STOP expects the callback to receive $workerId. This creates inconsistent invocation - the callback will be called once without arguments in stop(), and once with $workerId from the pool event. Consider either removing line 95 or restructuring to avoid double invocation with different signatures.

Copilot uses AI. Check for mistakes.
Comment on lines 72 to 86
// Register signal handlers in worker
Process::signal(SIGTERM, function () {
Console::info(
"[Swoole] Received SIGTERM, initiating graceful shutdown...",
);
$this->stop();
});

Process::signal(SIGINT, function () {
Console::info(
"[Swoole] Received SIGINT, initiating graceful shutdown...",
);
$this->stop();
});

Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Signal handlers registered within the workerStart callback will override the signal handlers registered in the start() method (lines 36-48). In Swoole, signal handlers registered in worker processes override those in the master process. This means the master process signals won't work as expected. Consider removing the duplicate signal registration in workerStart since the master process should handle shutdown coordination.

Suggested change
// Register signal handlers in worker
Process::signal(SIGTERM, function () {
Console::info(
"[Swoole] Received SIGTERM, initiating graceful shutdown...",
);
$this->stop();
});
Process::signal(SIGINT, function () {
Console::info(
"[Swoole] Received SIGINT, initiating graceful shutdown...",
);
$this->stop();
});
// Worker initialization logic

Copilot uses AI. Check for mistakes.
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
tests/Queue/servers/Workerman/worker.php (1)

12-12: Typo in queue name: "wokerman" should be "workerman".

Apply this diff:

-$adapter = new Workerman($consumer, 12, "wokerman");
+$adapter = new Workerman($consumer, 12, "workerman");
src/Queue/Adapter/Swoole.php (2)

61-64: Fix parameter types: rename unused $pool and type $workerId as int.

The Pool parameter is unused (triggers PHPMD warning) and should be renamed to $_pool. Additionally, Swoole's Pool event callbacks pass $workerId as an int, not a string.

Apply this diff:

-        $this->pool->on(Constant::EVENT_WORKER_START, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_START, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {

88-91: Fix parameter types: rename unused $pool and type $workerId as int.

Same issue as in workerStart: the $pool parameter is unused and should be prefixed with an underscore, and $workerId should be typed as int to match Swoole's signature.

Apply this diff:

-        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {
🧹 Nitpick comments (2)
src/Queue/Server.php (2)

333-333: Fix misleading comment.

The comment says "Global init hooks" but this code is in the shutdown (success) callback, so it should say "Global shutdown hooks".

Apply this diff:

-                                // Global init hooks
+                                // Global shutdown hooks

346-346: Fix misleading comment.

The comment says "Group init hooks" but this code is in the shutdown (success) callback, so it should say "Group shutdown hooks".

Apply this diff:

-                                // Group init hooks
+                                // Group shutdown hooks
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8dba579 and d62ecb0.

📒 Files selected for processing (7)
  • src/Queue/Adapter/Swoole.php (2 hunks)
  • src/Queue/Broker/Pool.php (2 hunks)
  • src/Queue/Server.php (11 hunks)
  • tests/Queue/servers/AMQP/worker.php (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/Queue/Broker/Pool.php
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (3)
tests/Queue/servers/AMQP/worker.php (4)
src/Queue/Broker/AMQP.php (1)
  • AMQP (20-226)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (12-102)
  • workerStart (59-84)
  • workerStop (86-96)
src/Queue/Server.php (4)
  • Server (14-538)
  • job (90-94)
  • workerStart (415-421)
  • workerStop (436-442)
tests/Queue/servers/tests.php (1)
  • handleRequest (5-47)
src/Queue/Server.php (4)
src/Queue/Job.php (1)
  • hook (24-29)
src/Queue/Consumer.php (1)
  • consume (17-22)
src/Queue/Message.php (4)
  • Message (5-81)
  • getPid (52-55)
  • getTimestamp (62-65)
  • getPayload (67-70)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
tests/Queue/servers/Workerman/worker.php (5)
src/Queue/Adapter/Workerman.php (3)
  • Workerman (9-55)
  • workerStart (34-41)
  • workerStop (42-49)
src/Queue/Broker/Redis.php (1)
  • Redis (11-167)
src/Queue/Connection/Redis.php (1)
  • Redis (7-184)
src/Queue/Server.php (5)
  • Server (14-538)
  • job (90-94)
  • error (531-537)
  • workerStart (415-421)
  • workerStop (436-442)
tests/Queue/servers/tests.php (1)
  • handleRequest (5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

62-62: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)


89-89: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Adapter Tests (Workerman)
  • GitHub Check: Adapter Tests (SwooleRedisCluster)

@loks0n loks0n force-pushed the feat-workerstop branch 2 times, most recently from 92916aa to 957765c Compare December 16, 2025 13:29
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
tests/Queue/servers/Workerman/worker.php (1)

12-12: Typo in queue name: "wokerman" should be "workerman".

src/Queue/Server.php (1)

332-346: Fix misleading comments in shutdown callback.

The comments say "init hooks" but this code is in the shutdown callback context.

                     foreach ($this->shutdownHooks as $hook) {
-                                // Global init hooks
+                                // Global shutdown hooks
                         if (\in_array("*", $hook->getGroups())) {
                             foreach ($this->shutdownHooks as $hook) {
-                                // Group init hooks
+                                // Group shutdown hooks
                                 if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (4)
src/Queue/Broker/Pool.php (1)

42-45: Incomplete close() implementation may lead to resource leaks.

The Pool's close() method is a no-op with a TODO. When Pool::close() is called (e.g., during graceful shutdown), the underlying connections in the pool won't be closed, potentially causing resource leaks.

Do you want me to open an issue to track implementing proper connection pool cleanup?

src/Queue/Adapter/Swoole.php (2)

50-53: Rename unused $pool parameter to suppress PHPMD warning.

The $pool parameter is required by Swoole's callback signature but isn't used. Prefix it with underscore to signal intentional non-use.

     $this->pool->on(Constant::EVENT_WORKER_START, function (
-        Pool $pool,
+        Pool $_pool,
         string $workerId,
     ) use ($callback) {

81-84: Same fix: rename unused $pool to $_pool.

     $this->pool->on(Constant::EVENT_WORKER_STOP, function (
-        Pool $pool,
+        Pool $_pool,
         string $workerId,
     ) use ($callback) {
src/Queue/Server.php (1)

423-451: Consider adding specific return type hints in docblocks.

The methods return arrays of Hook objects. The docblocks could be more specific for IDE support:

 /**
  * Returns Worker starts hooks.
- * @return array
+ * @return array<Hook>
  */
 public function getWorkerStart(): array
 /**
  * Returns Worker stops hooks.
- * @return array
+ * @return array<Hook>
  */
 public function getWorkerStop(): array
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d62ecb0 and 957765c.

📒 Files selected for processing (11)
  • src/Queue/Adapter/Swoole.php (2 hunks)
  • src/Queue/Broker/Pool.php (1 hunks)
  • src/Queue/Broker/Redis.php (1 hunks)
  • src/Queue/Connection.php (1 hunks)
  • src/Queue/Connection/Redis.php (1 hunks)
  • src/Queue/Connection/RedisCluster.php (1 hunks)
  • src/Queue/Server.php (11 hunks)
  • tests/Queue/servers/AMQP/worker.php (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
src/Queue/Connection/Redis.php (4)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (92-96)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (92-96)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Connection.php (4)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (92-96)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Broker/Pool.php (5)
src/Queue/Broker/Redis.php (2)
  • consume (19-90)
  • close (92-96)
src/Queue/Queue.php (1)
  • Queue (5-15)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Broker/Redis.php (6)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
tests/Queue/servers/Swoole/worker.php (3)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (15-95)
  • workerStart (48-77)
  • workerStop (79-89)
src/Queue/Broker/Redis.php (1)
  • Redis (11-168)
src/Queue/Connection/Redis.php (1)
  • Redis (7-190)
src/Queue/Adapter/Swoole.php (1)
src/Queue/Server.php (5)
  • start (266-409)
  • stop (236-247)
  • shutdown (224-230)
  • workerStart (415-421)
  • workerStop (436-442)
tests/Queue/servers/Workerman/worker.php (3)
src/Queue/Adapter.php (3)
  • Adapter (5-49)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (3)
  • Workerman (9-55)
  • workerStart (34-41)
  • workerStop (42-49)
tests/Queue/servers/tests.php (1)
  • handleRequest (5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

51-51: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)


82-82: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)

🔇 Additional comments (16)
tests/Queue/servers/Workerman/worker.php (1)

24-30: LGTM!

The workerStart and workerStop hooks are correctly implemented and align with the new Server API for lifecycle management.

src/Queue/Broker/Redis.php (1)

92-96: LGTM!

The close() method correctly sets the closed flag first to stop the consume loop, then closes the underlying connection. This ensures graceful shutdown without resource leaks.

src/Queue/Connection.php (1)

28-28: LGTM!

Adding close() to the Connection interface establishes a consistent cleanup contract. Both Redis and RedisCluster implementations correctly implement this method.

src/Queue/Connection/RedisCluster.php (1)

169-173: LGTM!

The implementation correctly uses the null-safe operator and nullifies the reference after closing, which is consistent with the Redis connection implementation and allows proper garbage collection.

src/Queue/Connection/Redis.php (1)

172-176: LGTM!

The close() implementation follows the same pattern as RedisCluster, providing consistent behavior across connection types.

src/Queue/Broker/Pool.php (1)

33-38: LGTM!

The multi-line formatting for consume() parameters and the delegation closures improves readability without changing behavior.

Also applies to: 47-54, 57-64

tests/Queue/servers/SwooleRedisCluster/worker.php (2)

11-19: LGTM!

The Redis broker construction with RedisCluster connection and Swoole adapter setup is correct and follows the pattern established in other test workers.


30-36: LGTM!

The workerStart and workerStop hooks are correctly wired up, consistent with the new Server lifecycle API.

tests/Queue/servers/Swoole/worker.php (2)

11-13: LGTM!

The Redis broker and Swoole adapter construction follows the correct pattern.


24-30: LGTM!

The workerStart and workerStop lifecycle hooks are correctly implemented, matching the pattern in other test workers.

tests/Queue/servers/AMQP/worker.php (1)

1-31: LGTM!

The test worker script cleanly demonstrates the new lifecycle hooks. The use of first-class callable syntax for the job handler and the addition of workerStart and workerStop hooks are well-structured.

src/Queue/Adapter/Swoole.php (2)

54-74: Signal handling and coroutine approach look solid.

Using Process::signal for Swoole-native signal handling and wrapping the callback in go() allows the event loop to process signals while the consume loop runs. Closing the consumer on signal receipt enables graceful shutdown.


40-46: LGTM!

Good observability by logging shutdown progress.

src/Queue/Server.php (3)

51-63: Clean migration to array-based hook management.

The shift from single hook properties to arrays (workerStartHooks, workerStopHooks) enables multiple lifecycle hooks per event, which is more flexible and aligns with how other hooks (errorHooks, initHooks, shutdownHooks) are already managed.


377-399: Robust workerStop implementation with proper error isolation.

Wrapping each hook in its own try-catch ensures one failing hook doesn't prevent others from running. The finally block guarantees the consumer connection is closed regardless of hook execution outcomes—exactly the right pattern for graceful shutdown.


266-275: LGTM!

The workerStart flow correctly iterates over the hooks array and invokes each hook with proper argument resolution.

Comment on lines 10 to 29
$consumer = new AMQP(host: "amqp", port: 5672, user: "amqp", password: "amqp");
$adapter = new Swoole($consumer, 12, "amqp");
$server = new Server($adapter);

$server->job()
->inject('message')
->action(function (Message $message) {
handleRequest($message);
});
$server->job()->inject("message")->action(handleRequest(...));

$server
->error()
->inject('error')
->inject("error")
->action(function ($th) {
echo $th->getMessage() . PHP_EOL;
});

$server
->workerStart()
->action(function () {
echo "Worker Started" . PHP_EOL;
});
$server->workerStart()->action(function () {
echo "Worker Started" . PHP_EOL;
});

$server->workerStop()->action(function () {
echo "Worker Stopped" . PHP_EOL;
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single quotes

RUN apk add autoconf build-base
RUN apk add autoconf build-base

RUN docker-php-ext-install pcntl
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need it? I can't see it being used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

required in older implementation, removing

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (4)
tests/Queue/servers/Workerman/worker.php (1)

12-12: Fix the typo in the queue name.

The queue name is 'wokerman' but should be 'workerman'. This was previously flagged in a past review but remains unfixed.

Apply this diff:

-$adapter = new Workerman($consumer, 12, 'wokerman');
+$adapter = new Workerman($consumer, 12, 'workerman');
src/Queue/Adapter/Swoole.php (1)

50-53: Fix $workerId type and silence PHPMD unused $pool warning.

Swoole's Pool events pass int $workerId, not string. The $pool parameter is unused and should be prefixed with _ to indicate intentional non-use.

-        $this->pool->on(Constant::EVENT_WORKER_START, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_START, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {
-        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {

Also applies to: 81-84

src/Queue/Server.php (2)

331-341: Fix misleading comment: "Global init hooks" → "Global shutdown hooks".

This block is inside the shutdown callback, not init.

                 if ($this->job->getHook()) {
                     foreach ($this->shutdownHooks as $hook) {
-                        // Global init hooks
+                        // Global shutdown hooks
                         if (\in_array('*', $hook->getGroups())) {

344-355: Fix misleading comment: "Group init hooks" → "Group shutdown hooks".

Same issue — this is in the shutdown callback.

                 foreach ($this->job->getGroups() as $group) {
                     foreach ($this->shutdownHooks as $hook) {
-                        // Group init hooks
+                        // Group shutdown hooks
                         if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (2)
src/Queue/Broker/Pool.php (1)

42-45: Implement pool connection cleanup for graceful shutdown.

The TODO indicates that closing pooled connections is not yet implemented. Given this PR's focus on graceful exits and worker-stop handlers, this gap means pooled publisher and consumer connections won't be properly released during shutdown, potentially causing resource leaks.

Would you like me to implement the pool cleanup logic? It should iterate through active connections in both publisher and consumer pools and call their close() methods.

src/Queue/Server.php (1)

427-430: Consider adding array<Hook> return type hints for getters.

The docblocks specify @return array, but the typed return could be more specific.

     /**
      * Returns Worker starts hooks.
-     * @return array
+     * @return array<Hook>
      */
     public function getWorkerStart(): array

     /**
      * Returns Worker stops hooks.
-     * @return array
+     * @return array<Hook>
      */
     public function getWorkerStop(): array

Also applies to: 448-451

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 957765c and 1939710.

📒 Files selected for processing (18)
  • pint.json (1 hunks)
  • src/Queue/Adapter/Swoole.php (2 hunks)
  • src/Queue/Broker/AMQP.php (3 hunks)
  • src/Queue/Broker/Pool.php (1 hunks)
  • src/Queue/Broker/Redis.php (1 hunks)
  • src/Queue/Connection.php (1 hunks)
  • src/Queue/Connection/Redis.php (1 hunks)
  • src/Queue/Connection/RedisCluster.php (1 hunks)
  • src/Queue/Queue.php (1 hunks)
  • src/Queue/Server.php (10 hunks)
  • tests/Queue/servers/AMQP/Dockerfile (1 hunks)
  • tests/Queue/servers/AMQP/worker.php (2 hunks)
  • tests/Queue/servers/Swoole/Dockerfile (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (2 hunks)
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (2 hunks)
  • tests/Queue/servers/Workerman/Dockerfile (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • tests/Queue/servers/Workerman/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/Queue/Connection/Redis.php
  • tests/Queue/servers/Swoole/worker.php
  • src/Queue/Connection.php
  • tests/Queue/servers/Swoole/Dockerfile
  • tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (5)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (92-96)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Broker/Redis.php (6)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Server.php (5)
src/Queue/Consumer.php (2)
  • consume (17-22)
  • close (27-27)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Adapter/Swoole.php (2)
src/Queue/Server.php (4)
  • start (266-409)
  • stop (236-247)
  • shutdown (224-230)
  • workerStart (415-421)
src/Queue/Consumer.php (1)
  • close (27-27)
tests/Queue/servers/Workerman/worker.php (2)
src/Queue/Adapter.php (3)
  • Adapter (5-49)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (3)
  • Workerman (9-55)
  • workerStart (34-41)
  • workerStop (42-49)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

51-51: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)


82-82: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)

🔇 Additional comments (19)
src/Queue/Queue.php (1)

12-12: LGTM!

The string quote change aligns with the new single_quote rule in pint.json.

tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)

5-5: LGTM!

Formatting-only change (trailing newline).

pint.json (1)

2-5: LGTM!

Adding the single_quote rule enforces consistent string quoting across the codebase.

src/Queue/Broker/Redis.php (1)

92-96: LGTM!

Properly closes the underlying connection to ensure resource cleanup, consistent with the Connection interface's close() method.

tests/Queue/servers/AMQP/Dockerfile (1)

3-3: LGTM!

Formatting-only change (trailing newline).

src/Queue/Connection/RedisCluster.php (1)

169-173: LGTM!

Properly implements the Connection interface's close() method, ensuring the underlying RedisCluster connection is closed and resources are released.

src/Queue/Broker/AMQP.php (2)

114-114: LGTM!

String quote changes align with the new single_quote rule.

Also applies to: 165-165


132-136: The code is safe—stopConsume() exists in php-amqplib 3.7.0.

AMQPChannel::stopConsume() is a documented public method designed to stop the consume() loop, and has been available since version 3.2.0. Using this method in the close() function will not result in a fatal error; it is the appropriate way to safely interrupt consumer loops before closing the connection.

tests/Queue/servers/Workerman/worker.php (1)

15-15: LGTM!

The refactored API usage with inject('message')->action(handleRequest(...)) and the new workerStart() and workerStop() lifecycle hooks align with the updated Server API.

Also applies to: 24-30

src/Queue/Adapter/Swoole.php (4)

5-13: LGTM on new imports.

The additions of Swoole\Constant, Swoole\Process, Utopia\Console, and the go coroutine function are appropriate for the signal handling and coroutine-based execution introduced in this file.


40-46: LGTM on graceful shutdown logging.

Adding console output before and after shutdown() improves observability during pool termination.


54-67: LGTM on per-worker signal handling.

Registering SIGTERM/SIGINT handlers that call $this->consumer->close() is the correct approach for graceful worker shutdown. Workers close their own consumer and let the pool manage process lifecycle, addressing earlier feedback about not calling stop() from workers.


69-73: LGTM on coroutine wrapping.

Wrapping the callback in go() allows the Swoole event loop to process signals while the consume loop runs.

src/Queue/Server.php (3)

51-63: LGTM on array-based hook properties.

Converting workerStartHook and workerStopHook from single hooks to arrays (workerStartHooks, workerStopHooks) enables multiple hook registration, which is a sensible enhancement.


377-399: LGTM on workerStop lifecycle handling.

The try/finally structure ensures the consumer connection is always closed even if a hook throws. Per-hook exception handling with logging is a good resilience pattern.


415-421: LGTM on hook registration methods.

workerStart() and workerStop() now return Hook instances that are appended to their respective arrays, aligning with the pattern used by init(), shutdown(), and error().

Also applies to: 436-442

tests/Queue/servers/AMQP/worker.php (3)

6-12: LGTM on streamlined imports and instantiation.

Direct class imports and simplified instantiation improve readability.


14-14: LGTM on first-class callable syntax.

Using handleRequest(...) is cleaner than wrapping in a closure.


23-29: LGTM on workerStart/workerStop hooks.

This demonstrates the new array-based hook API with the fluent ->action() pattern.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (4)
tests/Queue/servers/Workerman/worker.php (1)

12-12: Typo in queue name: 'wokerman' should be 'workerman'.

This was previously flagged and remains unaddressed.

src/Queue/Server.php (3)

106-120: Guard resource callback access before reading 'reset' key.

Line 109 accesses self::$resourcesCallbacks[$name]['reset'] before verifying the key exists on line 111. This can trigger an undefined index error. The condition should check for key existence before accessing the ['reset'] value.

🔎 Proposed fix
     public function getResource(string $name, bool $fresh = false): mixed
     {
         if (
             !\array_key_exists($name, $this->resources) ||
             $fresh ||
-            self::$resourcesCallbacks[$name]['reset']
+            (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset'])
         ) {
             if (!\array_key_exists($name, self::$resourcesCallbacks)) {
                 throw new Exception("Failed to find resource: $name");
             }

331-342: Fix misleading comment: should be "shutdown hooks".

Line 333 says "Global init hooks" but this code is inside the success callback, which runs shutdown hooks. The comment should read "Global shutdown hooks" for accuracy.

🔎 Proposed fix
                     function (Message $message) {
                         if ($this->job->getHook()) {
                             foreach ($this->shutdownHooks as $hook) {
-                                // Global init hooks
+                                // Global shutdown hooks
                                 if (\in_array('*', $hook->getGroups())) {

344-355: Fix misleading comment: should be "shutdown hooks".

Line 346 says "Group init hooks" but this code is inside the success callback, which runs shutdown hooks. The comment should read "Group shutdown hooks" for accuracy.

🔎 Proposed fix
 
                         foreach ($this->job->getGroups() as $group) {
                             foreach ($this->shutdownHooks as $hook) {
-                                // Group init hooks
+                                // Group shutdown hooks
                                 if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (1)
src/Queue/Server.php (1)

414-450: LGTM: Worker lifecycle API properly implemented.

The workerStart/workerStop methods correctly return Hook instances for fluent registration, and the getter methods return the full arrays. The API is consistent and well-designed.

Note: Line 444 says "Returns Worker stops hooks" — consider "Returns worker stop hooks" for slightly better grammar, though this is purely cosmetic.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1939710 and 1db31ba.

📒 Files selected for processing (20)
  • pint.json
  • src/Queue/Adapter.php
  • src/Queue/Adapter/Swoole.php
  • src/Queue/Adapter/Workerman.php
  • src/Queue/Broker/AMQP.php
  • src/Queue/Broker/Pool.php
  • src/Queue/Broker/Redis.php
  • src/Queue/Connection.php
  • src/Queue/Connection/Redis.php
  • src/Queue/Connection/RedisCluster.php
  • src/Queue/Queue.php
  • src/Queue/Server.php
  • tests/Queue/servers/AMQP/Dockerfile
  • tests/Queue/servers/AMQP/worker.php
  • tests/Queue/servers/Swoole/Dockerfile
  • tests/Queue/servers/Swoole/worker.php
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
  • tests/Queue/servers/SwooleRedisCluster/worker.php
  • tests/Queue/servers/Workerman/Dockerfile
  • tests/Queue/servers/Workerman/worker.php
💤 Files with no reviewable changes (2)
  • src/Queue/Adapter/Workerman.php
  • src/Queue/Adapter.php
✅ Files skipped from review due to trivial changes (2)
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
  • tests/Queue/servers/Swoole/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/Queue/Broker/AMQP.php
  • tests/Queue/servers/SwooleRedisCluster/worker.php
  • pint.json
  • tests/Queue/servers/Swoole/worker.php
🧰 Additional context used
🧬 Code graph analysis (8)
src/Queue/Connection/Redis.php (5)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (103-107)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Connection.php (5)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (103-107)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Broker/Pool.php (5)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Queue.php (1)
  • Queue (5-15)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Broker/Redis.php (3)
src/Queue/Connection.php (3)
  • rightPopArray (8-8)
  • close (28-28)
  • rightPop (13-13)
src/Queue/Connection/Redis.php (3)
  • rightPopArray (63-72)
  • close (172-176)
  • rightPop (74-83)
src/Queue/Connection/RedisCluster.php (3)
  • rightPopArray (57-66)
  • close (169-173)
  • rightPop (68-77)
tests/Queue/servers/Workerman/worker.php (2)
src/Queue/Adapter/Workerman.php (3)
  • Workerman (9-50)
  • workerStart (34-41)
  • workerStop (42-49)
src/Queue/Connection/Redis.php (1)
  • Redis (7-190)
src/Queue/Connection/RedisCluster.php (5)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Redis.php (1)
  • close (103-107)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Server.php (6)
src/Queue/Job.php (2)
  • hook (24-29)
  • getHook (36-39)
src/Queue/Consumer.php (2)
  • consume (17-22)
  • close (27-27)
src/Queue/Adapter/Swoole.php (1)
  • workerStop (100-104)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Adapter/Swoole.php (3)
src/Queue/Adapter.php (6)
  • Adapter (5-43)
  • __construct (12-16)
  • start (22-22)
  • stop (28-28)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Server.php (5)
  • __construct (84-88)
  • start (266-408)
  • stop (236-247)
  • workerStart (414-420)
  • workerStop (435-441)
src/Queue/Consumer.php (1)
  • close (27-27)
🪛 GitHub Actions: Linter
src/Queue/Broker/Redis.php

[error] 1-1: PSR-12 braces style violation detected by Pint. Please fix braces in this file. Run 'vendor/bin/pint' to auto-fix if available.

🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

88-88: Avoid unused local variables such as '$process'. (undefined)

(UnusedLocalVariable)

🪛 PHPStan (2.1.33)
src/Queue/Broker/Redis.php

13-13: Class constants with native types are supported only on PHP 8.3 and later.

(classConstant.nativeTypeNotSupported)

🔇 Additional comments (30)
src/Queue/Queue.php (1)

11-13: LGTM!

The quote style change is consistent and the validation logic remains correct.

tests/Queue/servers/AMQP/Dockerfile (1)

1-3: LGTM!

The Dockerfile is correctly structured with required build dependencies for PHP extensions.

tests/Queue/servers/Workerman/Dockerfile (1)

1-11: LGTM!

The Dockerfile properly configures the pcntl extension required for Workerman process control.

src/Queue/Connection/RedisCluster.php (1)

169-173: LGTM!

The close() implementation correctly handles the nullable Redis instance and resets the state to allow potential reconnection through getRedis().

src/Queue/Connection/Redis.php (1)

172-176: LGTM!

Clean implementation of close() that properly releases the Redis connection and resets state for potential reconnection.

src/Queue/Broker/Redis.php (4)

27-36: Good error handling for graceful shutdown.

The try/catch properly distinguishes between shutdown-induced exceptions (break the loop) and unexpected Redis failures (rethrow). This ensures the worker can exit cleanly when close() is called while blocked on rightPopArray.


103-107: LGTM!

Calling $this->connection->close() after setting the closed flag ensures proper resource cleanup and complements the graceful shutdown logic in the consume loop.


130-130: LGTM!

Using the POP_TIMEOUT constant here maintains consistency with the consume method and eliminates the magic number.


13-14: No issue to address. The project's minimum PHP version is 8.3, which fully supports native types on class constants. The implementation is compatible with the declared requirements.

src/Queue/Broker/Pool.php (2)

33-40: LGTM!

The multiline formatting improves readability of the method signature.


47-65: LGTM!

The reformatted closures with trailing commas follow modern PHP style conventions.

src/Queue/Connection.php (1)

28-28: Good addition for graceful shutdown support.

Adding close() to the interface enables consistent resource cleanup across all connection types. All known implementations (Redis, RedisCluster) properly implement this method. Note this is a breaking change for any external implementations of this interface.

tests/Queue/servers/Workerman/worker.php (4)

7-9: LGTM: Import updates align with refactored API.

The imports correctly reflect the new class structure with Redis connection, Redis broker, and Workerman adapter.


15-15: LGTM: Job handler updated to fluent API.

The use of inject('message')->action(handleRequest(...)) is cleaner and consistent with the new Server API.


24-26: LGTM: Worker start hook properly registered.

The inline format is clean and the lifecycle hook is correctly wired.


28-30: LGTM: Worker stop hook added for lifecycle management.

This properly demonstrates the new workerStop API and aligns with the broader worker lifecycle refactoring.

tests/Queue/servers/AMQP/worker.php (5)

6-8: LGTM: Import statements updated correctly.

The direct imports for AMQP, Swoole, and Server are clean and follow standard conventions.


10-12: LGTM: Component instantiation follows updated API.

The AMQP broker, Swoole adapter, and Server are correctly instantiated with appropriate parameters.


14-14: LGTM: Job handler uses fluent API.

The handler correctly uses the inject/action pattern consistent with other test files.


23-25: LGTM: Worker start hook correctly implemented.

The inline format and single-quoted string are consistent with coding standards.


27-29: LGTM: Worker stop hook added.

This correctly demonstrates the new workerStop lifecycle hook for the AMQP/Swoole configuration.

src/Queue/Server.php (4)

33-63: LGTM: Worker lifecycle hooks refactored to arrays.

Converting from single typed properties to arrays with default initialization (= []) properly resolves the previous concerns about uninitialized properties and enables multiple hook registration.


156-165: LGTM: setResource signature updated to multi-line format.

The formatting improves readability while maintaining the same functionality.


174-217: LGTM: Telemetry histogram buckets reformatted.

The multi-line format for ExplicitBucketBoundaries improves readability without changing functionality.


377-398: LGTM: Worker stop handling is robust.

The implementation correctly:

  • Iterates over all registered workerStop hooks
  • Isolates hook failures with try/catch to prevent one hook from breaking others
  • Guarantees consumer connection closure via finally block
  • Provides clear error logging
src/Queue/Adapter/Swoole.php (5)

5-20: LGTM: Properties support manual worker management.

The new properties correctly support the architectural shift from pool-based to manual Process management:

  • $workers tracks spawned Process instances
  • $running controls the supervisor loop
  • $onWorkerStart and $onWorkerStop hold lifecycle callbacks

32-53: LGTM: Supervisor loop properly implements worker orchestration.

The implementation correctly:

  • Spawns workers before entering the supervisor loop
  • Enables Swoole coroutine hooks for I/O
  • Registers signal handlers in the master process context
  • Maintains a supervisor loop that monitors worker state
  • Uses SIGCHLD to trigger reaping of finished workers

55-76: LGTM: Worker lifecycle correctly managed.

The spawnWorker implementation properly:

  • Runs worker callbacks inside a coroutine context for async support
  • Registers SIGTERM handler in the worker process to gracefully close the consumer
  • Invokes onWorkerStart hooks within the coroutine (line 63-65)
  • Invokes onWorkerStop hooks after the coroutine exits (line 69-71), ensuring proper cleanup order
  • Tracks the spawned process for supervisor management

78-92: LGTM: Shutdown handling is correct.

The reap() method properly cleans up finished workers using Process::wait(), and stop() correctly signals all workers to terminate gracefully via SIGTERM.

Note: The static analysis warning about unused $process on line 88 is a false positive — the iteration is necessary to obtain $pid keys for signaling.


94-104: LGTM: Lifecycle callback registration follows consistent pattern.

The workerStart and workerStop methods correctly append callbacks to their respective arrays and return $this for fluent chaining, consistent with the adapter interface.

Comment on lines 42 to 45
public function close(): void
{
$this->delegateConsumer(__FUNCTION__, \func_get_args());
// TODO: Implement closing all connections in the pool
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Incomplete close() implementation may cause resource leaks.

The close() method currently does nothing, while other broker implementations (Redis, AMQP) properly clean up resources. If Pool is actively used, connections may not be released on shutdown.

🔎 Suggested implementation
 public function close(): void
 {
-    // TODO: Implement closing all connections in the pool
+    // Close all connections in the publisher pool
+    $this->publisher?->reclaim();
+    
+    // Close all connections in the consumer pool
+    $this->consumer?->reclaim();
 }

Note: The exact implementation depends on the UtopiaPool API. Would you like me to investigate how connections should be closed in a pool context?

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/Queue/Broker/Pool.php around lines 42 to 45, the close() method is
currently empty and must explicitly release all pooled connections to avoid
leaks; implement it to iterate over the underlying UtopiaPool entries (or use
its provided drain/clear/destroy API), call the appropriate connection
close/destroy method or release function for each resource, catch and log any
exceptions to avoid halting shutdown, and then mark the pool as closed/cleared
so subsequent calls are no-ops; ensure the implementation respects the
UtopiaPool lifecycle API (drain/clear/destroy) and updates any internal state
flags.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/Queue/Broker/Redis.php (1)

1-1: Fix PSR-12 formatting issue.

The pipeline reports a braces style violation. Run Pint to automatically fix the formatting:

#!/bin/bash
vendor/bin/pint src/Queue/Broker/Redis.php
♻️ Duplicate comments (4)
src/Queue/Broker/AMQP.php (1)

134-134: Verify that AMQPChannel::stopConsume() exists in php-amqplib.

A previous review flagged that stopConsume() may not exist on AMQPChannel in php-amqplib, which would cause a fatal error. If the method doesn't exist, you should either call basic_cancel() for each active consumer tag (iterating $this->channel->callbacks) or simply close the channel/connection directly without calling stopConsume().

Please search the web to confirm whether AMQPChannel::stopConsume() exists in php-amqplib:

php-amqplib AMQPChannel stopConsume method documentation
src/Queue/Server.php (3)

106-113: Undefined index access on 'reset' when resource callback doesn't exist.

Line 109 accesses self::$resourcesCallbacks[$name]['reset'] before verifying that $name exists in resourcesCallbacks (checked on line 111). This will throw an undefined index notice.

🔎 Proposed fix
-        if (
-            !\array_key_exists($name, $this->resources) ||
-            $fresh ||
-            self::$resourcesCallbacks[$name]['reset']
-        ) {
+        if (
+            !\array_key_exists($name, $this->resources)
+            || $fresh
+            || (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset'])
+        ) {

304-315: Group init hooks run unconditionally while global hooks are gated by getHook().

Global init hooks (lines 291-302) are gated by $this->job->getHook(), but group init hooks (lines 304-315) run unconditionally. This inconsistency may lead to unexpected behavior.

🔎 Proposed fix for consistency
-                            foreach ($this->job->getGroups() as $group) {
-                                foreach ($this->initHooks as $hook) {
-                                    // Group init hooks
-                                    if (\in_array($group, $hook->getGroups())) {
-                                        $arguments = $this->getArguments(
-                                            $hook,
-                                            $message->getPayload(),
-                                        );
-                                        $hook->getAction()(...$arguments);
-                                    }
-                                }
-                            }
+                            if ($this->job->getHook()) {
+                                foreach ($this->job->getGroups() as $group) {
+                                    foreach ($this->initHooks as $hook) {
+                                        // Group init hooks
+                                        if (\in_array($group, $hook->getGroups())) {
+                                            $arguments = $this->getArguments(
+                                                $hook,
+                                                $message->getPayload(),
+                                            );
+                                            $hook->getAction()(...$arguments);
+                                        }
+                                    }
+                                }
+                            }

333-355: Comments incorrectly reference "init hooks" in shutdown callback; group shutdown hooks not gated.

Line 333 says "Global init hooks" and line 346 says "Group init hooks", but both are in the success callback which runs shutdown hooks. Additionally, group shutdown hooks (lines 344-355) are not gated by getHook() while global hooks are.

🔎 Proposed fix
                        if ($this->job->getHook()) {
                            foreach ($this->shutdownHooks as $hook) {
-                               // Global init hooks
+                               // Global shutdown hooks
                                if (\in_array('*', $hook->getGroups())) {
                                    $arguments = $this->getArguments(
                                        $hook,
                                        $message->getPayload(),
                                    );
                                    $hook->getAction()(...$arguments);
                                }
                            }
                        }

-                       foreach ($this->job->getGroups() as $group) {
-                           foreach ($this->shutdownHooks as $hook) {
-                               // Group init hooks
-                               if (\in_array($group, $hook->getGroups())) {
+                       if ($this->job->getHook()) {
+                           foreach ($this->job->getGroups() as $group) {
+                               foreach ($this->shutdownHooks as $hook) {
+                                   // Group shutdown hooks
+                                   if (\in_array($group, $hook->getGroups())) {
                                        $arguments = $this->getArguments(
                                            $hook,
                                            $message->getPayload(),
                                        );
                                        $hook->getAction()(...$arguments);
+                                   }
                                }
                            }
                        }
🧹 Nitpick comments (2)
src/Queue/Adapter/Swoole.php (1)

81-87: Silence unused variable warning in stop method.

The $process variable on line 83 is unused. Prefix it with an underscore to indicate it's intentionally unused.

🔎 Proposed fix
 public function stop(): self
 {
-    foreach ($this->workers as $pid => $process) {
+    foreach ($this->workers as $pid => $_process) {
         Process::kill($pid, SIGTERM);
     }
     return $this;
 }
src/Queue/Server.php (1)

422-450: Minor documentation wording.

The PHPDoc comments have minor grammatical issues:

  • Line 423: "Worker starts hooks" → "worker start hooks"
  • Line 444: "Worker stops hooks" → "worker stop hooks"
🔎 Proposed fix
     /**
-     * Returns Worker starts hooks.
+     * Returns the worker start hooks.
      * @return array
      */
     public function getWorkerStart(): array
     {
         return $this->workerStartHooks;
     }

     ...

-    /**
-     * Returns Worker stops hooks.
-     * @return array
-     */
+    /**
+     * Returns the worker stop hooks.
+     * @return array
+     */
     public function getWorkerStop(): array
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1db31ba and f0982fc.

📒 Files selected for processing (20)
  • pint.json
  • src/Queue/Adapter.php
  • src/Queue/Adapter/Swoole.php
  • src/Queue/Adapter/Workerman.php
  • src/Queue/Broker/AMQP.php
  • src/Queue/Broker/Pool.php
  • src/Queue/Broker/Redis.php
  • src/Queue/Connection.php
  • src/Queue/Connection/Redis.php
  • src/Queue/Connection/RedisCluster.php
  • src/Queue/Queue.php
  • src/Queue/Server.php
  • tests/Queue/servers/AMQP/Dockerfile
  • tests/Queue/servers/AMQP/worker.php
  • tests/Queue/servers/Swoole/Dockerfile
  • tests/Queue/servers/Swoole/worker.php
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
  • tests/Queue/servers/SwooleRedisCluster/worker.php
  • tests/Queue/servers/Workerman/Dockerfile
  • tests/Queue/servers/Workerman/worker.php
💤 Files with no reviewable changes (2)
  • src/Queue/Adapter.php
  • src/Queue/Adapter/Workerman.php
✅ Files skipped from review due to trivial changes (3)
  • src/Queue/Queue.php
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
  • tests/Queue/servers/AMQP/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (5)
  • pint.json
  • tests/Queue/servers/Workerman/Dockerfile
  • tests/Queue/servers/Workerman/worker.php
  • tests/Queue/servers/Swoole/Dockerfile
  • tests/Queue/servers/Swoole/worker.php
🧰 Additional context used
🧬 Code graph analysis (8)
src/Queue/Connection.php (5)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (103-107)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Adapter/Swoole.php (3)
src/Queue/Adapter.php (6)
  • Adapter (5-43)
  • __construct (12-16)
  • start (22-22)
  • stop (28-28)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Server.php (5)
  • __construct (84-88)
  • start (266-408)
  • stop (236-247)
  • workerStart (414-420)
  • workerStop (435-441)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Broker/Pool.php (4)
src/Queue/Queue.php (1)
  • Queue (5-15)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (103-107)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Connection/Redis.php (5)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (103-107)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
tests/Queue/servers/AMQP/worker.php (2)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (10-100)
  • workerStart (89-93)
  • workerStop (95-99)
src/Queue/Server.php (3)
  • Server (14-537)
  • workerStart (414-420)
  • workerStop (435-441)
src/Queue/Server.php (10)
src/Queue/Job.php (2)
  • hook (24-29)
  • getHook (36-39)
src/Queue/Broker/AMQP.php (2)
  • consume (75-130)
  • close (132-136)
src/Queue/Broker/Pool.php (2)
  • consume (33-40)
  • close (42-45)
src/Queue/Broker/Redis.php (2)
  • consume (21-101)
  • close (103-107)
src/Queue/Consumer.php (2)
  • consume (17-22)
  • close (27-27)
src/Queue/Message.php (1)
  • Message (5-81)
src/Queue/Adapter/Swoole.php (1)
  • workerStop (95-99)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Broker/Redis.php (3)
src/Queue/Connection.php (3)
  • rightPopArray (8-8)
  • close (28-28)
  • rightPop (13-13)
src/Queue/Connection/Redis.php (3)
  • rightPopArray (63-72)
  • close (172-176)
  • rightPop (74-83)
src/Queue/Connection/RedisCluster.php (3)
  • rightPopArray (57-66)
  • close (169-173)
  • rightPop (68-77)
🪛 GitHub Actions: Linter
src/Queue/Broker/Redis.php

[error] 1-1: PSR-12: braces style issue detected by Pint lint. Braces formatting violation found in this file.

🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

83-83: Avoid unused local variables such as '$process'. (undefined)

(UnusedLocalVariable)

🪛 PHPStan (2.1.33)
src/Queue/Broker/Redis.php

13-13: Class constants with native types are supported only on PHP 8.3 and later.

(classConstant.nativeTypeNotSupported)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Adapter Tests (AMQP)
  • GitHub Check: Adapter Tests (SwooleRedisCluster)
  • GitHub Check: Adapter Tests (Workerman)
🔇 Additional comments (12)
src/Queue/Connection.php (1)

28-28: LGTM! Essential interface addition for resource cleanup.

The close() method enables explicit connection teardown across all connection implementations, supporting graceful shutdown patterns throughout the queue system.

src/Queue/Connection/RedisCluster.php (1)

169-173: LGTM! Clean implementation of connection cleanup.

The implementation correctly uses the null-safe operator and nullifies the internal reference, mirroring the pattern in Redis.php.

src/Queue/Connection/Redis.php (1)

172-176: LGTM! Clean and consistent implementation.

The implementation correctly handles connection cleanup using the null-safe operator and reference nullification, consistent with the RedisCluster implementation.

src/Queue/Broker/Redis.php (4)

27-36: LGTM! Excellent error handling for graceful shutdown.

The try/catch block correctly distinguishes between shutdown-related exceptions (which should break the loop) and other Redis errors (which should propagate), enabling clean worker termination.


105-106: LGTM! Proper resource cleanup on shutdown.

The close implementation correctly signals shutdown and delegates connection cleanup to the underlying connection.


130-130: Verify the timeout reduction from 5 to 2 seconds.

The retry operation now uses POP_TIMEOUT (2 seconds) instead of the previous hardcoded value of 5 seconds. This reduces the wait time for failed jobs but may impact retry behavior under certain Redis loads.


13-13: No action required. The typed class constant is compatible with the project's minimum PHP version of 8.3, as specified in composer.json.

Likely an incorrect or invalid review comment.

src/Queue/Adapter/Swoole.php (2)

31-50: LGTM! Well-structured master process orchestration.

The start method correctly:

  • Spawns all workers upfront
  • Sets up master-level signal handlers for SIGTERM, SIGINT, and SIGCHLD
  • Waits for all workers to exit before returning

The architecture properly separates master-level concerns (signal handling, worker lifecycle) from worker-level concerns.


74-79: LGTM! Proper zombie process cleanup.

The reap method correctly uses non-blocking Process::wait to clean up exited workers and remove them from the tracking array.

src/Queue/Server.php (3)

51-63: LGTM! Clean transition from single hooks to hook arrays.

The introduction of workerStartHooks and workerStopHooks arrays with proper initialization and PHPDoc annotations enables multiple lifecycle hooks per event.


377-398: Well-structured workerStop with robust error handling.

Good implementation:

  • Individual hook exceptions are caught and logged without preventing other hooks from running
  • finally block ensures consumer->close() is always called, even if hooks throw
  • Proper cleanup flow for graceful shutdown

360-373: Proper null-safety in error callback.

Good use of null-safe operator ?-> for $message?->getPid() to handle cases where the message might be null during error conditions.

Comment on lines +52 to 72
protected function spawnWorker(int $workerId): void
{
$this->pool->shutdown();
return $this;
$process = new Process(function () use ($workerId) {
Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);

Coroutine\run(function () use ($workerId) {
Process::signal(SIGTERM, fn () => $this->consumer->close());

foreach ($this->onWorkerStart as $callback) {
$callback((string)$workerId);
}

foreach ($this->onWorkerStop as $callback) {
$callback((string)$workerId);
}
});
}, false, 0, false);

$pid = $process->start();
$this->workers[$pid] = $process;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: Worker exits immediately without processing jobs.

The worker process invokes onWorkerStop callbacks immediately after onWorkerStart (lines 64-66), causing the worker to exit before the consume loop can run. The Server wires the consumer loop via workerStart, but the coroutine exits before that loop ever blocks.

Expected flow:

  1. onWorkerStart callbacks execute (including Server's consume loop)
  2. Consume loop blocks, processing jobs
  3. On SIGTERM, consumer->close() breaks the loop
  4. onWorkerStop callbacks execute
  5. Worker exits

Fix: Remove lines 64-66. The onWorkerStop callbacks should be invoked after the worker's main work completes (e.g., in a defer or after the consume loop returns), not immediately after start.

🔎 Proposed fix
 protected function spawnWorker(int $workerId): void
 {
     $process = new Process(function () use ($workerId) {
         Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
 
         Coroutine\run(function () use ($workerId) {
             Process::signal(SIGTERM, fn () => $this->consumer->close());
 
+            // Register deferred cleanup
+            defer(function () use ($workerId) {
+                foreach ($this->onWorkerStop as $callback) {
+                    $callback((string)$workerId);
+                }
+            });
+
             foreach ($this->onWorkerStart as $callback) {
                 $callback((string)$workerId);
             }
-
-            foreach ($this->onWorkerStop as $callback) {
-                $callback((string)$workerId);
-            }
         });
     }, false, 0, false);
 
     $pid = $process->start();
     $this->workers[$pid] = $process;
 }
🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 52 to 72, the onWorkerStop
callbacks are being invoked immediately after onWorkerStart (lines 64–66),
causing the worker coroutine to exit before the server's consume loop runs;
remove the immediate invocation of onWorkerStop there and instead call those
callbacks only after the worker's main work completes (for example in a deferred
cleanup block or right after the consume loop returns), keeping the SIGTERM
handler that calls $this->consumer->close() and ensuring callbacks are invoked
with the same (string)$workerId.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (6)
src/Queue/Broker/AMQP.php (1)

134-134: stopConsume() method does not exist in php-amqplib.

The project uses php-amqplib version 3.7.0. The AMQPChannel class in php-amqplib does not provide a stopConsume() method. This call will result in a fatal error at runtime.

To stop consumption, you need to cancel active consumer tags using basic_cancel() for each registered consumer, or simply close the channel directly if consumer tags are not tracked.

Does php-amqplib AMQPChannel class have a stopConsume method in version 3.7.0?
🔎 Suggested fix

If you don't track consumer tags, simply remove the stopConsume() call and let the channel close handle cleanup:

 public function close(): void
 {
-    $this->channel?->stopConsume();
     $this->channel?->getConnection()?->close();
 }

Alternatively, if you need to explicitly cancel consumers, track consumer tags and cancel them:

 public function close(): void
 {
+    // Cancel all active consumers if tags are tracked
+    foreach ($this->channel?->callbacks ?? [] as $consumerTag => $_) {
+        $this->channel->basic_cancel($consumerTag);
+    }
     $this->channel?->getConnection()?->close();
 }
tests/Queue/servers/Workerman/worker.php (1)

12-12: Fix queue name typo: 'wokerman' should be 'workerman'.

The queue name contains a typo and should be 'workerman' for consistency with the adapter name.

🔎 Proposed fix
-$adapter = new Workerman($consumer, 12, 'wokerman');
+$adapter = new Workerman($consumer, 12, 'workerman');
src/Queue/Broker/Pool.php (1)

42-44: Implement close() to prevent resource leaks.

The close() method remains unimplemented with a TODO. Unlike other broker implementations (Redis, AMQP) that properly clean up resources, this may cause connection leaks when the pool is not explicitly drained.

Based on the UtopiaPool API, you likely need to call a drain/reclaim method on both publisher and consumer pools. Would you like me to generate a script to verify the UtopiaPool API for proper cleanup methods?

src/Queue/Adapter/Swoole.php (1)

52-72: Critical: Worker coroutine exits immediately, consume loop never runs.

Lines 64-66 invoke onWorkerStop callbacks immediately after onWorkerStart completes, causing the worker coroutine to exit before the consume loop (registered via workerStart in Server.php line 269) can block and process jobs.

Expected flow:

  1. onWorkerStart callbacks execute (Server registers consume loop here)
  2. Consume loop blocks, processing messages
  3. On SIGTERM, consumer->close() breaks the loop
  4. onWorkerStop callbacks execute
  5. Coroutine exits

Current flow:

  1. onWorkerStart callbacks execute
  2. onWorkerStop callbacks execute immediately
  3. Coroutine exits (consume loop never blocks)

Fix: Remove lines 64-66. The onWorkerStop callbacks should execute after the consume loop naturally returns (when the consumer is closed), not immediately after start. Use defer() or move the invocation to after the consume loop completes.

🔎 Proposed fix using defer
 protected function spawnWorker(int $workerId): void
 {
     $process = new Process(function () use ($workerId) {
         Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
 
         Coroutine\run(function () use ($workerId) {
             Process::signal(SIGTERM, fn () => $this->consumer->close());
 
+            // Ensure stop callbacks run after consume loop exits
+            defer(function () use ($workerId) {
+                foreach ($this->onWorkerStop as $callback) {
+                    $callback((string)$workerId);
+                }
+            });
+
             foreach ($this->onWorkerStart as $callback) {
                 $callback((string)$workerId);
             }
-
-            foreach ($this->onWorkerStop as $callback) {
-                $callback((string)$workerId);
-            }
         });
     }, false, 0, false);
 
     $pid = $process->start();
     $this->workers[$pid] = $process;
 }
src/Queue/Server.php (2)

304-315: Group hooks ignore the hook selector flag.

Global hooks respect $this->job->getHook() (lines 291, 331), but group hooks (lines 304-315 for init, 344-355 for shutdown) run unconditionally when the group matches. This creates inconsistent behavior where setting hook(false) disables global hooks but not group hooks.

For consistency, gate group hooks the same way:

🔎 Proposed fix
-                        foreach ($this->job->getGroups() as $group) {
-                            foreach ($this->initHooks as $hook) {
-                                // Group init hooks
-                                if (\in_array($group, $hook->getGroups())) {
-                                    $arguments = $this->getArguments(
-                                        $hook,
-                                        $message->getPayload(),
-                                    );
-                                    $hook->getAction()(...$arguments);
+                        if ($this->job->getHook()) {
+                            foreach ($this->job->getGroups() as $group) {
+                                foreach ($this->initHooks as $hook) {
+                                    // Group init hooks
+                                    if (\in_array($group, $hook->getGroups())) {
+                                        $arguments = $this->getArguments(
+                                            $hook,
+                                            $message->getPayload(),
+                                        );
+                                        $hook->getAction()(...$arguments);
+                                    }
                                 }
                             }
                         }

Apply the same pattern to shutdown group hooks (lines 344-355).

Also applies to: 344-355


106-110: Guard against undefined index when accessing callback reset flag.

Line 109 accesses self::$resourcesCallbacks[$name]['reset'] before verifying the callback exists (checked at line 111). If a resource exists in $this->resources but has no registered callback (e.g., 'error' initialized at line 69), this will trigger an undefined index error.

Short-circuit evaluation doesn't prevent this: When the first two conditions are false, PHP evaluates the third condition before entering the block that checks callback existence.

🔎 Proposed fix
 if (
     !\array_key_exists($name, $this->resources) ||
     $fresh ||
-    self::$resourcesCallbacks[$name]['reset']
+    (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset'])
 ) {
     if (!\array_key_exists($name, self::$resourcesCallbacks)) {
         throw new Exception("Failed to find resource: $name");
     }
🧹 Nitpick comments (1)
src/Queue/Adapter/Swoole.php (1)

81-87: Unused $process variable is a false positive.

PHPMD flags $process as unused on line 83, but it's required for the foreach loop to iterate over worker PIDs. The actual Process instance isn't needed, only the $pid keys. This is a common pattern and can be safely ignored.

If you prefer to silence the linter, you can use only the keys:

 public function stop(): self
 {
-    foreach ($this->workers as $pid => $process) {
+    foreach (array_keys($this->workers) as $pid) {
         Process::kill($pid, SIGTERM);
     }
     return $this;
 }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0982fc and 5e70837.

📒 Files selected for processing (20)
  • pint.json
  • src/Queue/Adapter.php
  • src/Queue/Adapter/Swoole.php
  • src/Queue/Adapter/Workerman.php
  • src/Queue/Broker/AMQP.php
  • src/Queue/Broker/Pool.php
  • src/Queue/Broker/Redis.php
  • src/Queue/Connection.php
  • src/Queue/Connection/Redis.php
  • src/Queue/Connection/RedisCluster.php
  • src/Queue/Queue.php
  • src/Queue/Server.php
  • tests/Queue/servers/AMQP/Dockerfile
  • tests/Queue/servers/AMQP/worker.php
  • tests/Queue/servers/Swoole/Dockerfile
  • tests/Queue/servers/Swoole/worker.php
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
  • tests/Queue/servers/SwooleRedisCluster/worker.php
  • tests/Queue/servers/Workerman/Dockerfile
  • tests/Queue/servers/Workerman/worker.php
💤 Files with no reviewable changes (2)
  • src/Queue/Adapter/Workerman.php
  • src/Queue/Adapter.php
🚧 Files skipped from review as they are similar to previous changes (7)
  • tests/Queue/servers/Workerman/Dockerfile
  • tests/Queue/servers/AMQP/Dockerfile
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
  • tests/Queue/servers/SwooleRedisCluster/worker.php
  • tests/Queue/servers/Swoole/worker.php
  • src/Queue/Connection/Redis.php
  • src/Queue/Queue.php
🧰 Additional context used
🧬 Code graph analysis (6)
src/Queue/Broker/Redis.php (3)
src/Queue/Connection.php (2)
  • rightPopArray (8-8)
  • rightPop (13-13)
src/Queue/Connection/Redis.php (2)
  • rightPopArray (63-72)
  • rightPop (74-83)
src/Queue/Connection/RedisCluster.php (2)
  • rightPopArray (57-66)
  • rightPop (68-77)
src/Queue/Server.php (7)
src/Queue/Job.php (2)
  • hook (24-29)
  • getHook (36-39)
src/Queue/Broker/AMQP.php (2)
  • consume (75-130)
  • close (132-136)
src/Queue/Broker/Pool.php (2)
  • consume (33-40)
  • close (42-45)
src/Queue/Broker/Redis.php (2)
  • consume (21-101)
  • close (103-106)
src/Queue/Consumer.php (2)
  • consume (17-22)
  • close (27-27)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
src/Queue/Connection.php (1)
  • close (28-28)
tests/Queue/servers/Workerman/worker.php (3)
src/Queue/Adapter.php (3)
  • Adapter (5-43)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (3)
  • Workerman (9-50)
  • workerStart (34-41)
  • workerStop (42-49)
src/Queue/Server.php (3)
  • Server (14-537)
  • workerStart (414-420)
  • workerStop (435-441)
tests/Queue/servers/AMQP/worker.php (3)
src/Queue/Adapter.php (3)
  • Adapter (5-43)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (10-100)
  • workerStart (89-93)
  • workerStop (95-99)
src/Queue/Server.php (3)
  • Server (14-537)
  • workerStart (414-420)
  • workerStop (435-441)
src/Queue/Connection/RedisCluster.php (5)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Connection.php (5)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (103-106)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
🪛 GitHub Actions: Linter
src/Queue/Broker/Redis.php

[error] 1-1: Pint lint failed due to a PSR-12 braces issue. Command 'vendor/bin/pint --test' returned exit code 1.

🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

83-83: Avoid unused local variables such as '$process'. (undefined)

(UnusedLocalVariable)

🪛 PHPStan (2.1.33)
src/Queue/Broker/Redis.php

13-13: Class constants with native types are supported only on PHP 8.3 and later.

(classConstant.nativeTypeNotSupported)

🔇 Additional comments (12)
pint.json (1)

2-5: LGTM! Good addition of single quote enforcement.

The Pint configuration correctly adds the single_quote rule while maintaining the PSR-12 preset. The trailing comma after "psr12" follows best practices for easier future additions. The codebase properly follows the rule, using single quotes for literal strings while preserving double quotes for strings with variable interpolation.

tests/Queue/servers/Swoole/Dockerfile (1)

5-5: LGTM! EOF newline added.

This formatting change adds a trailing newline, aligning with standard file conventions.

src/Queue/Connection.php (1)

28-28: LGTM! Explicit resource cleanup added to interface.

The addition of close(): void to the Connection interface enables explicit resource teardown across implementations, supporting the graceful shutdown lifecycle introduced in this PR.

src/Queue/Connection/RedisCluster.php (1)

169-173: LGTM! Clean resource teardown implementation.

The close() implementation correctly closes the underlying RedisCluster connection and nullifies the reference, consistent with the pattern used in the Redis connection class.

src/Queue/Broker/AMQP.php (1)

114-114: Minor: String literal style changes are cosmetic.

The changes from double quotes to single quotes for static strings are stylistic improvements with no functional impact.

Also applies to: 165-165

tests/Queue/servers/AMQP/worker.php (2)

6-8: LGTM! Cleaner imports and initialization.

The refactoring to use explicit imports and direct class references improves readability.

Also applies to: 10-12


14-14: LGTM! Properly adopts new lifecycle API.

The worker now correctly uses the new workerStart() and workerStop() hooks introduced in this PR, along with the updated job action syntax.

Also applies to: 23-29

tests/Queue/servers/Workerman/worker.php (1)

7-11: LGTM! Adopts new lifecycle API.

The worker correctly uses the new Redis broker wiring and lifecycle hooks (workerStart() and workerStop()).

Also applies to: 15-15, 24-30

src/Queue/Broker/Pool.php (1)

33-38: LGTM! Formatting improvements enhance readability.

The multiline parameter lists and closure formatting improve code readability with no functional changes.

Also applies to: 49-52, 59-62

src/Queue/Broker/Redis.php (2)

13-13: No action required. The project's composer.json specifies php: >=8.3, which fully supports typed constants. The code is compatible with all supported PHP versions in this project.

Likely an incorrect or invalid review comment.


21-40: API signature change verified and correctly propagated across codebase.

The addition of successCallback and errorCallback parameters to the consume() method has been consistently implemented across all broker implementations (Redis, AMQP, and Pool) and the Consumer interface. The call site in Server.php line 277 correctly passes all four parameters. Exception handling in the Redis broker appropriately breaks on close and rethrows otherwise.

src/Queue/Server.php (1)

377-398: Excellent error handling in worker stop sequence.

The try/catch/finally structure ensures:

  • Individual hook failures are logged but don't prevent other hooks from running
  • Consumer cleanup (consumer->close()) always executes, even if hooks throw
  • Worker shutdown completes gracefully in all scenarios

This defensive pattern prevents hook errors from leaving connections open or preventing graceful shutdown.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
src/Queue/Server.php (2)

330-355: Misleading comments in success callback.

Lines 333 and 346 have comments referring to "init hooks" but this is the success callback which runs shutdown hooks. These should say "Global shutdown hooks" and "Group shutdown hooks" respectively.

🔎 Proposed fix
                     if ($this->job->getHook()) {
                         foreach ($this->shutdownHooks as $hook) {
-                            // Global init hooks
+                            // Global shutdown hooks
                             if (\in_array('*', $hook->getGroups())) {
                                 $arguments = $this->getArguments(
                                     $hook,
                                     $message->getPayload(),
                                 );
                                 $hook->getAction()(...$arguments);
                             }
                         }
                     }

                     foreach ($this->job->getGroups() as $group) {
                         foreach ($this->shutdownHooks as $hook) {
-                            // Group init hooks
+                            // Group shutdown hooks
                             if (\in_array($group, $hook->getGroups())) {

106-110: Potential undefined index on 'reset' access.

The condition accesses self::$resourcesCallbacks[$name]['reset'] without first verifying that self::$resourcesCallbacks[$name] exists. If a resource is set directly in $this->resources but no callback is registered, this will throw an undefined index notice.

🔎 Proposed fix
     public function getResource(string $name, bool $fresh = false): mixed
     {
         if (
             !\array_key_exists($name, $this->resources) ||
             $fresh ||
-            self::$resourcesCallbacks[$name]['reset']
+            (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset'])
         ) {
src/Queue/Adapter/Swoole.php (1)

52-72: Critical: Worker exits immediately without processing jobs.

The onWorkerStop callbacks are invoked immediately after onWorkerStart (lines 64-66), causing the worker to exit before the consume loop ever runs. The Server wires the consumer loop via workerStart, but the coroutine exits before that loop blocks.

Expected flow:

  1. onWorkerStart callbacks execute (including Server's consume loop)
  2. Consume loop blocks, processing jobs
  3. On SIGTERM, consumer->close() breaks the loop
  4. onWorkerStop callbacks execute
  5. Worker exits

The onWorkerStop callbacks should only run after the worker's main work completes, not immediately after start.

🔎 Proposed fix using defer
 protected function spawnWorker(int $workerId): void
 {
     $process = new Process(function () use ($workerId) {
         Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);

         Coroutine\run(function () use ($workerId) {
             Process::signal(SIGTERM, fn () => $this->consumer->close());

+            // Register deferred cleanup - runs when coroutine completes
+            defer(function () use ($workerId) {
+                foreach ($this->onWorkerStop as $callback) {
+                    $callback((string)$workerId);
+                }
+            });
+
             foreach ($this->onWorkerStart as $callback) {
                 $callback((string)$workerId);
             }
-
-            foreach ($this->onWorkerStop as $callback) {
-                $callback((string)$workerId);
-            }
         });
     }, false, 0, false);

     $pid = $process->start();
     $this->workers[$pid] = $process;
 }
🧹 Nitpick comments (1)
src/Queue/Adapter/Swoole.php (1)

81-87: Unused $process variable in loop.

The $process variable is assigned but never used. Consider iterating over keys only.

🔎 Proposed fix
 public function stop(): self
 {
-    foreach ($this->workers as $pid => $process) {
+    foreach (array_keys($this->workers) as $pid) {
         Process::kill($pid, SIGTERM);
     }
     return $this;
 }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e70837 and 83d6ac3.

📒 Files selected for processing (20)
  • pint.json
  • src/Queue/Adapter.php
  • src/Queue/Adapter/Swoole.php
  • src/Queue/Adapter/Workerman.php
  • src/Queue/Broker/AMQP.php
  • src/Queue/Broker/Pool.php
  • src/Queue/Broker/Redis.php
  • src/Queue/Connection.php
  • src/Queue/Connection/Redis.php
  • src/Queue/Connection/RedisCluster.php
  • src/Queue/Queue.php
  • src/Queue/Server.php
  • tests/Queue/servers/AMQP/Dockerfile
  • tests/Queue/servers/AMQP/worker.php
  • tests/Queue/servers/Swoole/Dockerfile
  • tests/Queue/servers/Swoole/worker.php
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
  • tests/Queue/servers/SwooleRedisCluster/worker.php
  • tests/Queue/servers/Workerman/Dockerfile
  • tests/Queue/servers/Workerman/worker.php
💤 Files with no reviewable changes (2)
  • src/Queue/Adapter.php
  • src/Queue/Adapter/Workerman.php
✅ Files skipped from review due to trivial changes (2)
  • tests/Queue/servers/Swoole/Dockerfile
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (9)
  • src/Queue/Connection.php
  • src/Queue/Queue.php
  • pint.json
  • src/Queue/Connection/RedisCluster.php
  • src/Queue/Broker/AMQP.php
  • tests/Queue/servers/Workerman/worker.php
  • src/Queue/Broker/Pool.php
  • src/Queue/Connection/Redis.php
  • tests/Queue/servers/AMQP/Dockerfile
🧰 Additional context used
🧬 Code graph analysis (4)
tests/Queue/servers/SwooleRedisCluster/worker.php (4)
src/Queue/Connection/RedisCluster.php (1)
  • RedisCluster (7-184)
src/Queue/Adapter.php (3)
  • Adapter (5-43)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (10-100)
  • workerStart (89-93)
  • workerStop (95-99)
src/Queue/Server.php (3)
  • Server (14-537)
  • workerStart (414-420)
  • workerStop (435-441)
src/Queue/Adapter/Swoole.php (3)
src/Queue/Adapter.php (5)
  • Adapter (5-43)
  • start (22-22)
  • stop (28-28)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Server.php (4)
  • start (266-408)
  • stop (236-247)
  • workerStart (414-420)
  • workerStop (435-441)
src/Queue/Consumer.php (1)
  • close (27-27)
tests/Queue/servers/AMQP/worker.php (4)
src/Queue/Broker/AMQP.php (1)
  • AMQP (20-226)
src/Queue/Adapter.php (3)
  • Adapter (5-43)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (10-100)
  • workerStart (89-93)
  • workerStop (95-99)
src/Queue/Server.php (3)
  • Server (14-537)
  • workerStart (414-420)
  • workerStop (435-441)
src/Queue/Server.php (3)
src/Queue/Consumer.php (2)
  • consume (17-22)
  • close (27-27)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Connection.php (1)
  • close (28-28)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

83-83: Avoid unused local variables such as '$process'. (undefined)

(UnusedLocalVariable)

🪛 PHPStan (2.1.33)
src/Queue/Broker/Redis.php

13-13: Class constants with native types are supported only on PHP 8.3 and later.

(classConstant.nativeTypeNotSupported)

🔇 Additional comments (14)
tests/Queue/servers/Workerman/Dockerfile (1)

11-11: LGTM! EOF normalization.

The trailing newline addition standardizes the file format with no functional impact.

src/Queue/Broker/Redis.php (3)

27-35: LGTM!

The exception handling correctly distinguishes between shutdown-triggered exceptions (which should break the loop gracefully) and genuine Redis errors (which should propagate). This enables clean graceful shutdown behavior.


128-128: LGTM!

Consistent use of POP_TIMEOUT in retry aligns with the consume method and improves maintainability.


13-14: The private const int POP_TIMEOUT = 2; syntax is appropriate for this codebase. The utopia-php/queue library explicitly requires PHP 8.3+ in composer.json, making typed class constants fully supported and compatible. No changes needed.

tests/Queue/servers/SwooleRedisCluster/worker.php (2)

6-19: LGTM!

The inline construction pattern with RedisCluster connection, Swoole adapter, and Server is clean and follows the new public API design. The 12-worker configuration matches other test files.


21-36: LGTM!

The job wiring and lifecycle hooks correctly use the new fluent API pattern. The workerStart() and workerStop() hooks demonstrate the intended usage of the new lifecycle management.

tests/Queue/servers/Swoole/worker.php (1)

6-32: LGTM!

The test file correctly demonstrates the new API with single-node Redis connection. The structure is consistent with other test worker files and properly exercises the workerStart() and workerStop() lifecycle hooks.

tests/Queue/servers/AMQP/worker.php (1)

6-31: LGTM!

The AMQP test file follows the same pattern as other test workers. The named parameters for the AMQP constructor improve readability. The lifecycle hooks are wired correctly.

src/Queue/Adapter/Swoole.php (2)

31-50: LGTM!

The supervisor loop correctly manages worker lifecycle with signal handlers for graceful shutdown (SIGTERM/SIGINT) and worker reaping (SIGCHLD). The polling approach with Coroutine::sleep(1) is appropriate for a supervisor process.


89-99: LGTM!

The callback registration methods correctly append to their respective arrays and return $this for method chaining, matching the abstract Adapter interface contract.

src/Queue/Server.php (4)

51-63: LGTM!

The multi-hook arrays for workerStartHooks and workerStopHooks are properly declared and documented, enabling multiple lifecycle callbacks to be registered.


377-398: LGTM!

Excellent defensive coding. Each hook is wrapped in its own try/catch, preventing one failing hook from blocking others. The finally block ensures the consumer connection is always closed regardless of hook failures. This is the correct pattern for cleanup operations.


414-450: LGTM!

The multi-hook registration pattern is consistent across workerStart() and workerStop(). Both methods create a new Hook, add it to their respective arrays, and return the hook for fluent configuration. The getter methods correctly return the arrays for inspection.

Note: This is a breaking change from the previous single-hook API, but it aligns with the PR's objective to support multiple lifecycle hooks.


360-373: LGTM!

The error callback correctly handles the nullable Message parameter with null-safe operators ($message?->getPid()). The error resource is properly set and error hooks are invoked with their arguments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants