Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
namespace SlimMessageBus.Host.Mqtt;

using System.Collections.Generic;
using System.Text.RegularExpressions;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;

using MQTTnet.Extensions.ManagedClient;

namespace SlimMessageBus.Host.Mqtt;

public class MqttMessageBus : MessageBusBase<MqttMessageBusSettings>
{
private readonly ILogger _logger;
Expand Down Expand Up @@ -101,9 +103,18 @@
}
}

private static bool CheckTopic(string allowedTopic, string topic)
{
if (string.Equals(allowedTopic, topic))
return true;
var realTopicRegex = allowedTopic.Replace(@"/", @"\/").Replace("+", @"[a-zA-Z0-9 _.-]*").Replace("#", @"[a-zA-Z0-9 \/_#+.-]*");
var regex = new Regex(realTopicRegex);

Check warning on line 111 in src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs

View workflow job for this annotation

GitHub Actions / build

Pass a timeout to limit the execution time. (https://rules.sonarsource.com/csharp/RSPEC-6444)

Check warning on line 111 in src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs

View workflow job for this annotation

GitHub Actions / build

Pass a timeout to limit the execution time. (https://rules.sonarsource.com/csharp/RSPEC-6444)
return regex.IsMatch(topic);
}

private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
var consumer = Consumers.Cast<MqttTopicConsumer>().FirstOrDefault(x => x.Path == arg.ApplicationMessage.Topic);
var consumer = Consumers.Cast<MqttTopicConsumer>().FirstOrDefault(x => CheckTopic(x.Path, arg.ApplicationMessage.Topic));
if (consumer != null)
{
var headers = new Dictionary<string, object>();
Expand Down
20 changes: 20 additions & 0 deletions src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@
await BasicPubSub(1, bulkProduce: bulkProduce);
}

[Theory]
[InlineData("test-ping/+/first", "test-ping/test/first", 1)]
[InlineData("test-ping/+/first", "test-ping/test/first/first", 0)]
[InlineData("test-ping/+/first", "test-ping/first/test", 0)]
[InlineData("test-ping/#", "test-ping/test/first", 1)]
[InlineData("test-ping/#", "test-ping/test/first/first", 1)]
public async Task WildCardSubOnTopic(string wildCardTopic, string topic, int expected)
{
var concurrency = 2;

AddBusConfiguration(mbb =>
{
mbb
.Produce<PingMessage>(x => x.DefaultTopic(topic))
.Consume<PingMessage>(x => x.Topic(wildCardTopic).Instances(concurrency));
});

await BasicPubSub(expected, false);
}

private async Task BasicPubSub(int expectedMessageCopies, bool bulkProduce)
{
// arrange
Expand Down Expand Up @@ -157,7 +177,7 @@
var responses = new ConcurrentBag<(EchoRequest Request, EchoResponse Response)>();
var responseTasks = requests.Select(async req =>
{
var resp = await messageBus.Send<EchoResponse, EchoRequest>(req);

Check failure on line 180 in src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs

View workflow job for this annotation

GitHub Actions / .NET Tests

SlimMessageBus.Host.Mqtt.Test.MqttMessageBusIt ► BasicReqRespOnTopic

Failed test found in: ./test-results/SlimMessageBus.Host.Mqtt.Test_Integration_net9.0_20250901143622.trx Error: System.Threading.Tasks.TaskCanceledException : A task was canceled.
Raw output
System.Threading.Tasks.TaskCanceledException : A task was canceled.
   at SlimMessageBus.Host.MessageBusBase.SendInternal[TResponseMessage](Object request, String path, Type requestType, Type responseType, ProducerSettings producerSettings, DateTimeOffset created, DateTimeOffset expires, String requestId, IDictionary`2 requestHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken) in /_/src/SlimMessageBus.Host/MessageBusBase.cs:line 610
   at SlimMessageBus.Host.MessageBusBase.ProduceSend[TResponse](Object request, String path, IDictionary`2 headers, Nullable`1 timeout, IMessageBusTarget targetBus, CancellationToken cancellationToken) in /_/src/SlimMessageBus.Host/MessageBusBase.cs:line 572
   at SlimMessageBus.Host.Mqtt.Test.MqttMessageBusIt.<>c__DisplayClass9_0.<<BasicReqResp>b__1>d.MoveNext() in /home/runner/work/SlimMessageBus/SlimMessageBus/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs:line 180
--- End of stack trace from previous location ---
   at SlimMessageBus.Host.Mqtt.Test.MqttMessageBusIt.BasicReqResp() in /home/runner/work/SlimMessageBus/SlimMessageBus/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs:line 184
   at SlimMessageBus.Host.Mqtt.Test.MqttMessageBusIt.BasicReqRespOnTopic() in /home/runner/work/SlimMessageBus/SlimMessageBus/src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs:line 156
--- End of stack trace from previous location ---
Logger.LogDebug("Received response for index {EchoIndex:000}", req.Index);
responses.Add((req, resp));
});
Expand Down
Loading