diff --git a/README.md b/README.md
index 47f57c2..84f18b6 100644
--- a/README.md
+++ b/README.md
@@ -37,8 +37,8 @@ The following code is an example to show the basic usage of StompNet.
"admin",
"password"))
{
- // Create a connection.
- IStompConnection connection = await stompConnector.ConnectAsync();
+ // Create a connection using an hearbeat.
+ IStompConnection connection = await stompConnector.ConnectAsync(heartbeat: new Heartbeat(30000, 30000));
// Send a message.
await connection.SendAsync("/queue/example", "Anybody there!?");
@@ -122,7 +122,6 @@ you can see how to:
Apache License 2.0
## ToDo
-- Heartbeat support
- STOMP 1.1 support
[reactive-extensions]: http://msdn.microsoft.com/en-us/data/gg577609.aspx
diff --git a/StompNet.Examples/1.ExampleConnector.cs b/StompNet.Examples/1.ExampleConnector.cs
index aeef409..49393c1 100644
--- a/StompNet.Examples/1.ExampleConnector.cs
+++ b/StompNet.Examples/1.ExampleConnector.cs
@@ -24,6 +24,7 @@
using System.Threading.Tasks;
using StompNet;
using StompNet.Models;
+using StompNet.Models.Frames;
namespace Stomp.Net.Examples
{
@@ -58,7 +59,7 @@ public static async Task ExampleConnector()
using (IStompConnector stompConnector = new Stomp12Connector(tcpClient.GetStream(), virtualHost, login, passcode))
{
//Connect to the STOMP service.
- IStompConnection connection = await stompConnector.ConnectAsync();
+ IStompConnection connection = await stompConnector.ConnectAsync(heartbeat: new Heartbeat(30000, 30000));
// Send a couple of messages with string content.
for (int i = 1; i <= 2; i++)
diff --git a/StompNet/IO/Stomp12FrameWriter.cs b/StompNet/IO/Stomp12FrameWriter.cs
index ba3d652..e0a203c 100644
--- a/StompNet/IO/Stomp12FrameWriter.cs
+++ b/StompNet/IO/Stomp12FrameWriter.cs
@@ -16,7 +16,6 @@
*
*******************************************************************************/
-using System;
using System.IO;
using System.Linq;
using System.Text;
@@ -85,6 +84,10 @@ private async Task WriteFrameAsyncImpl(Frame frame, CancellationToken cancellati
await _writer.WriteAsync(frame.BodyArray, cancellationToken);
await _writer.WriteAsync(StompOctets.EndOfFrameByte, cancellationToken);
}
+ else if (frame.Command == StompCommands.Heartbeat)
+ {
+ await _writer.WriteAsync(StompOctets.LineFeedByte, cancellationToken);
+ }
else
{
await _writer.WriteAsync(frame.Command, cancellationToken);
diff --git a/StompNet/IO/StompClient.cs b/StompNet/IO/StompClient.cs
index 27ce030..017ed5f 100644
--- a/StompNet/IO/StompClient.cs
+++ b/StompNet/IO/StompClient.cs
@@ -41,6 +41,7 @@ public class StompClient : IStompClient
private readonly IStompFrameObservable _frameObservable;
private readonly IStompFrameWriter _frameWriter;
+ private readonly StompHeartbeatManager _heartbeatManager;
private readonly ISequenceNumberGenerator _receiptNumberGenerator;
private readonly ISequenceNumberGenerator _subscriptionNumberGenerator;
private readonly ISequenceNumberGenerator _transactionNumberGenerator;
@@ -71,6 +72,7 @@ internal StompClient(
_frameObservable = new StompFrameObservable(reader);
_frameWriter = new StompFrameWriterWithConfirmation(writer, _frameObservable, retryInterval);
+ _heartbeatManager = new StompHeartbeatManager(this, _frameObservable);
if (!useRandomNumberGenerator)
{
@@ -131,6 +133,7 @@ protected virtual void Dispose(bool disposing)
if(_disposed) return;
_frameWriter.Dispose();
+ _heartbeatManager.Dispose();
if(_cascadeDispose)
{
_reader.Dispose();
diff --git a/StompNet/IO/StompFrameWriterExtensions.cs b/StompNet/IO/StompFrameWriterExtensions.cs
index 525f001..90257ba 100644
--- a/StompNet/IO/StompFrameWriterExtensions.cs
+++ b/StompNet/IO/StompFrameWriterExtensions.cs
@@ -187,5 +187,11 @@ public static Task WriteDisconnectAsync(
Frame frame = StompFrameFactory.CreateDisconnect(receipt, extraHeaders);
return writer.WriteAsync(frame, cancelToken);
}
+
+ public static Task WriteHearbeatAsync(this IStompFrameWriter writer, CancellationToken? cancellationToken = null)
+ {
+ CancellationToken cancelToken = cancellationToken ?? CancellationToken.None;
+ return writer.WriteAsync(Frame.HeartbeatFrame, cancelToken);
+ }
}
}
diff --git a/StompNet/IO/StompHeartbeatManager.cs b/StompNet/IO/StompHeartbeatManager.cs
new file mode 100644
index 0000000..5ddce47
--- /dev/null
+++ b/StompNet/IO/StompHeartbeatManager.cs
@@ -0,0 +1,125 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using StompNet.Helpers;
+using StompNet.Models;
+using StompNet.Models.Frames;
+
+namespace StompNet.IO
+{
+ ///
+ /// Observer to manage hearbeat
+ ///
+ public class StompHeartbeatManager : IDisposable
+ {
+
+ private IStompClient client;
+ private IStompFrameObservable frameObservable;
+
+ private CancellationTokenSource keepAliveToken;
+ private CancellationTokenSource sendHBToken;
+ private bool shouldStartSendingHeartbeat;
+
+ ///
+ /// Gets the last activity time (received message)
+ ///
+ public DateTime LastActivityTime { get; private set; } = DateTime.UtcNow;
+
+ ///
+ /// Gets or sets margin of acceptance to send and receive heartbeat
+ /// because of timing inaccuracies, the receiver SHOULD be tolerant and take into account an error margin
+ ///
+ public static int MarginOfAcceptance { get; set; } = 1000;
+
+ ///
+ /// Create an observer to
+ ///
+ ///
+ ///
+ public StompHeartbeatManager(IStompClient client, IStompFrameObservable frameObservable)
+ {
+ this.client = client;
+ this.frameObservable = frameObservable;
+
+ if (this.frameObservable == null)
+ throw new ArgumentNullException("frameObservable");
+
+ this.frameObservable.SubscribeEx(this.OnNext);
+ }
+
+ private void OnNext(Frame frame)
+ {
+ if (frame.Command == StompCommands.Connected)
+ {
+ var connectedFrame = StompInterpreter.Interpret(frame) as ConnectedFrame;
+ if (connectedFrame.Heartbeat.Incoming > 0)
+ {
+ CheckKeepAlive(connectedFrame.Heartbeat.Incoming + MarginOfAcceptance);
+ }
+
+ if (connectedFrame.Heartbeat.Outgoing > 0)
+ {
+ SendHeartBeat((int)(Math.Max(connectedFrame.Heartbeat.Outgoing - MarginOfAcceptance, MarginOfAcceptance)));
+ }
+ }
+
+ LastActivityTime = DateTime.UtcNow;
+ }
+
+ ///
+ /// Checks if expected server heartbeat is received
+ ///
+ ///
+ private async void CheckKeepAlive(int interval)
+ {
+ keepAliveToken = new CancellationTokenSource();
+ while (!keepAliveToken.IsCancellationRequested)
+ {
+ var timeToWait = interval - (DateTime.UtcNow - LastActivityTime).TotalMilliseconds;
+ if (timeToWait < 0)
+ {
+ client.Dispose();
+ }
+ else
+ {
+ await Task.Delay((int)timeToWait, keepAliveToken.Token).ContinueWith(t => { });
+ }
+ }
+ }
+
+ ///
+ /// Sends heartbeat periodically
+ ///
+ ///
+ private async void SendHeartBeat(int interval)
+ {
+ sendHBToken = new CancellationTokenSource();
+ while (!sendHBToken.IsCancellationRequested)
+ {
+ if (shouldStartSendingHeartbeat)
+ {
+ await client.WriteHearbeatAsync();
+ }
+ else
+ {
+ shouldStartSendingHeartbeat = true;
+ }
+ await Task.Delay(interval, sendHBToken.Token)
+ .ContinueWith(t => { }); ;
+ }
+ }
+
+ public void Dispose()
+ {
+ if (keepAliveToken != null && !keepAliveToken.IsCancellationRequested)
+ {
+ keepAliveToken.Cancel();
+ }
+
+ if (sendHBToken != null && !sendHBToken.IsCancellationRequested)
+ {
+ sendHBToken.Cancel();
+ }
+ }
+ }
+}
diff --git a/StompNet/IStompConnector.cs b/StompNet/IStompConnector.cs
index d13efd9..1af636a 100644
--- a/StompNet/IStompConnector.cs
+++ b/StompNet/IStompConnector.cs
@@ -20,6 +20,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
+using StompNet.Models.Frames;
namespace StompNet
{
@@ -36,7 +37,8 @@ public interface IStompConnector : IDisposable
/// The token to monitor for cancellation requests.
/// A task representing the connect operation.
Task ConnectAsync(
- IEnumerable> extraHeaders = null,
+ IEnumerable> extraHeaders = null,
+ Heartbeat heartbeat = null,
CancellationToken? cancellationToken = null);
}
}
\ No newline at end of file
diff --git a/StompNet/Stomp12Connector.cs b/StompNet/Stomp12Connector.cs
index 774f435..52102f0 100644
--- a/StompNet/Stomp12Connector.cs
+++ b/StompNet/Stomp12Connector.cs
@@ -104,11 +104,12 @@ public Stomp12Connector(
/// A task representing the connect operation.
public async Task ConnectAsync(
IEnumerable> extraHeaders = null,
+ Heartbeat heartbeat = null,
CancellationToken? cancellationToken = null)
{
_client.Start(_cts.Token);
- await _client.WriteConnectAsync(_host, _user, _password, Heartbeat.NoHeartbeat, extraHeaders, cancellationToken);
+ await _client.WriteConnectAsync(_host, _user, _password, heartbeat ?? Heartbeat.NoHeartbeat, extraHeaders, cancellationToken);
return new StompConnection(_client);
}
diff --git a/StompNet/StompNet.csproj b/StompNet/StompNet.csproj
index 62172af..882f34a 100644
--- a/StompNet/StompNet.csproj
+++ b/StompNet/StompNet.csproj
@@ -65,6 +65,7 @@
+