From 9a733f4f78e5e9520de7b3f870a4e4fa2eca12ed Mon Sep 17 00:00:00 2001 From: Vincent Vandenbosch Date: Wed, 23 Apr 2025 10:00:30 +0200 Subject: [PATCH 1/2] [Host.Mqtt] Add mqtt wildcard support for # and + symbols Signed-off-by: Vincent Vandenbosch --- .../MqttMessageBus.cs | 10 +++++++++- .../MqttMessageBusIt.cs | 20 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs index 4d693467..a124a759 100644 --- a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs +++ b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs @@ -1,6 +1,7 @@ namespace SlimMessageBus.Host.Mqtt; using System.Collections.Generic; +using System.Text.RegularExpressions; using System.Threading; using Microsoft.Extensions.DependencyInjection; @@ -107,9 +108,16 @@ protected override async Task DestroyConsumers() } } + private static bool CheckTopic(string allowedTopic, string topic) + { + var realTopicRegex = allowedTopic.Replace(@"/", @"\/").Replace("+", @"[a-zA-Z0-9 _.-]*").Replace("#", @"[a-zA-Z0-9 \/_#+.-]*"); + var regex = new Regex(realTopicRegex); + return regex.IsMatch(topic); + } + private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { - var consumer = Consumers.Cast().FirstOrDefault(x => x.Path == arg.ApplicationMessage.Topic); + var consumer = Consumers.Cast().FirstOrDefault(x => CheckTopic(x.Path, arg.ApplicationMessage.Topic)); if (consumer != null) { var headers = new Dictionary(); diff --git a/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs index dfcd650e..f27556b1 100644 --- a/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs @@ -55,6 +55,26 @@ public async Task BasicPubSubOnTopic(bool bulkProduce) await BasicPubSub(1, bulkProduce: bulkProduce); } + [Theory] + [InlineData("test-ping/+/first", "test-ping/test/first", 1)] + [InlineData("test-ping/+/first", "test-ping/test/first/first", 0)] + [InlineData("test-ping/+/first", "test-ping/first/test", 0)] + [InlineData("test-ping/#", "test-ping/test/first", 1)] + [InlineData("test-ping/#", "test-ping/test/first/first", 1)] + public async Task WildCardSubOnTopic(string wildCardTopic, string topic, int expected) + { + var concurrency = 2; + + AddBusConfiguration(mbb => + { + mbb + .Produce(x => x.DefaultTopic(topic)) + .Consume(x => x.Topic(wildCardTopic).Instances(concurrency)); + }); + + await BasicPubSub(expected, false); + } + private async Task BasicPubSub(int expectedMessageCopies, bool bulkProduce) { // arrange From 7f45f9849f93ae13892f7446522b94e97b2d8ee2 Mon Sep 17 00:00:00 2001 From: Vincent Vandenbosch Date: Wed, 23 Apr 2025 11:25:41 +0200 Subject: [PATCH 2/2] [Host.Mqtt] First check for equals for mqtt Signed-off-by: Vincent Vandenbosch --- src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs index a124a759..09e59728 100644 --- a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs +++ b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs @@ -110,6 +110,8 @@ protected override async Task DestroyConsumers() private static bool CheckTopic(string allowedTopic, string topic) { + if (string.Equals(allowedTopic, topic)) + return true; var realTopicRegex = allowedTopic.Replace(@"/", @"\/").Replace("+", @"[a-zA-Z0-9 _.-]*").Replace("#", @"[a-zA-Z0-9 \/_#+.-]*"); var regex = new Regex(realTopicRegex); return regex.IsMatch(topic);