Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions config/autoload/messenger.local.php.dist
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,33 @@ use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as SymfonySerializer;

return [
'symfony' => [
'messenger' => [
'transports' => [
"symfony" => [
"messenger" => [
'transports' => [
'redis_transport' => [
'dsn' => 'redis://127.0.0.1:6379/messages',
'options' => [], // Redis specific options
'dsn' => 'redis://127.0.0.1:6379/messages',
'serializer' => SymfonySerializer::class,
'retry_strategy' => [
'max_retries' => 3, //maximum number of times a message will be retried after the first failure
'delay' => 1000, // initial delay before retrying a failed message, in milliseconds
'multiplier' => 2, // factor to increase the delay for each subsequent retry
'max_delay' => 0, // maximum delay between retries, in milliseconds
],
],
// separate transport for failed messages
'failed' => [
'dsn' => 'redis://127.0.0.1:6379/failed',
'serializer' => SymfonySerializer::class,
],
],
// tells Messenger that the transport to store failed messages is "failed"
'failure_transport' => 'failed',
],
],
'dependencies' => [
'factories' => [
'redis_transport' => [TransportFactory::class, 'redis_transport'],
"dependencies" => [
"factories" => [
"redis_transport" => [TransportFactory::class, 'redis_transport'],
"failed" => [TransportFactory::class, 'failed'],
SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(),
],
],
Expand Down
5 changes: 5 additions & 0 deletions src/App/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ public function getPayload(): array
{
return $this->payload;
}

public function setPayload(array $payload): void
{
$this->payload = $payload;
}
}
61 changes: 21 additions & 40 deletions src/App/Message/MessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@

use Dot\DependencyInjection\Attribute\Inject;
use Dot\Log\Logger;
use Exception;
use Symfony\Component\Messenger\Exception\ExceptionInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Throwable;

class MessageHandler
Expand All @@ -27,53 +24,37 @@ public function __construct(
}

/**
* @throws ExceptionInterface
* @throws Throwable
*/
public function __invoke(Message $message): void
{
$payload = $message->getPayload();

try {
// Throwing an exception to satisfy PHPStan (replace with own code)
// For proof of concept and testing purposes message "control" will automatically mark it as successfully
// processed and logged as info
if ($payload['foo'] === 'control') {
$this->logger->info($payload['foo'] . ': was processed successfully');
} else {
throw new Exception('Failed to execute');
//user control message to log successfully processed message
$this->logger->info($payload['foo'] . ' processed successfully');
} elseif ($payload['foo'] === 'retry') {
//user retry message to test retry functionality
throw new \RuntimeException("Intentional failure for testing retries");
}
} catch (Throwable $exception) {
$this->logger->error($payload['foo'] . ' failed with message: '
. $exception->getMessage() . ' after ' . ($payload['retry'] ?? 0) . ' retries');
$this->retry($payload);
}
}
} catch (\Throwable $e) {
$retryCount = $payload['retry_count'] ?? 0;

/**
* @throws ExceptionInterface
*/
public function retry(array $payload): void
{
if (! isset($payload['retry'])) {
$this->bus->dispatch(new Message(['foo' => $payload['foo'], 'retry' => 1]), [
new DelayStamp($this->config['fail-safe']['first_retry']),
]);
} else {
$retry = $payload['retry'];
switch ($retry) {
case 1:
$delay = $this->config['fail-safe']['second_retry'];
$this->bus->dispatch(new Message(['foo' => $payload['foo'], 'retry' => ++$retry]), [
new DelayStamp($delay),
]);
break;
case 2:
$delay = $this->config['fail-safe']['third_retry'];
$this->bus->dispatch(new Message(['foo' => $payload['foo'], 'retry' => ++$retry]), [
new DelayStamp($delay),
]);
break;
if ($retryCount === 0) {
$this->logger->error(
"Message '{$payload['foo']}' failed because: " . $e->getMessage()
);
} else {
$this->logger->error(
"Message '{$payload['foo']}' failed because: " . $e->getMessage() . " Retry {$retryCount}"
);
}

$payload['retry_count'] = $retryCount + 1;
$message->setPayload($payload);

throw $e;
}
}
}
136 changes: 44 additions & 92 deletions test/App/Message/MessageHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@
use Psr\Container\ContainerExceptionInterface;
use Queue\App\Message\Message;
use Queue\App\Message\MessageHandler;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\ExceptionInterface;
use RuntimeException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

use const PHP_EOL;

class MessageHandlerTest extends TestCase
{
private MessageBusInterface|MockObject $bus;
private Logger $logger;
private array $config;
private MessageHandler $handler;

/**
Expand All @@ -29,17 +27,16 @@ class MessageHandlerTest extends TestCase
*/
protected function setUp(): void
{
$this->bus = $this->createMock(MessageBusInterface::class);

$logger = new Logger([
$this->bus = $this->createMock(MessageBusInterface::class);
$this->logger = new Logger([
'writers' => [
'FileWriter' => [
'name' => 'null',
'level' => Logger::ALERT,
],
],
]);
$config = [
$this->config = [
'fail-safe' => [
'first_retry' => 1000,
'second_retry' => 2000,
Expand All @@ -50,107 +47,62 @@ protected function setUp(): void
'protocol' => 'tcp',
'host' => 'localhost',
'port' => '8556',
'eof' => PHP_EOL,
'eof' => "\n",
],
],
'application' => [
'name' => 'dotkernel',
],
];

$this->handler = new MessageHandler($this->bus, $logger, $config);
$this->handler = new MessageHandler($this->bus, $this->logger, $this->config);
}

/**
* @throws Exception
* @throws ExceptionInterface
*/
public function testInvokeSuccessfulProcessing(): void
public function testControlMessageDoesNotThrowAndDoesNotSetRetryCount(): void
{
$payload = ['foo' => 'control'];
$message = $this->createMock(Message::class);
$message->method('getPayload')->willReturn($payload);
$handler = $this->handler;

$this->handler->__invoke($message);
$message = new Message(['foo' => 'control']);
$handler($message);

$this->expectNotToPerformAssertions();
$payload = $message->getPayload();
$this->assertArrayNotHasKey('retry_count', $payload);
}

/**
* @throws Exception
* @throws ExceptionInterface
*/
public function testInvokeFailureTriggersFirstRetry(): void
public function testRetryMessageThrowsExceptionAndSetsRetryCount(): void
{
$payload = ['foo' => 'fail'];
$message = $this->createMock(Message::class);
$message->method('getPayload')->willReturn($payload);

$this->bus->expects($this->once())
->method('dispatch')
->with(
$this->callback(function ($msg) {
return $msg instanceof Message
&& $msg->getPayload()['foo'] === 'fail'
&& $msg->getPayload()['retry'] === 1;
}),
$this->callback(function ($stamps) {
return isset($stamps[0]) && $stamps[0] instanceof DelayStamp
&& $stamps[0]->getDelay() === 1000;
})
)
->willReturn(new Envelope($message));

$this->handler->__invoke($message);
}
$handler = $this->handler;

/**
* @throws ExceptionInterface
*/
public function testRetrySecondTime(): void
{
$payload = ['foo' => 'retry_test', 'retry' => 1];

$this->bus->expects($this->once())
->method('dispatch')
->with(
$this->callback(function ($msg) {
return $msg instanceof Message
&& $msg->getPayload()['retry'] === 2
&& $msg->getPayload()['foo'] === 'retry_test';
}),
$this->callback(function ($stamps) {
return isset($stamps[0]) && $stamps[0] instanceof DelayStamp
&& $stamps[0]->getDelay() === 2000;
})
)
->willReturn(new Envelope(new Message($payload)));

$this->handler->retry($payload);
$message = new Message(['foo' => 'retry']);

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage("Intentional failure for testing retries");

try {
$handler($message);
} finally {
$payload = $message->getPayload();
$this->assertArrayHasKey('retry_count', $payload);
$this->assertEquals(1, $payload['retry_count']); // first retry
}
}

/**
* @throws ExceptionInterface
*/
public function testRetryThirdTime(): void
public function testRetryMessageWithExistingRetryCountIncrementsIt(): void
{
$payload = ['foo' => 'retry_test', 'retry' => 2];

$this->bus->expects($this->once())
->method('dispatch')
->with(
$this->callback(function ($msg) {
return $msg instanceof Message
&& $msg->getPayload()['retry'] === 3
&& $msg->getPayload()['foo'] === 'retry_test';
}),
$this->callback(function ($stamps) {
return isset($stamps[0]) && $stamps[0] instanceof DelayStamp
&& $stamps[0]->getDelay() === 3000;
})
)
->willReturn(new Envelope(new Message($payload)));

$this->handler->retry($payload);
$handler = $this->handler;

$message = new Message([
'foo' => 'retry',
'retry_count' => 2,
]);

$this->expectException(RuntimeException::class);

try {
$handler($message);
} finally {
$payload = $message->getPayload();
$this->assertEquals(3, $payload['retry_count']); // incremented from 2 → 3
}
}
}
Loading