Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ services:
container_name: tests
build: .
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
- swoole
- swoole-amqp
Expand All @@ -16,8 +17,9 @@ services:
build: ./tests/Queue/servers/Swoole/.
command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
- redis

Expand All @@ -26,8 +28,9 @@ services:
build: ./tests/Queue/servers/SwooleRedisCluster/.
command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
redis-cluster-0:
condition: service_healthy
Expand All @@ -37,8 +40,9 @@ services:
build: ./tests/Queue/servers/AMQP/.
command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
amqp:
condition: service_healthy
Expand All @@ -48,8 +52,9 @@ services:
build: ./tests/Queue/servers/Workerman/.
command: php /usr/src/code/tests/Queue/servers/Workerman/worker.php start
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
- redis

Expand Down
5 changes: 4 additions & 1 deletion pint.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{
"preset": "psr12"
"preset": "psr12",
"rules": {
"single_quote": true
}
}
6 changes: 0 additions & 6 deletions src/Queue/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,4 @@ abstract public function workerStart(callable $callback): self;
* @return self
*/
abstract public function workerStop(callable $callback): self;

/**
* Returns the native server object from the Adapter.
* @return mixed
*/
abstract public function getNative(): mixed;
}
88 changes: 66 additions & 22 deletions src/Queue/Adapter/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,99 @@

namespace Utopia\Queue\Adapter;

use Swoole\Process\Pool;
use Swoole\Coroutine;
use Swoole\Process;
use Utopia\Queue\Adapter;
use Utopia\Queue\Consumer;

class Swoole extends Adapter
{
protected Pool $pool;
/** @var Process[] */
protected array $workers = [];

public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
{
parent::__construct($workerNum, $queue, $namespace);
/** @var callable[] */
protected array $onWorkerStart = [];

/** @var callable[] */
protected array $onWorkerStop = [];

public function __construct(
Consumer $consumer,
int $workerNum,
string $queue,
string $namespace = 'utopia-queue',
) {
parent::__construct($workerNum, $queue, $namespace);
$this->consumer = $consumer;
$this->pool = new Pool($workerNum);
}

public function start(): self
{
$this->pool->set(['enable_coroutine' => true]);
$this->pool->start();
for ($i = 0; $i < $this->workerNum; $i++) {
$this->spawnWorker($i);
}

Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);

Coroutine\run(function () {
Process::signal(SIGTERM, fn () => $this->stop());
Process::signal(SIGINT, fn () => $this->stop());
Process::signal(SIGCHLD, fn () => $this->reap());

while (\count($this->workers) > 0) {
Coroutine::sleep(1);
}
});

return $this;
}

public function stop(): self
protected function spawnWorker(int $workerId): void
{
$this->pool->shutdown();
return $this;
$process = new Process(function () use ($workerId) {
Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);

Coroutine\run(function () use ($workerId) {
Process::signal(SIGTERM, fn () => $this->consumer->close());

foreach ($this->onWorkerStart as $callback) {
$callback((string)$workerId);
}

foreach ($this->onWorkerStop as $callback) {
$callback((string)$workerId);
}
});
}, false, 0, false);

$pid = $process->start();
$this->workers[$pid] = $process;
}
Comment on lines +52 to 72
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: Worker exits immediately without processing jobs.

The worker process invokes onWorkerStop callbacks immediately after onWorkerStart (lines 64-66), causing the worker to exit before the consume loop can run. The Server wires the consumer loop via workerStart, but the coroutine exits before that loop ever blocks.

Expected flow:

  1. onWorkerStart callbacks execute (including Server's consume loop)
  2. Consume loop blocks, processing jobs
  3. On SIGTERM, consumer->close() breaks the loop
  4. onWorkerStop callbacks execute
  5. Worker exits

Fix: Remove lines 64-66. The onWorkerStop callbacks should be invoked after the worker's main work completes (e.g., in a defer or after the consume loop returns), not immediately after start.

🔎 Proposed fix
 protected function spawnWorker(int $workerId): void
 {
     $process = new Process(function () use ($workerId) {
         Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
 
         Coroutine\run(function () use ($workerId) {
             Process::signal(SIGTERM, fn () => $this->consumer->close());
 
+            // Register deferred cleanup
+            defer(function () use ($workerId) {
+                foreach ($this->onWorkerStop as $callback) {
+                    $callback((string)$workerId);
+                }
+            });
+
             foreach ($this->onWorkerStart as $callback) {
                 $callback((string)$workerId);
             }
-
-            foreach ($this->onWorkerStop as $callback) {
-                $callback((string)$workerId);
-            }
         });
     }, false, 0, false);
 
     $pid = $process->start();
     $this->workers[$pid] = $process;
 }
🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 52 to 72, the onWorkerStop
callbacks are being invoked immediately after onWorkerStart (lines 64–66),
causing the worker coroutine to exit before the server's consume loop runs;
remove the immediate invocation of onWorkerStop there and instead call those
callbacks only after the worker's main work completes (for example in a deferred
cleanup block or right after the consume loop returns), keeping the SIGTERM
handler that calls $this->consumer->close() and ensuring callbacks are invoked
with the same (string)$workerId.


public function workerStart(callable $callback): self
protected function reap(): void
{
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
call_user_func($callback, $workerId);
});
while (($ret = Process::wait(false)) !== false) {
unset($this->workers[$ret['pid']]);
}
}

public function stop(): self
{
foreach ($this->workers as $pid => $process) {
Process::kill($pid, SIGTERM);
}
return $this;
}

public function workerStop(callable $callback): self
public function workerStart(callable $callback): self
{
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
call_user_func($callback, $workerId);
});

$this->onWorkerStart[] = $callback;
return $this;
}

public function getNative(): Pool
public function workerStop(callable $callback): self
{
return $this->pool;
$this->onWorkerStop[] = $callback;
return $this;
}
}
5 changes: 0 additions & 5 deletions src/Queue/Adapter/Workerman.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,4 @@ public function workerStop(callable $callback): self

return $this;
}

public function getNative(): Worker
{
return $this->worker;
}
}
5 changes: 3 additions & 2 deletions src/Queue/Broker/AMQP.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));

// 2. Declare the working queue and configure the DLX for receiving rejected messages.
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ['x-dead-letter-exchange' => "{$queue->namespace}.failed"])));
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);

// 3. Declare the dead-letter-queue and bind it to the DLX.
Expand All @@ -131,6 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe

public function close(): void
{
$this->channel?->stopConsume();
$this->channel?->getConnection()?->close();
}

Expand Down Expand Up @@ -161,7 +162,7 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
{
$queueName = $queue->name;
if ($failedJobs) {
$queueName = $queueName . ".failed";
$queueName = $queueName . '.failed';
}

$client = new Client();
Expand Down
20 changes: 15 additions & 5 deletions src/Queue/Broker/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,36 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
return $this->delegatePublish(__FUNCTION__, \func_get_args());
}

public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
{
public function consume(
Queue $queue,
callable $messageCallback,
callable $successCallback,
callable $errorCallback,
): void {
$this->delegateConsumer(__FUNCTION__, \func_get_args());
}

public function close(): void
{
$this->delegateConsumer(__FUNCTION__, \func_get_args());
// TODO: Implement closing all connections in the pool
}
Comment on lines 42 to 45
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Incomplete close() implementation may cause resource leaks.

The close() method currently does nothing, while other broker implementations (Redis, AMQP) properly clean up resources. If Pool is actively used, connections may not be released on shutdown.

🔎 Suggested implementation
 public function close(): void
 {
-    // TODO: Implement closing all connections in the pool
+    // Close all connections in the publisher pool
+    $this->publisher?->reclaim();
+    
+    // Close all connections in the consumer pool
+    $this->consumer?->reclaim();
 }

Note: The exact implementation depends on the UtopiaPool API. Would you like me to investigate how connections should be closed in a pool context?

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/Queue/Broker/Pool.php around lines 42 to 45, the close() method is
currently empty and must explicitly release all pooled connections to avoid
leaks; implement it to iterate over the underlying UtopiaPool entries (or use
its provided drain/clear/destroy API), call the appropriate connection
close/destroy method or release function for each resource, catch and log any
exceptions to avoid halting shutdown, and then mark the pool as closed/cleared
so subsequent calls are no-ops; ensure the implementation respects the
UtopiaPool lifecycle API (drain/clear/destroy) and updates any internal state
flags.


protected function delegatePublish(string $method, array $args): mixed
{
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
return $this->publisher?->use(function (Publisher $adapter) use (
$method,
$args,
) {
return $adapter->$method(...$args);
});
}

protected function delegateConsumer(string $method, array $args): mixed
{
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
return $this->consumer?->use(function (Consumer $adapter) use (
$method,
$args,
) {
return $adapter->$method(...$args);
});
}
Expand Down
14 changes: 12 additions & 2 deletions src/Queue/Broker/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

class Redis implements Publisher, Consumer
{
private const int POP_TIMEOUT = 2;

private bool $closed = false;

public function __construct(private readonly Connection $connection)
Expand All @@ -22,7 +24,15 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
/**
* Waiting for next Job.
*/
$nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", 5);
try {
$nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT);
} catch (\RedisException $e) {
if ($this->closed) {
break;
}

throw $e;
}

if (!$nextMessage) {
continue;
Expand Down Expand Up @@ -115,7 +125,7 @@ public function retry(Queue $queue, ?int $limit = null): void
$processed = 0;

while (true) {
$pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", 5);
$pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT);

// No more jobs to retry
if ($pid === false) {
Expand Down
1 change: 1 addition & 0 deletions src/Queue/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ public function setArray(string $key, array $value): bool;
public function increment(string $key): int;
public function decrement(string $key): int;
public function ping(): bool;
public function close(): void;
}
6 changes: 6 additions & 0 deletions src/Queue/Connection/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ public function ping(): bool
}
}

public function close(): void
{
$this->redis?->close();
$this->redis = null;
}

protected function getRedis(): \Redis
{
if ($this->redis) {
Expand Down
6 changes: 6 additions & 0 deletions src/Queue/Connection/RedisCluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ public function ping(): bool
}
}

public function close(): void
{
$this->redis?->close();
$this->redis = null;
}

protected function getRedis(): \RedisCluster
{
if ($this->redis) {
Expand Down
2 changes: 1 addition & 1 deletion src/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public function __construct(
public string $namespace = 'utopia-queue',
) {
if (empty($this->name)) {
throw new \InvalidArgumentException("Cannot create queue with empty name.");
throw new \InvalidArgumentException('Cannot create queue with empty name.');
}
}
}
Loading