diff --git a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs index 2a872ced..b0cc8100 100644 --- a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs +++ b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs @@ -1,9 +1,11 @@ -namespace SlimMessageBus.Host.Mqtt; - +using System.Collections.Generic; +using System.Text.RegularExpressions; +using System.Threading; using Microsoft.Extensions.DependencyInjection; - using MQTTnet.Extensions.ManagedClient; +namespace SlimMessageBus.Host.Mqtt; + public class MqttMessageBus : MessageBusBase { private readonly ILogger _logger; @@ -101,9 +103,18 @@ protected override async Task DestroyConsumers() } } + private static bool CheckTopic(string allowedTopic, string topic) + { + if (string.Equals(allowedTopic, topic)) + return true; + var realTopicRegex = allowedTopic.Replace(@"/", @"\/").Replace("+", @"[a-zA-Z0-9 _.-]*").Replace("#", @"[a-zA-Z0-9 \/_#+.-]*"); + var regex = new Regex(realTopicRegex); + return regex.IsMatch(topic); + } + private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { - var consumer = Consumers.Cast().FirstOrDefault(x => x.Path == arg.ApplicationMessage.Topic); + var consumer = Consumers.Cast().FirstOrDefault(x => CheckTopic(x.Path, arg.ApplicationMessage.Topic)); if (consumer != null) { var headers = new Dictionary(); diff --git a/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs index dfcd650e..f27556b1 100644 --- a/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs @@ -55,6 +55,26 @@ public async Task BasicPubSubOnTopic(bool bulkProduce) await BasicPubSub(1, bulkProduce: bulkProduce); } + [Theory] + [InlineData("test-ping/+/first", "test-ping/test/first", 1)] + [InlineData("test-ping/+/first", "test-ping/test/first/first", 0)] + [InlineData("test-ping/+/first", "test-ping/first/test", 0)] + [InlineData("test-ping/#", "test-ping/test/first", 1)] + [InlineData("test-ping/#", "test-ping/test/first/first", 1)] + public async Task WildCardSubOnTopic(string wildCardTopic, string topic, int expected) + { + var concurrency = 2; + + AddBusConfiguration(mbb => + { + mbb + .Produce(x => x.DefaultTopic(topic)) + .Consume(x => x.Topic(wildCardTopic).Instances(concurrency)); + }); + + await BasicPubSub(expected, false); + } + private async Task BasicPubSub(int expectedMessageCopies, bool bulkProduce) { // arrange