From 43daa35e1cdb00fb69e91d510c9f5f708b65d0e0 Mon Sep 17 00:00:00 2001 From: bota Date: Thu, 4 Sep 2025 12:30:55 +0300 Subject: [PATCH 1/3] implement dlq to queue Signed-off-by: bota --- config/autoload/messenger.local.php.dist | 42 +++++--- src/App/Message/Message.php | 5 + src/App/Message/MessageHandler.php | 62 +++++------- test/App/Message/MessageHandlerTest.php | 119 +++++++---------------- 4 files changed, 95 insertions(+), 133 deletions(-) diff --git a/config/autoload/messenger.local.php.dist b/config/autoload/messenger.local.php.dist index 4268d3b..11b8ddb 100644 --- a/config/autoload/messenger.local.php.dist +++ b/config/autoload/messenger.local.php.dist @@ -1,25 +1,41 @@ [ + "symfony" => [ "messenger" => [ - "transports" => [ - "redis_transport" => [ - 'dsn' => 'redis://127.0.0.1:6379/messages', - 'options' => [], // Redis specific options + 'transports' => [ + 'redis_transport' => [ + 'dsn' => 'redis://127.0.0.1:6379/messages', + 'serializer' => SymfonySerializer::class, + 'retry_strategy' => [ + 'max_retries' => 3, //maximum number of times a message will be retried after the first failure + 'delay' => 1000 // initial delay before retrying a failed message, in milliseconds, + 'multiplier' => 2 // factor to increase the delay for each subsequent retry, + 'max_delay' => 0 // maximum delay between retries, in milliseconds, + ], + ], + // separate transport for failed messages + 'failed' => [ + 'dsn' => 'redis://127.0.0.1:6379/failed', 'serializer' => SymfonySerializer::class, - ] - ] - ] + ], + ], + // tells Messenger that the transport to store failed messages is "failed" + 'failure_transport' => 'failed', + ], ], "dependencies" => [ "factories" => [ - "redis_transport" => [TransportFactory::class, 'redis_transport'], - SymfonySerializer::class => fn(\Psr\Container\ContainerInterface $container) => new PhpSerializer() - ] - ] -]; \ No newline at end of file + "redis_transport" => [TransportFactory::class, 'redis_transport'], + "failed" => [TransportFactory::class, 'failed'], + SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(), + ], + ], +]; diff --git a/src/App/Message/Message.php b/src/App/Message/Message.php index aeee893..2ac080b 100644 --- a/src/App/Message/Message.php +++ b/src/App/Message/Message.php @@ -15,4 +15,9 @@ public function getPayload(): array { return $this->payload; } + + public function setPayload(array $payload): void + { + $this->payload = $payload; + } } diff --git a/src/App/Message/MessageHandler.php b/src/App/Message/MessageHandler.php index 74d008b..39a0d2b 100644 --- a/src/App/Message/MessageHandler.php +++ b/src/App/Message/MessageHandler.php @@ -6,9 +6,8 @@ use Dot\DependencyInjection\Attribute\Inject; use Dot\Log\Logger; -use Symfony\Component\Messenger\Exception\ExceptionInterface; use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Stamp\DelayStamp; +use Throwable; class MessageHandler { @@ -24,51 +23,38 @@ public function __construct( ) { } + /** + * @throws Throwable + */ public function __invoke(Message $message): void { $payload = $message->getPayload(); try { - // Throwing an exception to satisfy PHPStan (replace with own code) - // For proof of concept and testing purposes message "control" will automatically mark it as successfully - // processed and logged as info if ($payload['foo'] === 'control') { - $this->logger->info($payload['foo'] . ': was processed successfully'); - } else { - throw new \Exception("Failed to execute"); + //user control message to log successfully processed message + $this->logger->info($payload['foo'] . ' processed successfully'); + } elseif ($payload['foo'] === 'retry') { + //user retry message to test retry functionality + throw new \RuntimeException("Intentional failure for testing retries"); } - } catch (\Throwable $exception) { - $this->logger->error($payload['foo'] . ' failed with message: ' - . $exception->getMessage() . ' after ' . ($payload['retry'] ?? 0) . ' retries'); - $this->retry($payload); - } - } + } catch (\Throwable $e) { + $retryCount = $payload['retry_count'] ?? 0; - /** - * @throws ExceptionInterface - */ - public function retry(array $payload): void - { - if (! isset($payload['retry'])) { - $this->bus->dispatch(new Message(["foo" => $payload['foo'], 'retry' => 1]), [ - new DelayStamp($this->config['fail-safe']['first_retry']), - ]); - } else { - $retry = $payload['retry']; - switch ($retry) { - case 1: - $delay = $this->config['fail-safe']['second_retry']; - $this->bus->dispatch(new Message(["foo" => $payload['foo'], 'retry' => ++$retry]), [ - new DelayStamp($delay), - ]); - break; - case 2: - $delay = $this->config['fail-safe']['third_retry']; - $this->bus->dispatch(new Message(["foo" => $payload['foo'], 'retry' => ++$retry]), [ - new DelayStamp($delay), - ]); - break; + if ($retryCount === 0) { + $this->logger->error( + "Message '{$payload['foo']}' failed because: " . $e->getMessage() + ); + } else { + $this->logger->error( + "Message '{$payload['foo']}' failed because: " . $e->getMessage() . " Retry {$retryCount}" + ); } + + $payload['retry_count'] = $retryCount + 1; + $message->setPayload($payload); + + throw $e; } } } diff --git a/test/App/Message/MessageHandlerTest.php b/test/App/Message/MessageHandlerTest.php index 9e50151..0f908d4 100644 --- a/test/App/Message/MessageHandlerTest.php +++ b/test/App/Message/MessageHandlerTest.php @@ -11,10 +11,8 @@ use Psr\Container\ContainerExceptionInterface; use Queue\App\Message\Message; use Queue\App\Message\MessageHandler; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Exception\ExceptionInterface; +use RuntimeException; use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Stamp\DelayStamp; class MessageHandlerTest extends TestCase { @@ -60,94 +58,51 @@ protected function setUp(): void $this->handler = new MessageHandler($this->bus, $this->logger, $this->config); } - /** - * @throws Exception - */ - public function testInvokeSuccessfulProcessing(): void + public function testControlMessageDoesNotThrowAndDoesNotSetRetryCount(): void { - $payload = ['foo' => 'control']; - $message = $this->createMock(Message::class); - $message->method('getPayload')->willReturn($payload); + $handler = $this->handler; - $this->handler->__invoke($message); + $message = new Message(['foo' => 'control']); + $handler($message); - $this->expectNotToPerformAssertions(); + $payload = $message->getPayload(); + $this->assertArrayNotHasKey('retry_count', $payload); } - /** - * @throws Exception - */ - public function testInvokeFailureTriggersFirstRetry(): void + public function testRetryMessageThrowsExceptionAndSetsRetryCount(): void { - $payload = ['foo' => 'fail']; - $message = $this->createMock(Message::class); - $message->method('getPayload')->willReturn($payload); - - $this->bus->expects($this->once()) - ->method('dispatch') - ->with( - $this->callback(function ($msg) { - return $msg instanceof Message - && $msg->getPayload()['foo'] === 'fail' - && $msg->getPayload()['retry'] === 1; - }), - $this->callback(function ($stamps) { - return isset($stamps[0]) && $stamps[0] instanceof DelayStamp - && $stamps[0]->getDelay() === 1000; - }) - ) - ->willReturn(new Envelope($message)); - - $this->handler->__invoke($message); - } + $handler = $this->handler; - /** - * @throws ExceptionInterface - */ - public function testRetrySecondTime(): void - { - $payload = ['foo' => 'retry_test', 'retry' => 1]; - - $this->bus->expects($this->once()) - ->method('dispatch') - ->with( - $this->callback(function ($msg) { - return $msg instanceof Message - && $msg->getPayload()['retry'] === 2 - && $msg->getPayload()['foo'] === 'retry_test'; - }), - $this->callback(function ($stamps) { - return isset($stamps[0]) && $stamps[0] instanceof DelayStamp - && $stamps[0]->getDelay() === 2000; - }) - ) - ->willReturn(new Envelope(new Message($payload))); - - $this->handler->retry($payload); + $message = new Message(['foo' => 'retry']); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage("Intentional failure for testing retries"); + + try { + $handler($message); + } finally { + $payload = $message->getPayload(); + $this->assertArrayHasKey('retry_count', $payload); + $this->assertEquals(1, $payload['retry_count']); // first retry + } } - /** - * @throws ExceptionInterface - */ - public function testRetryThirdTime(): void + public function testRetryMessageWithExistingRetryCountIncrementsIt(): void { - $payload = ['foo' => 'retry_test', 'retry' => 2]; - - $this->bus->expects($this->once()) - ->method('dispatch') - ->with( - $this->callback(function ($msg) { - return $msg instanceof Message - && $msg->getPayload()['retry'] === 3 - && $msg->getPayload()['foo'] === 'retry_test'; - }), - $this->callback(function ($stamps) { - return isset($stamps[0]) && $stamps[0] instanceof DelayStamp - && $stamps[0]->getDelay() === 3000; - }) - ) - ->willReturn(new Envelope(new Message($payload))); - - $this->handler->retry($payload); + $handler = $this->handler; + + $message = new Message([ + 'foo' => 'retry', + 'retry_count' => 2, + ]); + + $this->expectException(RuntimeException::class); + + try { + $handler($message); + } finally { + $payload = $message->getPayload(); + $this->assertEquals(3, $payload['retry_count']); // incremented from 2 → 3 + } } } From a705c4cc3316178d0bd7f90935af0bceda0fca1a Mon Sep 17 00:00:00 2001 From: bota Date: Thu, 4 Sep 2025 12:33:32 +0300 Subject: [PATCH 2/3] phpcs Signed-off-by: bota --- test/App/Message/MessageHandlerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/App/Message/MessageHandlerTest.php b/test/App/Message/MessageHandlerTest.php index 0f908d4..4c2277a 100644 --- a/test/App/Message/MessageHandlerTest.php +++ b/test/App/Message/MessageHandlerTest.php @@ -92,7 +92,7 @@ public function testRetryMessageWithExistingRetryCountIncrementsIt(): void $handler = $this->handler; $message = new Message([ - 'foo' => 'retry', + 'foo' => 'retry', 'retry_count' => 2, ]); From 82d2b60d8e5c566b81c453a20ae00b58ae567281 Mon Sep 17 00:00:00 2001 From: bota Date: Thu, 4 Sep 2025 12:38:17 +0300 Subject: [PATCH 3/3] typo Signed-off-by: bota --- config/autoload/messenger.local.php.dist | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/config/autoload/messenger.local.php.dist b/config/autoload/messenger.local.php.dist index 11b8ddb..78a2c4a 100644 --- a/config/autoload/messenger.local.php.dist +++ b/config/autoload/messenger.local.php.dist @@ -16,9 +16,9 @@ return [ 'serializer' => SymfonySerializer::class, 'retry_strategy' => [ 'max_retries' => 3, //maximum number of times a message will be retried after the first failure - 'delay' => 1000 // initial delay before retrying a failed message, in milliseconds, - 'multiplier' => 2 // factor to increase the delay for each subsequent retry, - 'max_delay' => 0 // maximum delay between retries, in milliseconds, + 'delay' => 1000, // initial delay before retrying a failed message, in milliseconds + 'multiplier' => 2, // factor to increase the delay for each subsequent retry + 'max_delay' => 0, // maximum delay between retries, in milliseconds ], ], // separate transport for failed messages