diff --git a/README.md b/README.md index 47f57c2..84f18b6 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,8 @@ The following code is an example to show the basic usage of StompNet. "admin", "password")) { - // Create a connection. - IStompConnection connection = await stompConnector.ConnectAsync(); + // Create a connection using an hearbeat. + IStompConnection connection = await stompConnector.ConnectAsync(heartbeat: new Heartbeat(30000, 30000)); // Send a message. await connection.SendAsync("/queue/example", "Anybody there!?"); @@ -122,7 +122,6 @@ you can see how to: Apache License 2.0 ## ToDo -- Heartbeat support - STOMP 1.1 support [reactive-extensions]: http://msdn.microsoft.com/en-us/data/gg577609.aspx diff --git a/StompNet.Examples/1.ExampleConnector.cs b/StompNet.Examples/1.ExampleConnector.cs index aeef409..49393c1 100644 --- a/StompNet.Examples/1.ExampleConnector.cs +++ b/StompNet.Examples/1.ExampleConnector.cs @@ -24,6 +24,7 @@ using System.Threading.Tasks; using StompNet; using StompNet.Models; +using StompNet.Models.Frames; namespace Stomp.Net.Examples { @@ -58,7 +59,7 @@ public static async Task ExampleConnector() using (IStompConnector stompConnector = new Stomp12Connector(tcpClient.GetStream(), virtualHost, login, passcode)) { //Connect to the STOMP service. - IStompConnection connection = await stompConnector.ConnectAsync(); + IStompConnection connection = await stompConnector.ConnectAsync(heartbeat: new Heartbeat(30000, 30000)); // Send a couple of messages with string content. for (int i = 1; i <= 2; i++) diff --git a/StompNet/IO/Stomp12FrameWriter.cs b/StompNet/IO/Stomp12FrameWriter.cs index ba3d652..e0a203c 100644 --- a/StompNet/IO/Stomp12FrameWriter.cs +++ b/StompNet/IO/Stomp12FrameWriter.cs @@ -16,7 +16,6 @@ * *******************************************************************************/ -using System; using System.IO; using System.Linq; using System.Text; @@ -85,6 +84,10 @@ private async Task WriteFrameAsyncImpl(Frame frame, CancellationToken cancellati await _writer.WriteAsync(frame.BodyArray, cancellationToken); await _writer.WriteAsync(StompOctets.EndOfFrameByte, cancellationToken); } + else if (frame.Command == StompCommands.Heartbeat) + { + await _writer.WriteAsync(StompOctets.LineFeedByte, cancellationToken); + } else { await _writer.WriteAsync(frame.Command, cancellationToken); diff --git a/StompNet/IO/StompClient.cs b/StompNet/IO/StompClient.cs index 27ce030..017ed5f 100644 --- a/StompNet/IO/StompClient.cs +++ b/StompNet/IO/StompClient.cs @@ -41,6 +41,7 @@ public class StompClient : IStompClient private readonly IStompFrameObservable _frameObservable; private readonly IStompFrameWriter _frameWriter; + private readonly StompHeartbeatManager _heartbeatManager; private readonly ISequenceNumberGenerator _receiptNumberGenerator; private readonly ISequenceNumberGenerator _subscriptionNumberGenerator; private readonly ISequenceNumberGenerator _transactionNumberGenerator; @@ -71,6 +72,7 @@ internal StompClient( _frameObservable = new StompFrameObservable(reader); _frameWriter = new StompFrameWriterWithConfirmation(writer, _frameObservable, retryInterval); + _heartbeatManager = new StompHeartbeatManager(this, _frameObservable); if (!useRandomNumberGenerator) { @@ -131,6 +133,7 @@ protected virtual void Dispose(bool disposing) if(_disposed) return; _frameWriter.Dispose(); + _heartbeatManager.Dispose(); if(_cascadeDispose) { _reader.Dispose(); diff --git a/StompNet/IO/StompFrameWriterExtensions.cs b/StompNet/IO/StompFrameWriterExtensions.cs index 525f001..90257ba 100644 --- a/StompNet/IO/StompFrameWriterExtensions.cs +++ b/StompNet/IO/StompFrameWriterExtensions.cs @@ -187,5 +187,11 @@ public static Task WriteDisconnectAsync( Frame frame = StompFrameFactory.CreateDisconnect(receipt, extraHeaders); return writer.WriteAsync(frame, cancelToken); } + + public static Task WriteHearbeatAsync(this IStompFrameWriter writer, CancellationToken? cancellationToken = null) + { + CancellationToken cancelToken = cancellationToken ?? CancellationToken.None; + return writer.WriteAsync(Frame.HeartbeatFrame, cancelToken); + } } } diff --git a/StompNet/IO/StompHeartbeatManager.cs b/StompNet/IO/StompHeartbeatManager.cs new file mode 100644 index 0000000..5ddce47 --- /dev/null +++ b/StompNet/IO/StompHeartbeatManager.cs @@ -0,0 +1,125 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using StompNet.Helpers; +using StompNet.Models; +using StompNet.Models.Frames; + +namespace StompNet.IO +{ + /// + /// Observer to manage hearbeat + /// + public class StompHeartbeatManager : IDisposable + { + + private IStompClient client; + private IStompFrameObservable frameObservable; + + private CancellationTokenSource keepAliveToken; + private CancellationTokenSource sendHBToken; + private bool shouldStartSendingHeartbeat; + + /// + /// Gets the last activity time (received message) + /// + public DateTime LastActivityTime { get; private set; } = DateTime.UtcNow; + + /// + /// Gets or sets margin of acceptance to send and receive heartbeat + /// because of timing inaccuracies, the receiver SHOULD be tolerant and take into account an error margin + /// + public static int MarginOfAcceptance { get; set; } = 1000; + + /// + /// Create an observer to + /// + /// + /// + public StompHeartbeatManager(IStompClient client, IStompFrameObservable frameObservable) + { + this.client = client; + this.frameObservable = frameObservable; + + if (this.frameObservable == null) + throw new ArgumentNullException("frameObservable"); + + this.frameObservable.SubscribeEx(this.OnNext); + } + + private void OnNext(Frame frame) + { + if (frame.Command == StompCommands.Connected) + { + var connectedFrame = StompInterpreter.Interpret(frame) as ConnectedFrame; + if (connectedFrame.Heartbeat.Incoming > 0) + { + CheckKeepAlive(connectedFrame.Heartbeat.Incoming + MarginOfAcceptance); + } + + if (connectedFrame.Heartbeat.Outgoing > 0) + { + SendHeartBeat((int)(Math.Max(connectedFrame.Heartbeat.Outgoing - MarginOfAcceptance, MarginOfAcceptance))); + } + } + + LastActivityTime = DateTime.UtcNow; + } + + /// + /// Checks if expected server heartbeat is received + /// + /// + private async void CheckKeepAlive(int interval) + { + keepAliveToken = new CancellationTokenSource(); + while (!keepAliveToken.IsCancellationRequested) + { + var timeToWait = interval - (DateTime.UtcNow - LastActivityTime).TotalMilliseconds; + if (timeToWait < 0) + { + client.Dispose(); + } + else + { + await Task.Delay((int)timeToWait, keepAliveToken.Token).ContinueWith(t => { }); + } + } + } + + /// + /// Sends heartbeat periodically + /// + /// + private async void SendHeartBeat(int interval) + { + sendHBToken = new CancellationTokenSource(); + while (!sendHBToken.IsCancellationRequested) + { + if (shouldStartSendingHeartbeat) + { + await client.WriteHearbeatAsync(); + } + else + { + shouldStartSendingHeartbeat = true; + } + await Task.Delay(interval, sendHBToken.Token) + .ContinueWith(t => { }); ; + } + } + + public void Dispose() + { + if (keepAliveToken != null && !keepAliveToken.IsCancellationRequested) + { + keepAliveToken.Cancel(); + } + + if (sendHBToken != null && !sendHBToken.IsCancellationRequested) + { + sendHBToken.Cancel(); + } + } + } +} diff --git a/StompNet/IStompConnector.cs b/StompNet/IStompConnector.cs index d13efd9..1af636a 100644 --- a/StompNet/IStompConnector.cs +++ b/StompNet/IStompConnector.cs @@ -20,6 +20,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using StompNet.Models.Frames; namespace StompNet { @@ -36,7 +37,8 @@ public interface IStompConnector : IDisposable /// The token to monitor for cancellation requests. /// A task representing the connect operation. Task ConnectAsync( - IEnumerable> extraHeaders = null, + IEnumerable> extraHeaders = null, + Heartbeat heartbeat = null, CancellationToken? cancellationToken = null); } } \ No newline at end of file diff --git a/StompNet/Stomp12Connector.cs b/StompNet/Stomp12Connector.cs index 774f435..52102f0 100644 --- a/StompNet/Stomp12Connector.cs +++ b/StompNet/Stomp12Connector.cs @@ -104,11 +104,12 @@ public Stomp12Connector( /// A task representing the connect operation. public async Task ConnectAsync( IEnumerable> extraHeaders = null, + Heartbeat heartbeat = null, CancellationToken? cancellationToken = null) { _client.Start(_cts.Token); - await _client.WriteConnectAsync(_host, _user, _password, Heartbeat.NoHeartbeat, extraHeaders, cancellationToken); + await _client.WriteConnectAsync(_host, _user, _password, heartbeat ?? Heartbeat.NoHeartbeat, extraHeaders, cancellationToken); return new StompConnection(_client); } diff --git a/StompNet/StompNet.csproj b/StompNet/StompNet.csproj index 62172af..882f34a 100644 --- a/StompNet/StompNet.csproj +++ b/StompNet/StompNet.csproj @@ -65,6 +65,7 @@ +