From eed8b7cbecf5449af213ebdd73c4ef918a9070ee Mon Sep 17 00:00:00 2001 From: Fabian Gruber Date: Sun, 25 May 2025 17:18:16 +0200 Subject: [PATCH 1/2] feat: add workerStop handling --- docker-compose.yml | 25 +-- src/Queue/Adapter/Swoole.php | 50 +++++- src/Queue/Broker/AMQP.php | 1 + src/Queue/Server.php | 157 ++++++++++-------- tests/Queue/servers/AMQP/Dockerfile | 4 +- tests/Queue/servers/AMQP/worker.php | 6 + tests/Queue/servers/Swoole/Dockerfile | 1 + tests/Queue/servers/Swoole/worker.php | 6 + .../servers/SwooleRedisCluster/Dockerfile | 1 + .../servers/SwooleRedisCluster/worker.php | 6 + tests/Queue/servers/Workerman/worker.php | 6 + 11 files changed, 178 insertions(+), 85 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index bf0089d..6b230a3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,8 +3,9 @@ services: container_name: tests build: . volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: - swoole - swoole-amqp @@ -16,8 +17,9 @@ services: build: ./tests/Queue/servers/Swoole/. command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: - redis @@ -26,8 +28,9 @@ services: build: ./tests/Queue/servers/SwooleRedisCluster/. command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: redis-cluster-0: condition: service_healthy @@ -37,8 +40,9 @@ services: build: ./tests/Queue/servers/AMQP/. command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: amqp: condition: service_healthy @@ -48,8 +52,9 @@ services: build: ./tests/Queue/servers/Workerman/. command: php /usr/src/code/tests/Queue/servers/Workerman/worker.php start volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: - redis diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index fef30da..7160052 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -2,7 +2,9 @@ namespace Utopia\Queue\Adapter; +use Swoole\Constant; use Swoole\Process\Pool; +use Utopia\CLI\Console; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; @@ -10,6 +12,9 @@ class Swoole extends Adapter { protected Pool $pool; + /** @var callable */ + private $onStop; + public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') { parent::__construct($workerNum, $queue, $namespace); @@ -21,19 +26,59 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s public function start(): self { $this->pool->set(['enable_coroutine' => true]); + + // Register signal handlers in the main process before starting pool + if (extension_loaded('pcntl')) { + pcntl_signal(SIGTERM, function () { + Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown..."); + $this->stop(); + }); + + pcntl_signal(SIGINT, function () { + Console::info("[Swoole] Received SIGINT, initiating graceful shutdown..."); + $this->stop(); + }); + + // Enable async signals + pcntl_async_signals(true); + } else { + Console::warning("[Swoole] pcntl extension is not loaded, worker will not shutdown gracefully."); + } + $this->pool->start(); return $this; } public function stop(): self { + if ($this->onStop) { + call_user_func($this->onStop); + } + + Console::info("[Swoole] Shutting down process pool..."); $this->pool->shutdown(); + Console::success("[Swoole] Process pool stopped."); return $this; } public function workerStart(callable $callback): self { - $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) { + // Register signal handlers in each worker process for graceful shutdown + if (extension_loaded('pcntl')) { + pcntl_signal(SIGTERM, function () use ($workerId) { + Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer..."); + $this->consumer->close(); + }); + + pcntl_signal(SIGINT, function () use ($workerId) { + Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer..."); + $this->consumer->close(); + }); + + pcntl_async_signals(true); + } + call_user_func($callback, $workerId); }); @@ -42,7 +87,8 @@ public function workerStart(callable $callback): self public function workerStop(callable $callback): self { - $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { + $this->onStop = $callback; + $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { call_user_func($callback, $workerId); }); diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 21825b1..f6c3fd7 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -131,6 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe public function close(): void { + $this->channel?->stopConsume(); $this->channel?->getConnection()?->close(); } diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 3ee64c4..d1e72b3 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -53,6 +53,11 @@ class Server */ protected Hook $workerStartHook; + /** + * Hook that is called when worker stops + */ + protected Hook $workerStopHook; + /** * @var array */ @@ -96,7 +101,7 @@ public function getResource(string $name, bool $fresh = false): mixed { if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) { if (!\array_key_exists($name, self::$resourcesCallbacks)) { - throw new Exception('Failed to find resource: "' . $name . '"'); + throw new Exception("Failed to find resource: $name"); } $this->resources[$name] = \call_user_func_array( @@ -213,49 +218,23 @@ public function start(): self $this->adapter->workerStart(function (string $workerId) { Console::success("[Worker] Worker {$workerId} is ready!"); self::setResource('workerId', fn () => $workerId); - if (!is_null($this->workerStartHook)) { + if ($this->workerStartHook !== null) { call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); } - while (true) { - $this->adapter->consumer->consume( - $this->adapter->queue, - function (Message $message) { - $receivedAtTimestamp = microtime(true); - Console::info("[Job] Received Job ({$message->getPid()})."); - try { - $waitDuration = microtime(true) - $message->getTimestamp(); - $this->jobWaitTime->record($waitDuration); - - $this->resources = []; - self::setResource('message', fn () => $message); - if ($this->job->getHook()) { - foreach ($this->initHooks as $hook) { // Global init hooks - if (in_array('*', $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } - } - } - - foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { // Group init hooks - if (in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } - } - } - - return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); - } finally { - $processDuration = microtime(true) - $receivedAtTimestamp; - $this->processDuration->record($processDuration); - } - }, - function (Message $message) { + $this->adapter->consumer->consume( + $this->adapter->queue, + function (Message $message) { + $receivedAtTimestamp = microtime(true); + Console::info("[Job] Received Job ({$message->getPid()})."); + try { + $waitDuration = microtime(true) - $message->getTimestamp(); + $this->jobWaitTime->record($waitDuration); + + $this->resources = []; + self::setResource('message', fn () => $message); if ($this->job->getHook()) { - foreach ($this->shutdownHooks as $hook) { // Global init hooks + foreach ($this->initHooks as $hook) { // Global init hooks if (in_array('*', $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); @@ -264,27 +243,65 @@ function (Message $message) { } foreach ($this->job->getGroups() as $group) { - foreach ($this->shutdownHooks as $hook) { // Group init hooks + foreach ($this->initHooks as $hook) { // Group init hooks if (in_array($group, $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); } } } - Console::success("[Job] ({$message->getPid()}) successfully run."); - }, - function (?Message $message, Throwable $th) { - Console::error("[Job] ({$message?->getPid()}) failed to run."); - Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); - self::setResource('error', fn () => $th); + return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); + } finally { + $processDuration = microtime(true) - $receivedAtTimestamp; + $this->processDuration->record($processDuration); + } + }, + function (Message $message) { + if ($this->job->getHook()) { + foreach ($this->shutdownHooks as $hook) { // Global init hooks + if (in_array('*', $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } + } + } - foreach ($this->errorHooks as $hook) { - ($hook->getAction())(...$this->getArguments($hook)); + foreach ($this->job->getGroups() as $group) { + foreach ($this->shutdownHooks as $hook) { // Group init hooks + if (in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } } - }, - ); + } + Console::success("[Job] ({$message->getPid()}) successfully run."); + }, + function (?Message $message, Throwable $th) { + Console::error("[Job] ({$message?->getPid()}) failed to run."); + Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); + + self::setResource('error', fn () => $th); + + foreach ($this->errorHooks as $hook) { + $hook->getAction()(...$this->getArguments($hook)); + } + }, + ); + }); + + $this->adapter->workerStop(function ($workerId) { + Console::info("[Worker] Worker {$workerId} stopping..."); + self::setResource('workerId', fn () => $workerId); + + // Call user-defined workerStop hook if set + if ($this->workerStopHook !== null) { + call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook)); } + + // Close consumer connection + $this->adapter->consumer->close(); + Console::success("[Worker] Worker {$workerId} stopped gracefully."); }); $this->adapter->start(); @@ -320,27 +337,23 @@ public function getWorkerStart(): Hook /** * Is called when a Worker stops. - * @param callable|null $callback - * @return self - * @throws Exception + * @return Hook */ - public function workerStop(?callable $callback = null): self + public function workerStop(): Hook { - try { - $this->adapter->workerStop(function (string $workerId) use ($callback) { - Console::success("[Worker] Worker {$workerId} is ready!"); - if (!is_null($callback)) { - call_user_func($callback); - } - }); - } catch (Throwable $error) { - self::setResource('error', fn () => $error); - foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); - } - } + $hook = new Hook(); + $hook->groups(['*']); + $this->workerStopHook = $hook; + return $hook; + } - return $this; + /** + * Returns Worker stops hook. + * @return ?Hook + */ + public function getWorkerStop(): ?Hook + { + return $this->workerStopHook; } /** @@ -355,7 +368,7 @@ protected function getArguments(Hook $hook, array $payload = []): array $arguments = []; foreach ($hook->getParams() as $key => $param) { // Get value from route or request object $value = $payload[$key] ?? $param['default']; - $value = ($value === '' || is_null($value)) ? $param['default'] : $value; + $value = ($value === '' || $value === null) ? $param['default'] : $value; $this->validate($key, $param, $value); $hook->setParamValue($key, $value); @@ -384,7 +397,7 @@ protected function getArguments(Hook $hook, array $payload = []): array */ protected function validate(string $key, array $param, mixed $value): void { - if ('' !== $value && !is_null($value)) { + if ('' !== $value && $value !== null) { $validator = $param['validator']; // checking whether the class exists if (\is_callable($validator)) { @@ -399,7 +412,7 @@ protected function validate(string $key, array $param, mixed $value): void throw new Exception('Invalid ' .$key . ': ' . $validator->getDescription(), 400); } } elseif (!$param['optional']) { - throw new Exception('Param "' . $key . '" is not optional.', 400); + throw new Exception("Param $key is not optional.", 400); } } diff --git a/tests/Queue/servers/AMQP/Dockerfile b/tests/Queue/servers/AMQP/Dockerfile index 8643629..13895e9 100644 --- a/tests/Queue/servers/AMQP/Dockerfile +++ b/tests/Queue/servers/AMQP/Dockerfile @@ -1,3 +1,5 @@ FROM phpswoole/swoole:php8.3-alpine -RUN apk add autoconf build-base \ No newline at end of file +RUN apk add autoconf build-base + +RUN docker-php-ext-install pcntl \ No newline at end of file diff --git a/tests/Queue/servers/AMQP/worker.php b/tests/Queue/servers/AMQP/worker.php index d590d46..9b1bccc 100644 --- a/tests/Queue/servers/AMQP/worker.php +++ b/tests/Queue/servers/AMQP/worker.php @@ -29,4 +29,10 @@ echo "Worker Started" . PHP_EOL; }); +$server + ->workerStop() + ->action(function () { + echo "Worker Stopped" . PHP_EOL; + }); + $server->start(); diff --git a/tests/Queue/servers/Swoole/Dockerfile b/tests/Queue/servers/Swoole/Dockerfile index 7857498..e97d908 100644 --- a/tests/Queue/servers/Swoole/Dockerfile +++ b/tests/Queue/servers/Swoole/Dockerfile @@ -2,4 +2,5 @@ FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base +RUN docker-php-ext-install pcntl RUN docker-php-ext-enable redis \ No newline at end of file diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 3645a1d..02b9ef3 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -30,4 +30,10 @@ echo "Worker Started" . PHP_EOL; }); +$server + ->workerStop() + ->action(function () { + echo "Worker Stopped" . PHP_EOL; + }); + $server->start(); diff --git a/tests/Queue/servers/SwooleRedisCluster/Dockerfile b/tests/Queue/servers/SwooleRedisCluster/Dockerfile index 7857498..e97d908 100644 --- a/tests/Queue/servers/SwooleRedisCluster/Dockerfile +++ b/tests/Queue/servers/SwooleRedisCluster/Dockerfile @@ -2,4 +2,5 @@ FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base +RUN docker-php-ext-install pcntl RUN docker-php-ext-enable redis \ No newline at end of file diff --git a/tests/Queue/servers/SwooleRedisCluster/worker.php b/tests/Queue/servers/SwooleRedisCluster/worker.php index d120b24..779d3ac 100644 --- a/tests/Queue/servers/SwooleRedisCluster/worker.php +++ b/tests/Queue/servers/SwooleRedisCluster/worker.php @@ -30,4 +30,10 @@ echo "Worker Started" . PHP_EOL; }); +$server + ->workerStop() + ->action(function () { + echo "Worker Stopped" . PHP_EOL; + }); + $server->start(); diff --git a/tests/Queue/servers/Workerman/worker.php b/tests/Queue/servers/Workerman/worker.php index 5a093ec..dd6ea1d 100644 --- a/tests/Queue/servers/Workerman/worker.php +++ b/tests/Queue/servers/Workerman/worker.php @@ -29,4 +29,10 @@ echo "Worker Started" . PHP_EOL; }); +$server + ->workerStop() + ->action(function () { + echo "Worker Stopped" . PHP_EOL; + }); + $server->start(); From 83d6ac3adb161f81ef039a1a61a1c0f9aa82583d Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 16 Dec 2025 11:56:26 +0000 Subject: [PATCH 2/2] chore: add workerstop --- pint.json | 5 +- src/Queue/Adapter.php | 6 - src/Queue/Adapter/Swoole.php | 122 +++++---- src/Queue/Adapter/Workerman.php | 5 - src/Queue/Broker/AMQP.php | 4 +- src/Queue/Broker/Pool.php | 20 +- src/Queue/Broker/Redis.php | 14 +- src/Queue/Connection.php | 1 + src/Queue/Connection/Redis.php | 6 + src/Queue/Connection/RedisCluster.php | 6 + src/Queue/Queue.php | 2 +- src/Queue/Server.php | 245 +++++++++++++----- tests/Queue/servers/AMQP/Dockerfile | 2 - tests/Queue/servers/AMQP/worker.php | 33 +-- tests/Queue/servers/Swoole/Dockerfile | 3 +- tests/Queue/servers/Swoole/worker.php | 39 ++- .../servers/SwooleRedisCluster/Dockerfile | 3 +- .../servers/SwooleRedisCluster/worker.php | 45 ++-- tests/Queue/servers/Workerman/Dockerfile | 2 +- tests/Queue/servers/Workerman/worker.php | 32 +-- 20 files changed, 350 insertions(+), 245 deletions(-) diff --git a/pint.json b/pint.json index c781933..dc0de5d 100644 --- a/pint.json +++ b/pint.json @@ -1,3 +1,6 @@ { - "preset": "psr12" + "preset": "psr12", + "rules": { + "single_quote": true + } } \ No newline at end of file diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index 64c378d..510fe08 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -40,10 +40,4 @@ abstract public function workerStart(callable $callback): self; * @return self */ abstract public function workerStop(callable $callback): self; - - /** - * Returns the native server object from the Adapter. - * @return mixed - */ - abstract public function getNative(): mixed; } diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index 7160052..52b53bc 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -2,101 +2,99 @@ namespace Utopia\Queue\Adapter; -use Swoole\Constant; -use Swoole\Process\Pool; -use Utopia\CLI\Console; +use Swoole\Coroutine; +use Swoole\Process; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; class Swoole extends Adapter { - protected Pool $pool; + /** @var Process[] */ + protected array $workers = []; - /** @var callable */ - private $onStop; + /** @var callable[] */ + protected array $onWorkerStart = []; - public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') - { - parent::__construct($workerNum, $queue, $namespace); + /** @var callable[] */ + protected array $onWorkerStop = []; + public function __construct( + Consumer $consumer, + int $workerNum, + string $queue, + string $namespace = 'utopia-queue', + ) { + parent::__construct($workerNum, $queue, $namespace); $this->consumer = $consumer; - $this->pool = new Pool($workerNum); } public function start(): self { - $this->pool->set(['enable_coroutine' => true]); + for ($i = 0; $i < $this->workerNum; $i++) { + $this->spawnWorker($i); + } - // Register signal handlers in the main process before starting pool - if (extension_loaded('pcntl')) { - pcntl_signal(SIGTERM, function () { - Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown..."); - $this->stop(); - }); + Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]); - pcntl_signal(SIGINT, function () { - Console::info("[Swoole] Received SIGINT, initiating graceful shutdown..."); - $this->stop(); - }); + Coroutine\run(function () { + Process::signal(SIGTERM, fn () => $this->stop()); + Process::signal(SIGINT, fn () => $this->stop()); + Process::signal(SIGCHLD, fn () => $this->reap()); - // Enable async signals - pcntl_async_signals(true); - } else { - Console::warning("[Swoole] pcntl extension is not loaded, worker will not shutdown gracefully."); - } + while (\count($this->workers) > 0) { + Coroutine::sleep(1); + } + }); - $this->pool->start(); return $this; } - public function stop(): self + protected function spawnWorker(int $workerId): void { - if ($this->onStop) { - call_user_func($this->onStop); - } + $process = new Process(function () use ($workerId) { + Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]); - Console::info("[Swoole] Shutting down process pool..."); - $this->pool->shutdown(); - Console::success("[Swoole] Process pool stopped."); - return $this; + Coroutine\run(function () use ($workerId) { + Process::signal(SIGTERM, fn () => $this->consumer->close()); + + foreach ($this->onWorkerStart as $callback) { + $callback((string)$workerId); + } + + foreach ($this->onWorkerStop as $callback) { + $callback((string)$workerId); + } + }); + }, false, 0, false); + + $pid = $process->start(); + $this->workers[$pid] = $process; } - public function workerStart(callable $callback): self + protected function reap(): void { - $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) { - // Register signal handlers in each worker process for graceful shutdown - if (extension_loaded('pcntl')) { - pcntl_signal(SIGTERM, function () use ($workerId) { - Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer..."); - $this->consumer->close(); - }); - - pcntl_signal(SIGINT, function () use ($workerId) { - Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer..."); - $this->consumer->close(); - }); - - pcntl_async_signals(true); - } - - call_user_func($callback, $workerId); - }); + while (($ret = Process::wait(false)) !== false) { + unset($this->workers[$ret['pid']]); + } + } + public function stop(): self + { + foreach ($this->workers as $pid => $process) { + Process::kill($pid, SIGTERM); + } return $this; } - public function workerStop(callable $callback): self + public function workerStart(callable $callback): self { - $this->onStop = $callback; - $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { - call_user_func($callback, $workerId); - }); - + $this->onWorkerStart[] = $callback; return $this; } - public function getNative(): Pool + public function workerStop(callable $callback): self { - return $this->pool; + $this->onWorkerStop[] = $callback; + return $this; } } diff --git a/src/Queue/Adapter/Workerman.php b/src/Queue/Adapter/Workerman.php index 8c45d6f..5e86b33 100644 --- a/src/Queue/Adapter/Workerman.php +++ b/src/Queue/Adapter/Workerman.php @@ -47,9 +47,4 @@ public function workerStop(callable $callback): self return $this; } - - public function getNative(): Worker - { - return $this->worker; - } } diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index f6c3fd7..62b2774 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -111,7 +111,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); // 2. Declare the working queue and configure the DLX for receiving rejected messages. - $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"]))); + $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ['x-dead-letter-exchange' => "{$queue->namespace}.failed"]))); $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); // 3. Declare the dead-letter-queue and bind it to the DLX. @@ -162,7 +162,7 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int { $queueName = $queue->name; if ($failedJobs) { - $queueName = $queueName . ".failed"; + $queueName = $queueName . '.failed'; } $client = new Client(); diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php index 8fcf5f0..aa7cf92 100644 --- a/src/Queue/Broker/Pool.php +++ b/src/Queue/Broker/Pool.php @@ -30,26 +30,36 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int return $this->delegatePublish(__FUNCTION__, \func_get_args()); } - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void - { + public function consume( + Queue $queue, + callable $messageCallback, + callable $successCallback, + callable $errorCallback, + ): void { $this->delegateConsumer(__FUNCTION__, \func_get_args()); } public function close(): void { - $this->delegateConsumer(__FUNCTION__, \func_get_args()); + // TODO: Implement closing all connections in the pool } protected function delegatePublish(string $method, array $args): mixed { - return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) { + return $this->publisher?->use(function (Publisher $adapter) use ( + $method, + $args, + ) { return $adapter->$method(...$args); }); } protected function delegateConsumer(string $method, array $args): mixed { - return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) { + return $this->consumer?->use(function (Consumer $adapter) use ( + $method, + $args, + ) { return $adapter->$method(...$args); }); } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 0e3beb7..2ba9273 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -10,6 +10,8 @@ class Redis implements Publisher, Consumer { + private const int POP_TIMEOUT = 2; + private bool $closed = false; public function __construct(private readonly Connection $connection) @@ -22,7 +24,15 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe /** * Waiting for next Job. */ - $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", 5); + try { + $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); + } catch (\RedisException $e) { + if ($this->closed) { + break; + } + + throw $e; + } if (!$nextMessage) { continue; @@ -115,7 +125,7 @@ public function retry(Queue $queue, ?int $limit = null): void $processed = 0; while (true) { - $pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", 5); + $pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT); // No more jobs to retry if ($pid === false) { diff --git a/src/Queue/Connection.php b/src/Queue/Connection.php index 6f37505..c1310b7 100644 --- a/src/Queue/Connection.php +++ b/src/Queue/Connection.php @@ -25,4 +25,5 @@ public function setArray(string $key, array $value): bool; public function increment(string $key): int; public function decrement(string $key): int; public function ping(): bool; + public function close(): void; } diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 4536bb3..7418e43 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -169,6 +169,12 @@ public function ping(): bool } } + public function close(): void + { + $this->redis?->close(); + $this->redis = null; + } + protected function getRedis(): \Redis { if ($this->redis) { diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 50a2b46..476b735 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -166,6 +166,12 @@ public function ping(): bool } } + public function close(): void + { + $this->redis?->close(); + $this->redis = null; + } + protected function getRedis(): \RedisCluster { if ($this->redis) { diff --git a/src/Queue/Queue.php b/src/Queue/Queue.php index d6c9846..d22d971 100644 --- a/src/Queue/Queue.php +++ b/src/Queue/Queue.php @@ -9,7 +9,7 @@ public function __construct( public string $namespace = 'utopia-queue', ) { if (empty($this->name)) { - throw new \InvalidArgumentException("Cannot create queue with empty name."); + throw new \InvalidArgumentException('Cannot create queue with empty name.'); } } } diff --git a/src/Queue/Server.php b/src/Queue/Server.php index d1e72b3..782d0be 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -30,33 +30,37 @@ class Server /** * Hooks that will run when error occur * - * @var array + * @var array */ protected array $errorHooks = []; /** * Hooks that will run before running job * - * @var array + * @var array */ protected array $initHooks = []; /** * Hooks that will run after running job * - * @var array + * @var array */ protected array $shutdownHooks = []; /** - * Hook that is called when worker starts + * Hooks that will run when worker starts + * + * @var array */ - protected Hook $workerStartHook; + protected array $workerStartHooks = []; /** - * Hook that is called when worker stops + * Hooks that will run when worker stops + * + * @var array */ - protected Hook $workerStopHook; + protected array $workerStopHooks = []; /** * @var array @@ -99,14 +103,20 @@ public function job(): Job */ public function getResource(string $name, bool $fresh = false): mixed { - if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) { + if ( + !\array_key_exists($name, $this->resources) || + $fresh || + self::$resourcesCallbacks[$name]['reset'] + ) { if (!\array_key_exists($name, self::$resourcesCallbacks)) { throw new Exception("Failed to find resource: $name"); } $this->resources[$name] = \call_user_func_array( self::$resourcesCallbacks[$name]['callback'], - $this->getResources(self::$resourcesCallbacks[$name]['injections']) + $this->getResources( + self::$resourcesCallbacks[$name]['injections'], + ), ); } @@ -143,9 +153,16 @@ public function getResources(array $list): array * * @return void */ - public static function setResource(string $name, callable $callback, array $injections = []): void - { - self::$resourcesCallbacks[$name] = ['callback' => $callback, 'injections' => $injections, 'reset' => true]; + public static function setResource( + string $name, + callable $callback, + array $injections = [], + ): void { + self::$resourcesCallbacks[$name] = [ + 'callback' => $callback, + 'injections' => $injections, + 'reset' => true, + ]; } public function setTelemetry(Telemetry $telemetry): void @@ -154,7 +171,24 @@ public function setTelemetry(Telemetry $telemetry): void 'messaging.process.wait.duration', 's', null, - ['ExplicitBucketBoundaries' => [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]] + [ + 'ExplicitBucketBoundaries' => [ + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + 0.75, + 1, + 2.5, + 5, + 7.5, + 10, + ], + ], ); // https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/#metric-messagingprocessduration @@ -162,7 +196,24 @@ public function setTelemetry(Telemetry $telemetry): void 'messaging.process.duration', 's', null, - ['ExplicitBucketBoundaries' => [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]] + [ + 'ExplicitBucketBoundaries' => [ + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + 0.75, + 1, + 2.5, + 5, + 7.5, + 10, + ], + ], ); } @@ -189,7 +240,7 @@ public function stop(): self } catch (Throwable $error) { self::setResource('error', fn () => $error); foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); + $hook->getAction()(...$this->getArguments($hook)); } } return $this; @@ -216,70 +267,103 @@ public function start(): self { try { $this->adapter->workerStart(function (string $workerId) { - Console::success("[Worker] Worker {$workerId} is ready!"); + Console::success("[Worker] Worker {$workerId} started."); self::setResource('workerId', fn () => $workerId); - if ($this->workerStartHook !== null) { - call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); + + foreach ($this->workerStartHooks as $hook) { + $hook->getAction()(...$this->getArguments($hook)); } $this->adapter->consumer->consume( $this->adapter->queue, function (Message $message) { $receivedAtTimestamp = microtime(true); - Console::info("[Job] Received Job ({$message->getPid()})."); + Console::info( + "[Job] Received Job ({$message->getPid()}).", + ); try { - $waitDuration = microtime(true) - $message->getTimestamp(); + $waitDuration = + microtime(true) - $message->getTimestamp(); $this->jobWaitTime->record($waitDuration); $this->resources = []; self::setResource('message', fn () => $message); if ($this->job->getHook()) { - foreach ($this->initHooks as $hook) { // Global init hooks - if (in_array('*', $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); + foreach ($this->initHooks as $hook) { + // Global init hooks + if (\in_array('*', $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); } } } foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { // Group init hooks - if (in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); + foreach ($this->initHooks as $hook) { + // Group init hooks + if (\in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); } } } - return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); + return \call_user_func_array( + $this->job->getAction(), + $this->getArguments( + $this->job, + $message->getPayload(), + ), + ); } finally { - $processDuration = microtime(true) - $receivedAtTimestamp; + $processDuration = + microtime(true) - $receivedAtTimestamp; $this->processDuration->record($processDuration); } }, function (Message $message) { if ($this->job->getHook()) { - foreach ($this->shutdownHooks as $hook) { // Global init hooks - if (in_array('*', $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); + foreach ($this->shutdownHooks as $hook) { + // Global init hooks + if (\in_array('*', $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); } } } foreach ($this->job->getGroups() as $group) { - foreach ($this->shutdownHooks as $hook) { // Group init hooks - if (in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); + foreach ($this->shutdownHooks as $hook) { + // Group init hooks + if (\in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); } } } - Console::success("[Job] ({$message->getPid()}) successfully run."); + Console::success( + "[Job] ({$message->getPid()}) successfully run.", + ); }, function (?Message $message, Throwable $th) { - Console::error("[Job] ({$message?->getPid()}) failed to run."); - Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); + Console::error( + "[Job] ({$message?->getPid()}) failed to run.", + ); + Console::error( + "[Job] ({$message?->getPid()}) {$th->getMessage()}", + ); self::setResource('error', fn () => $th); @@ -290,25 +374,34 @@ function (?Message $message, Throwable $th) { ); }); - $this->adapter->workerStop(function ($workerId) { - Console::info("[Worker] Worker {$workerId} stopping..."); + $this->adapter->workerStop(function (string $workerId) { self::setResource('workerId', fn () => $workerId); - // Call user-defined workerStop hook if set - if ($this->workerStopHook !== null) { - call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook)); + try { + // Call user-defined workerStop hooks + foreach ($this->workerStopHooks as $hook) { + try { + $hook->getAction()(...$this->getArguments($hook)); + } catch (Throwable $e) { + Console::error( + "[Worker] Worker {$workerId} workerStop hook failed: {$e->getMessage()}", + ); + } + } + } finally { + // Always close consumer connection, even if hooks throw + $this->adapter->consumer->close(); + Console::success( + "[Worker] Worker {$workerId} stopped.", + ); } - - // Close consumer connection - $this->adapter->consumer->close(); - Console::success("[Worker] Worker {$workerId} stopped gracefully."); }); $this->adapter->start(); } catch (Throwable $error) { self::setResource('error', fn () => $error); foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); + $hook->getAction()(...$this->getArguments($hook)); } } return $this; @@ -322,17 +415,17 @@ public function workerStart(): Hook { $hook = new Hook(); $hook->groups(['*']); - $this->workerStartHook = $hook; + $this->workerStartHooks[] = $hook; return $hook; } /** - * Returns Worker starts hook. - * @return Hook - */ - public function getWorkerStart(): Hook + * Returns Worker starts hooks. + * @return array + */ + public function getWorkerStart(): array { - return $this->workerStartHook; + return $this->workerStartHooks; } /** @@ -343,17 +436,17 @@ public function workerStop(): Hook { $hook = new Hook(); $hook->groups(['*']); - $this->workerStopHook = $hook; + $this->workerStopHooks[] = $hook; return $hook; } /** - * Returns Worker stops hook. - * @return ?Hook + * Returns Worker stops hooks. + * @return array */ - public function getWorkerStop(): ?Hook + public function getWorkerStop(): array { - return $this->workerStopHook; + return $this->workerStopHooks; } /** @@ -366,9 +459,11 @@ public function getWorkerStop(): ?Hook protected function getArguments(Hook $hook, array $payload = []): array { $arguments = []; - foreach ($hook->getParams() as $key => $param) { // Get value from route or request object + foreach ($hook->getParams() as $key => $param) { + // Get value from route or request object $value = $payload[$key] ?? $param['default']; - $value = ($value === '' || $value === null) ? $param['default'] : $value; + $value = + $value === '' || $value === null ? $param['default'] : $value; $this->validate($key, $param, $value); $hook->setParamValue($key, $value); @@ -376,7 +471,9 @@ protected function getArguments(Hook $hook, array $payload = []): array } foreach ($hook->getInjections() as $key => $injection) { - $arguments[$injection['order']] = $this->getResource($injection['name']); + $arguments[$injection['order']] = $this->getResource( + $injection['name'], + ); } return $arguments; @@ -401,15 +498,25 @@ protected function validate(string $key, array $param, mixed $value): void $validator = $param['validator']; // checking whether the class exists if (\is_callable($validator)) { - $validator = \call_user_func_array($validator, $this->getResources($param['injections'])); + $validator = \call_user_func_array( + $validator, + $this->getResources($param['injections']), + ); } - if (!$validator instanceof Validator) { // is the validator object an instance of the Validator class - throw new Exception('Validator object is not an instance of the Validator class', 500); + if (!$validator instanceof Validator) { + // is the validator object an instance of the Validator class + throw new Exception( + 'Validator object is not an instance of the Validator class', + 500, + ); } if (!$validator->isValid($value)) { - throw new Exception('Invalid ' .$key . ': ' . $validator->getDescription(), 400); + throw new Exception( + 'Invalid ' . $key . ': ' . $validator->getDescription(), + 400, + ); } } elseif (!$param['optional']) { throw new Exception("Param $key is not optional.", 400); diff --git a/tests/Queue/servers/AMQP/Dockerfile b/tests/Queue/servers/AMQP/Dockerfile index 13895e9..65460fa 100644 --- a/tests/Queue/servers/AMQP/Dockerfile +++ b/tests/Queue/servers/AMQP/Dockerfile @@ -1,5 +1,3 @@ FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base - -RUN docker-php-ext-install pcntl \ No newline at end of file diff --git a/tests/Queue/servers/AMQP/worker.php b/tests/Queue/servers/AMQP/worker.php index 9b1bccc..9f4e286 100644 --- a/tests/Queue/servers/AMQP/worker.php +++ b/tests/Queue/servers/AMQP/worker.php @@ -3,18 +3,15 @@ require_once __DIR__ . '/../../../../vendor/autoload.php'; require_once __DIR__ . '/../tests.php'; -use Utopia\Queue; -use Utopia\Queue\Message; +use Utopia\Queue\Broker\AMQP; +use Utopia\Queue\Adapter\Swoole; +use Utopia\Queue\Server; -$consumer = new Queue\Broker\AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); -$adapter = new Queue\Adapter\Swoole($consumer, 12, 'amqp'); -$server = new Queue\Server($adapter); +$consumer = new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); +$adapter = new Swoole($consumer, 12, 'amqp'); +$server = new Server($adapter); -$server->job() - ->inject('message') - ->action(function (Message $message) { - handleRequest($message); - }); +$server->job()->inject('message')->action(handleRequest(...)); $server ->error() @@ -23,16 +20,12 @@ echo $th->getMessage() . PHP_EOL; }); -$server - ->workerStart() - ->action(function () { - echo "Worker Started" . PHP_EOL; - }); +$server->workerStart()->action(function () { + echo 'Worker Started' . PHP_EOL; +}); -$server - ->workerStop() - ->action(function () { - echo "Worker Stopped" . PHP_EOL; - }); +$server->workerStop()->action(function () { + echo 'Worker Stopped' . PHP_EOL; +}); $server->start(); diff --git a/tests/Queue/servers/Swoole/Dockerfile b/tests/Queue/servers/Swoole/Dockerfile index e97d908..eb30cec 100644 --- a/tests/Queue/servers/Swoole/Dockerfile +++ b/tests/Queue/servers/Swoole/Dockerfile @@ -2,5 +2,4 @@ FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base -RUN docker-php-ext-install pcntl -RUN docker-php-ext-enable redis \ No newline at end of file +RUN docker-php-ext-enable redis diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 02b9ef3..833ecf2 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -3,19 +3,16 @@ require_once __DIR__ . '/../../../../vendor/autoload.php'; require_once __DIR__ . '/../tests.php'; -use Utopia\Queue; -use Utopia\Queue\Message; - -$connection = new Queue\Connection\Redis('redis'); -$consumer = new Queue\Broker\Redis($connection); -$adapter = new Queue\Adapter\Swoole($consumer, 12, 'swoole'); -$server = new Queue\Server($adapter); - -$server->job() - ->inject('message') - ->action(function (Message $message) { - handleRequest($message); - }); +use Utopia\Queue\Server; +use Utopia\Queue\Adapter\Swoole; +use Utopia\Queue\Connection\Redis as RedisConnection; +use Utopia\Queue\Broker\Redis; + +$consumer = new Redis(new RedisConnection('redis')); +$adapter = new Swoole($consumer, 12, 'swoole'); +$server = new Server($adapter); + +$server->job()->inject('message')->action(handleRequest(...)); $server ->error() @@ -24,16 +21,12 @@ echo $th->getMessage() . PHP_EOL; }); -$server - ->workerStart() - ->action(function () { - echo "Worker Started" . PHP_EOL; - }); +$server->workerStart()->action(function () { + echo 'Worker Started' . PHP_EOL; +}); -$server - ->workerStop() - ->action(function () { - echo "Worker Stopped" . PHP_EOL; - }); +$server->workerStop()->action(function () { + echo 'Worker Stopped' . PHP_EOL; +}); $server->start(); diff --git a/tests/Queue/servers/SwooleRedisCluster/Dockerfile b/tests/Queue/servers/SwooleRedisCluster/Dockerfile index e97d908..eb30cec 100644 --- a/tests/Queue/servers/SwooleRedisCluster/Dockerfile +++ b/tests/Queue/servers/SwooleRedisCluster/Dockerfile @@ -2,5 +2,4 @@ FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base -RUN docker-php-ext-install pcntl -RUN docker-php-ext-enable redis \ No newline at end of file +RUN docker-php-ext-enable redis diff --git a/tests/Queue/servers/SwooleRedisCluster/worker.php b/tests/Queue/servers/SwooleRedisCluster/worker.php index 779d3ac..e12b8ae 100644 --- a/tests/Queue/servers/SwooleRedisCluster/worker.php +++ b/tests/Queue/servers/SwooleRedisCluster/worker.php @@ -3,19 +3,22 @@ require_once __DIR__ . '/../../../../vendor/autoload.php'; require_once __DIR__ . '/../tests.php'; -use Utopia\Queue; -use Utopia\Queue\Message; - -$connection = new Queue\Connection\RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']); -$consumer = new Queue\Broker\Redis($connection); -$adapter = new Queue\Adapter\Swoole($consumer, 12, 'swoole-redis-cluster'); -$server = new Queue\Server($adapter); - -$server->job() - ->inject('message') - ->action(function (Message $message) { - handleRequest($message); - }); +use Utopia\Queue\Broker\Redis; +use Utopia\Queue\Connection\RedisCluster; +use Utopia\Queue\Adapter\Swoole; +use Utopia\Queue\Server; + +$consumer = new Redis( + new RedisCluster([ + 'redis-cluster-0:6379', + 'redis-cluster-1:6379', + 'redis-cluster-2:6379', + ]), +); +$adapter = new Swoole($consumer, 12, 'swoole-redis-cluster'); +$server = new Server($adapter); + +$server->job()->inject('message')->action(handleRequest(...)); $server ->error() @@ -24,16 +27,12 @@ echo $th->getMessage() . PHP_EOL; }); -$server - ->workerStart() - ->action(function () { - echo "Worker Started" . PHP_EOL; - }); +$server->workerStart()->action(function () { + echo 'Worker Started' . PHP_EOL; +}); -$server - ->workerStop() - ->action(function () { - echo "Worker Stopped" . PHP_EOL; - }); +$server->workerStop()->action(function () { + echo 'Worker Stopped' . PHP_EOL; +}); $server->start(); diff --git a/tests/Queue/servers/Workerman/Dockerfile b/tests/Queue/servers/Workerman/Dockerfile index 6dd16ab..1704dd1 100644 --- a/tests/Queue/servers/Workerman/Dockerfile +++ b/tests/Queue/servers/Workerman/Dockerfile @@ -8,4 +8,4 @@ ADD https://github.com/mlocati/docker-php-extension-installer/releases/latest/do RUN docker-php-ext-configure pcntl --enable-pcntl -RUN docker-php-ext-install pcntl \ No newline at end of file +RUN docker-php-ext-install pcntl diff --git a/tests/Queue/servers/Workerman/worker.php b/tests/Queue/servers/Workerman/worker.php index dd6ea1d..f3558fb 100644 --- a/tests/Queue/servers/Workerman/worker.php +++ b/tests/Queue/servers/Workerman/worker.php @@ -4,17 +4,15 @@ require_once __DIR__ . '/../tests.php'; use Utopia\Queue; -use Utopia\Queue\Message; +use Utopia\Queue\Adapter\Workerman; +use Utopia\Queue\Connection\Redis as RedisConnection; +use Utopia\Queue\Broker\Redis; -$connection = new Queue\Connection\Redis('redis'); -$consumer = new Queue\Broker\Redis($connection); -$adapter = new Queue\Adapter\Workerman($consumer, 12, 'wokerman'); +$consumer = new Redis(new RedisConnection('redis')); +$adapter = new Workerman($consumer, 12, 'wokerman'); $server = new Queue\Server($adapter); -$server->job() - ->inject('message') - ->action(function (Message $message) { - handleRequest($message); - }); + +$server->job()->inject('message')->action(handleRequest(...)); $server ->error() @@ -23,16 +21,12 @@ echo $th->getMessage() . PHP_EOL; }); -$server - ->workerStart() - ->action(function () { - echo "Worker Started" . PHP_EOL; - }); +$server->workerStart()->action(function () { + echo 'Worker Started' . PHP_EOL; +}); -$server - ->workerStop() - ->action(function () { - echo "Worker Stopped" . PHP_EOL; - }); +$server->workerStop()->action(function () { + echo 'Worker Stopped' . PHP_EOL; +}); $server->start();