From 1cd5ff7e95700696d81c0ac953d3784d7f774f11 Mon Sep 17 00:00:00 2001 From: Artem Virskiy Date: Thu, 13 May 2021 12:16:00 +0300 Subject: [PATCH 1/8] HUB-406 RouterPublisher --- Enum/QueueTypeEnum.php | 1 + Producer/RabbitMqProducer.php | 4 +- Producer/RabbitMqProducerInterface.php | 2 +- Publisher/AbstractPublisher.php | 26 ++++----- Publisher/PublisherInterface.php | 2 +- Publisher/RouterPublisher.php | 37 +++++++++++++ README.md | 5 +- Registry/PublisherRegistry.php | 4 +- .../DeduplicateDelayPublisherTest.php | 18 +++++++ Tests/Publisher/DeduplicatePublisherTest.php | 24 ++++----- Tests/Publisher/DelayPublisherTest.php | 18 +++++++ Tests/Publisher/FifoPublisherTest.php | 18 +++++++ Tests/Publisher/RouterPublisherTest.php | 54 +++++++++++++++++++ Tests/TestCase/AbstractTestCase.php | 1 + 14 files changed, 175 insertions(+), 39 deletions(-) create mode 100644 Publisher/RouterPublisher.php create mode 100644 Tests/Publisher/RouterPublisherTest.php diff --git a/Enum/QueueTypeEnum.php b/Enum/QueueTypeEnum.php index 4f19255..ee04396 100644 --- a/Enum/QueueTypeEnum.php +++ b/Enum/QueueTypeEnum.php @@ -10,4 +10,5 @@ class QueueTypeEnum public const DELAY = 2; public const REPLACE = 4; public const DEDUPLICATE = 8; + public const ROUTER = 16; } diff --git a/Producer/RabbitMqProducer.php b/Producer/RabbitMqProducer.php index 579b7ac..d39088e 100644 --- a/Producer/RabbitMqProducer.php +++ b/Producer/RabbitMqProducer.php @@ -35,7 +35,7 @@ public function __construct( * @throws DefinitionNotFoundException * @throws HydratorNotFoundException */ - public function put(string $queueName, $data, array $options = []): void + public function put(string $queueName, $data, array $options = [], string $routingKey = ''): void { $dataString = $this->hydratorRegistry->getHydrator($this->hydratorName)->dehydrate($data); @@ -44,6 +44,6 @@ public function put(string $queueName, $data, array $options = []): void $publisher = $this->publisherRegistry->getPublisher($queueType); - $publisher->publish($definition, $dataString, $options); + $publisher->publish($definition, $dataString, $options, $routingKey); } } diff --git a/Producer/RabbitMqProducerInterface.php b/Producer/RabbitMqProducerInterface.php index 92d57b6..267dd44 100644 --- a/Producer/RabbitMqProducerInterface.php +++ b/Producer/RabbitMqProducerInterface.php @@ -6,5 +6,5 @@ interface RabbitMqProducerInterface { - public function put(string $queueName, $data, array $options = []); + public function put(string $queueName, $data, array $options = [], string $routingKey = ''); } diff --git a/Publisher/AbstractPublisher.php b/Publisher/AbstractPublisher.php index 3d2da4f..e805a57 100644 --- a/Publisher/AbstractPublisher.php +++ b/Publisher/AbstractPublisher.php @@ -29,10 +29,10 @@ public function __construct(RabbitMqClient $client, HydratorRegistry $hydratorRe abstract protected function prepareOptions(DefinitionInterface $definition, array $options): array; - public function publish(DefinitionInterface $definition, string $dataString, array $options = []): void + public function publish(DefinitionInterface $definition, string $dataString, array $options = [], string $routingKey = ''): void { $exchangeName = $this->getDefinitionExchangeName($definition); - $queueName = $this->getDefinitionQueueName($definition); + $route = $routingKey !== '' ? $routingKey : $this->getDefinitionQueueName($definition); $message = new AMQPMessage($dataString, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, @@ -45,32 +45,24 @@ public function publish(DefinitionInterface $definition, string $dataString, arr $message->set('application_headers', new AMQPTable($amqpTableOptions)); } - $this->client->publish($message, $exchangeName, $queueName); + $this->client->publish($message, $exchangeName, $route); } abstract public static function getQueueType(): string; protected function getDefinitionExchangeName(DefinitionInterface $definition): string { - if ($definition->getQueueType() === (QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE)) { - return self::DEFAULT_NAME; - } - - return $definition->getQueueType() === QueueTypeEnum::FIFO - ? self::DEFAULT_NAME - : $definition->getEntryPointName() + return $definition->getQueueType() & (QueueTypeEnum::ROUTER | QueueTypeEnum::DELAY) + ? $definition->getEntryPointName() + : self::DEFAULT_NAME ; } protected function getDefinitionQueueName(DefinitionInterface $definition): string { - if ($definition->getQueueType() === (QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE)) { - return $definition::getQueueName(); - } - - return $definition->getQueueType() === QueueTypeEnum::FIFO - ? $definition::getQueueName() - : self::DEFAULT_NAME + return $definition->getQueueType() & QueueTypeEnum::DELAY + ? self::DEFAULT_NAME + : $definition::getQueueName() ; } } diff --git a/Publisher/PublisherInterface.php b/Publisher/PublisherInterface.php index bf6cb6a..bc46a32 100644 --- a/Publisher/PublisherInterface.php +++ b/Publisher/PublisherInterface.php @@ -8,7 +8,7 @@ interface PublisherInterface { public const TAG = 'wakeapp_rabbit_queue.publisher'; - public function publish(DefinitionInterface $definition, string $dataString, array $options = []): void; + public function publish(DefinitionInterface $definition, string $dataString, array $options = [], string $routingKey = ''): void; public static function getQueueType(): string; } diff --git a/Publisher/RouterPublisher.php b/Publisher/RouterPublisher.php new file mode 100644 index 0000000..da5db60 --- /dev/null +++ b/Publisher/RouterPublisher.php @@ -0,0 +1,37 @@ +put('queue_name', $data, $options); Соответственно на каждый новый тип очереди требуется свой класс `Publisher` с кастомной логикой обработки/валидации и публикации сообщений в канал. -Бандл поддерживает следующие типы очередей: +Бандл поддерживает следующие типы очередей и обменников: - FIFO - Delay - Deduplicate - Deduplicate + Delay + - Router + +Router используется для создания разветвленной топологии как описано [тут](https://www.rabbitmq.com/tutorials/tutorial-four-php.html) и [тут](https://www.rabbitmq.com/tutorials/tutorial-five-php.html) При желании добавить собственный тип очереди, необходимо создать класс `Publisher` наследующий [AbstractPublisher](Publisher/AbstractPublisher.php) или реализующий [PublisherInterface](Publisher/PublisherInterface.php). diff --git a/Registry/PublisherRegistry.php b/Registry/PublisherRegistry.php index 52bce17..68c884a 100644 --- a/Registry/PublisherRegistry.php +++ b/Registry/PublisherRegistry.php @@ -24,8 +24,8 @@ public function __construct(ServiceProviderInterface $publisherRegistry) */ public function getPublisher(int $queueType): AbstractPublisher { - if ($this->publisherRegistry->has($queueType)) { - return $this->publisherRegistry->get($queueType); + if ($this->publisherRegistry->has((string) $queueType)) { + return $this->publisherRegistry->get((string) $queueType); } throw new PublisherNotFoundException(sprintf('Publisher for queue type "%s" not found', $queueType)); diff --git a/Tests/Publisher/DeduplicateDelayPublisherTest.php b/Tests/Publisher/DeduplicateDelayPublisherTest.php index 8c6acb3..8eb170a 100644 --- a/Tests/Publisher/DeduplicateDelayPublisherTest.php +++ b/Tests/Publisher/DeduplicateDelayPublisherTest.php @@ -35,6 +35,24 @@ public function testPublish(): void self::assertTrue(true); } + public function testPublishWithRouting(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, '') + ; + + $publisher = new DeduplicateDelayPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, self::TEST_OPTIONS); + + self::assertTrue(true); + } + /** * @dataProvider invalidOptionsProvider */ diff --git a/Tests/Publisher/DeduplicatePublisherTest.php b/Tests/Publisher/DeduplicatePublisherTest.php index 301571b..3b6498c 100644 --- a/Tests/Publisher/DeduplicatePublisherTest.php +++ b/Tests/Publisher/DeduplicatePublisherTest.php @@ -35,27 +35,21 @@ public function testPublish(): void self::assertTrue(true); } - /** - * @dataProvider invalidOptionsProvider - */ - public function testPublishInvalidOptions(array $options): void + public function testPublishWithRouting(): void { - $this->expectException(RabbitQueueException::class); - - $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_QUEUE_NAME, self::QUEUE_TYPE); + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); $hydratorRegistry = $this->createHydratorRegistryMock(); + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), '', self::TEST_ROUTING) + ; $publisher = new DeduplicatePublisher($client, $hydratorRegistry, JsonHydrator::KEY); - $publisher->publish($definition, self::TEST_MESSAGE, $options); - } + $publisher->publish($definition, self::TEST_MESSAGE, self::TEST_OPTIONS, self::TEST_ROUTING); - public function invalidOptionsProvider(): array - { - return [ - 'empty options' => [[]], - 'invalid key option' => [['key' => 1]], - ]; + self::assertTrue(true); } } diff --git a/Tests/Publisher/DelayPublisherTest.php b/Tests/Publisher/DelayPublisherTest.php index a3439f0..972f985 100644 --- a/Tests/Publisher/DelayPublisherTest.php +++ b/Tests/Publisher/DelayPublisherTest.php @@ -35,6 +35,24 @@ public function testPublish(): void self::assertTrue(true); } + public function testPublishWithRouting(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, self::TEST_ROUTING) + ; + + $publisher = new DelayPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, self::TEST_OPTIONS, self::TEST_ROUTING); + + self::assertTrue(true); + } + /** * @dataProvider invalidOptionsProvider */ diff --git a/Tests/Publisher/FifoPublisherTest.php b/Tests/Publisher/FifoPublisherTest.php index acf0a96..a852a13 100644 --- a/Tests/Publisher/FifoPublisherTest.php +++ b/Tests/Publisher/FifoPublisherTest.php @@ -32,4 +32,22 @@ public function testPublish(): void self::assertTrue(true); } + + public function testPublishWithRouting(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), '', self::TEST_ROUTING) + ; + + $publisher = new FifoPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, [], self::TEST_ROUTING); + + self::assertTrue(true); + } } diff --git a/Tests/Publisher/RouterPublisherTest.php b/Tests/Publisher/RouterPublisherTest.php new file mode 100644 index 0000000..23510b8 --- /dev/null +++ b/Tests/Publisher/RouterPublisherTest.php @@ -0,0 +1,54 @@ +createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, self::TEST_QUEUE_NAME) + ; + + $publisher = new RouterPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE); + + self::assertTrue(true); + } + + public function testPublishWithRouting(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, self::TEST_ROUTING) + ; + + $publisher = new RouterPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, [], self::TEST_ROUTING); + + self::assertTrue(true); + } +} diff --git a/Tests/TestCase/AbstractTestCase.php b/Tests/TestCase/AbstractTestCase.php index 1779384..b04325d 100644 --- a/Tests/TestCase/AbstractTestCase.php +++ b/Tests/TestCase/AbstractTestCase.php @@ -15,6 +15,7 @@ class AbstractTestCase extends TestCase protected const TEST_MESSAGE = '{"test": "test"}'; protected const TEST_EXCHANGE = 'test_exchange'; protected const TEST_QUEUE_NAME = 'test_queue'; + protected const TEST_ROUTING = 'test.routing'; public function createDefinitionMock(string $queueName, string $entryPointName, int $queueType): DefinitionInterface { From 3fc0338251ed6ed9a4218207935cde25271724c4 Mon Sep 17 00:00:00 2001 From: Artem Virskiy Date: Thu, 13 May 2021 13:38:47 +0300 Subject: [PATCH 2/8] HUB-406 changelog, readme --- CHANGELOG.md | 2 + README.md | 154 ++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 155 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1935d4a..0a3916a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ ## [Unreleased] +### Added +- `RouterPublisher` added for using `direct` and `topic` RabbitMQ exchanges with routing key. ## [2.1.0] - 2021-05-06 ### Added diff --git a/README.md b/README.md index 1d3ca42..f0aaef2 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,8 @@ Rabbit Queue Bundle - [Шаг 2: Создание consumer'а](#шаг-2-создание-consumerа) - [Шаг 3: Загрузка схем очередей RabbitMQ](#шаг-3-загрузка-схем-очередей-rabbitmq) - [Шаг 4: Запуск consumer'а](#шаг-4-запуск-consumerа) -7. [Лицензия](#лицензия) +7. [Использование `RouterPublisher`](#использование-routerpublisher) +8. [Лицензия](#лицензия) Требования --------- @@ -498,6 +499,157 @@ php bin/console rabbit:consumer:run example Для просмотра списка всех зарегистрированных `consumer`'ов достаточно выполнить команду `rabbit:consumer:list`. +Использование `RouterPublisher` +-------- + +`RouterPublisher` следует использовать в случаях, когда нужно множество очередей, а каждое сообщение должно попадать +в сразу в некоторое подмножество, определяемое по `routingKey` сообщения. Для таких целей нужно создать `Definition`, +в котором будет определена только `exchange` типа `direct` или `topic`. Эта `Definition` будет использоваться в качестве +точки входя для сообщений. После этого нужно создать по одной `Definition` на каждую очередь, и все их биндить на первую +`Definition`. + +### Пример `Definition` с `exchange`: +```php +channel(); + + $channel->exchange_declare( + self::QUEUE_NAME, + 'topic', + false, + true, + ); + } + + /** + * {@inheritDoc} + */ + public function getEntryPointName(): string + { + return self::ENTRY_POINT; + } + + /** + * {@inheritDoc} + */ + public function getQueueType(): int + { + return QueueTypeEnum::ROUTER; + } + + /** + * {@inheritDoc} + */ + public static function getQueueName(): string + { + return self::QUEUE_NAME; + } +} +``` + +### Пример `Definition` для очереди +```php +channel(); + + $channel->queue_declare( + self::QUEUE_NAME, + false, + true, + false, + false + ); + + foreach (self::ROUTING as $route) { + $channel->queue_bind(self::QUEUE_NAME, self::ENTRY_POINT, $route); // биндим на exchange из первой Definition + } + } + + /** + * {@inheritDoc} + */ + public function getEntryPointName(): string + { + return self::ENTRY_POINT; + } + + /** + * {@inheritDoc} + */ + public function getQueueType(): int + { + return QueueTypeEnum::FIFO; + } + + /** + * {@inheritDoc} + */ + public static function getQueueName(): string + { + return self::QUEUE_NAME; + } +} +``` + +После определения биржи и очередей отправка сообщений будет выглядеть как и раньше, но сообщения будут попадать в +очереди только при подходящем routingKey (четвертый параметр в методе put()). + +```php + 'example']; # Сообщение +$options = []; + +/** @var \Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */ +$producer->put('queue_name', $data, $options, 'small.orange.bicycle'); // попадет в очередь по роуту '*.orange.*' +$producer->put('queue_name', $data, $options, 'big.aaa.bbb.and.more.words'); // попадет в очередь по роуту 'big.#' +$producer->put('queue_name', $data, $options, 'small.black.bicycle'); // НЕ попадет в очередь из примера +``` + +**Важно!!! Длиина routeKey не должна превышать 255 символов** + Лицензия -------- From db9b24d1d8e17665a4190722a786d4e0b242617e Mon Sep 17 00:00:00 2001 From: Artem Virskiy Date: Mon, 17 May 2021 18:53:10 +0300 Subject: [PATCH 3/8] HUB-406 unique retryExchange for each queue. Not create retryExchange for ROUTER queue(exchange) type --- Command/UpdateDefinitionCommand.php | 10 ++++++++-- Enum/ExchangeEnum.php | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/Command/UpdateDefinitionCommand.php b/Command/UpdateDefinitionCommand.php index 675ff08..e96f4a6 100644 --- a/Command/UpdateDefinitionCommand.php +++ b/Command/UpdateDefinitionCommand.php @@ -12,6 +12,7 @@ use Symfony\Component\Console\Output\OutputInterface; use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface; use Wakeapp\Bundle\RabbitQueueBundle\Enum\ExchangeEnum; +use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; class UpdateDefinitionCommand extends Command { @@ -68,11 +69,16 @@ protected function execute(InputInterface $input, OutputInterface $output): int private function bindRetryExchange(DefinitionInterface $definition): void { + if ($definition->getQueueType() & QueueTypeEnum::ROUTER > 0) { + return; + } + $queueName = $definition::getQueueName(); $channel = $this->connection->channel(); + $retryExchange = $queueName . ExchangeEnum::RETRY_EXCHANGE; $channel->exchange_declare( - ExchangeEnum::RETRY_EXCHANGE, + $retryExchange, 'x-delayed-message', false, true, @@ -82,6 +88,6 @@ private function bindRetryExchange(DefinitionInterface $definition): void new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT]) ); - $channel->queue_bind($queueName, ExchangeEnum::RETRY_EXCHANGE, $queueName); + $channel->queue_bind($queueName, $retryExchange, $queueName); } } diff --git a/Enum/ExchangeEnum.php b/Enum/ExchangeEnum.php index da49542..44cf6e6 100644 --- a/Enum/ExchangeEnum.php +++ b/Enum/ExchangeEnum.php @@ -6,5 +6,5 @@ class ExchangeEnum { - public const RETRY_EXCHANGE = 'retry@exchange_delay'; + public const RETRY_EXCHANGE = '@retry_exchange'; } From 030000a4d85b8fd5c3cc10a942158339fc361333 Mon Sep 17 00:00:00 2001 From: Artem Virskiy Date: Mon, 17 May 2021 20:39:05 +0300 Subject: [PATCH 4/8] HUB-406 router - dependsOn() --- Command/UpdateDefinitionCommand.php | 43 ++++++++++++++++++++++++--- Exception/RouteStructureException.php | 9 ++++++ Resources/config/services.yaml | 5 ++++ 3 files changed, 53 insertions(+), 4 deletions(-) create mode 100644 Exception/RouteStructureException.php diff --git a/Command/UpdateDefinitionCommand.php b/Command/UpdateDefinitionCommand.php index e96f4a6..4ae2eff 100644 --- a/Command/UpdateDefinitionCommand.php +++ b/Command/UpdateDefinitionCommand.php @@ -13,6 +13,7 @@ use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface; use Wakeapp\Bundle\RabbitQueueBundle\Enum\ExchangeEnum; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; +use Wakeapp\Bundle\RabbitQueueBundle\Exception\RouteStructureException; class UpdateDefinitionCommand extends Command { @@ -55,10 +56,48 @@ protected function configure(): void /** * {@inheritDoc} + * + * @throws RouteStructureException */ protected function execute(InputInterface $input, OutputInterface $output): int { + $routersToInit = []; + $initializedRouters = []; + + foreach ($this->definitionList as $definition) { + if ($definition->getQueueType() & QueueTypeEnum::ROUTER > 0) { + if (method_exists($definition, 'dependsOn') && !empty($definition->dependsOn())) { + $routersToInit[$definition::getQueueName()] = $definition; + } else { + $definition->init($this->connection); + $initializedRouters[] = $definition::getQueueName(); + } + } + } + + $successLoop = true; + while ($successLoop && !empty($routersToInit)) { + $successLoop = false; + + foreach ($routersToInit as $router) { + if (empty(array_diff($router->dependsOn(), $initializedRouters))) { + $successLoop = true; + $router->init($this->connection); + $initializedRouters[] = $router::getQueueName(); + } + } + } + + if (!$successLoop) { + throw new RouteStructureException('Router definitions have cyclic dependencies'); + } + + foreach ($this->definitionList as $definition) { + if ($definition->getQueueType() & QueueTypeEnum::ROUTER === 0) { + continue; + } + $definition->init($this->connection); $this->bindRetryExchange($definition); @@ -69,10 +108,6 @@ protected function execute(InputInterface $input, OutputInterface $output): int private function bindRetryExchange(DefinitionInterface $definition): void { - if ($definition->getQueueType() & QueueTypeEnum::ROUTER > 0) { - return; - } - $queueName = $definition::getQueueName(); $channel = $this->connection->channel(); $retryExchange = $queueName . ExchangeEnum::RETRY_EXCHANGE; diff --git a/Exception/RouteStructureException.php b/Exception/RouteStructureException.php new file mode 100644 index 0000000..db5e0bf --- /dev/null +++ b/Exception/RouteStructureException.php @@ -0,0 +1,9 @@ + Date: Mon, 17 May 2021 21:16:10 +0300 Subject: [PATCH 5/8] HUB-406 dependsOn() loop bugfix --- Command/UpdateDefinitionCommand.php | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Command/UpdateDefinitionCommand.php b/Command/UpdateDefinitionCommand.php index 4ae2eff..056cc0a 100644 --- a/Command/UpdateDefinitionCommand.php +++ b/Command/UpdateDefinitionCommand.php @@ -65,7 +65,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $initializedRouters = []; foreach ($this->definitionList as $definition) { - if ($definition->getQueueType() & QueueTypeEnum::ROUTER > 0) { + if ($definition->getQueueType() & QueueTypeEnum::ROUTER) { if (method_exists($definition, 'dependsOn') && !empty($definition->dependsOn())) { $routersToInit[$definition::getQueueName()] = $definition; } else { @@ -83,6 +83,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int if (empty(array_diff($router->dependsOn(), $initializedRouters))) { $successLoop = true; $router->init($this->connection); + unset($routersToInit[$router::getQueueName()]); $initializedRouters[] = $router::getQueueName(); } } @@ -94,7 +95,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int foreach ($this->definitionList as $definition) { - if ($definition->getQueueType() & QueueTypeEnum::ROUTER === 0) { + if ($definition->getQueueType() & QueueTypeEnum::ROUTER) { continue; } From 328fad64e00774a576b7af4d10c98c67e61b2caf Mon Sep 17 00:00:00 2001 From: Artem Virskiy Date: Tue, 18 May 2021 16:57:58 +0300 Subject: [PATCH 6/8] HUB-406 revert --- Enum/ExchangeEnum.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Enum/ExchangeEnum.php b/Enum/ExchangeEnum.php index 44cf6e6..da49542 100644 --- a/Enum/ExchangeEnum.php +++ b/Enum/ExchangeEnum.php @@ -6,5 +6,5 @@ class ExchangeEnum { - public const RETRY_EXCHANGE = '@retry_exchange'; + public const RETRY_EXCHANGE = 'retry@exchange_delay'; } From b1872a0ac5f860d8f51ca2c68bf386a1a468ce55 Mon Sep 17 00:00:00 2001 From: Artem Virskiy Date: Tue, 18 May 2021 16:58:26 +0300 Subject: [PATCH 7/8] HUB-406 revert retry_exchange --- Command/UpdateDefinitionCommand.php | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Command/UpdateDefinitionCommand.php b/Command/UpdateDefinitionCommand.php index 056cc0a..9117293 100644 --- a/Command/UpdateDefinitionCommand.php +++ b/Command/UpdateDefinitionCommand.php @@ -111,10 +111,9 @@ private function bindRetryExchange(DefinitionInterface $definition): void { $queueName = $definition::getQueueName(); $channel = $this->connection->channel(); - $retryExchange = $queueName . ExchangeEnum::RETRY_EXCHANGE; $channel->exchange_declare( - $retryExchange, + ExchangeEnum::RETRY_EXCHANGE, 'x-delayed-message', false, true, @@ -124,6 +123,6 @@ private function bindRetryExchange(DefinitionInterface $definition): void new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT]) ); - $channel->queue_bind($queueName, $retryExchange, $queueName); + $channel->queue_bind($queueName, ExchangeEnum::RETRY_EXCHANGE, $queueName); } } From 758cf36b16c07534615abdc8d79262f3d16ea23e Mon Sep 17 00:00:00 2001 From: Artem Virskiy Date: Tue, 18 May 2021 17:52:42 +0300 Subject: [PATCH 8/8] HUB-406 doc upd --- README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index f0aaef2..8e8be1d 100644 --- a/README.md +++ b/README.md @@ -503,10 +503,11 @@ php bin/console rabbit:consumer:run example -------- `RouterPublisher` следует использовать в случаях, когда нужно множество очередей, а каждое сообщение должно попадать -в сразу в некоторое подмножество, определяемое по `routingKey` сообщения. Для таких целей нужно создать `Definition`, -в котором будет определена только `exchange` типа `direct` или `topic`. Эта `Definition` будет использоваться в качестве -точки входя для сообщений. После этого нужно создать по одной `Definition` на каждую очередь, и все их биндить на первую -`Definition`. +сразу в некоторое их подмножество, определяемое по `routingKey` сообщения. Для таких целей нужно создать `Definition`, +в котором будет определена только `exchange` типа `direct`, `topic` или `fanout`. Эта `Definition` будет использоваться +в качестве точки входя для сообщений. После этого нужно создать по одной `Definition` на каждую очередь, и все их +биндить на первую `Definition`. Можно создать сложную маршрутизацию, если вместо очередей создавать и биндить +`Definition` типа первой. ### Пример `Definition` с `exchange`: ```php @@ -648,7 +649,7 @@ $producer->put('queue_name', $data, $options, 'big.aaa.bbb.and.more.words'); // $producer->put('queue_name', $data, $options, 'small.black.bicycle'); // НЕ попадет в очередь из примера ``` -**Важно!!! Длиина routeKey не должна превышать 255 символов** +**Важно!!! Длина routeKey не должна превышать 255 символов** Лицензия --------