Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ The following code is an example to show the basic usage of StompNet.
"admin",
"password"))
{
// Create a connection.
IStompConnection connection = await stompConnector.ConnectAsync();
// Create a connection using an hearbeat.
IStompConnection connection = await stompConnector.ConnectAsync(heartbeat: new Heartbeat(30000, 30000));

// Send a message.
await connection.SendAsync("/queue/example", "Anybody there!?");
Expand Down Expand Up @@ -122,7 +122,6 @@ you can see how to:
Apache License 2.0

## ToDo
- Heartbeat support
- STOMP 1.1 support

[reactive-extensions]: http://msdn.microsoft.com/en-us/data/gg577609.aspx
Expand Down
3 changes: 2 additions & 1 deletion StompNet.Examples/1.ExampleConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using System.Threading.Tasks;
using StompNet;
using StompNet.Models;
using StompNet.Models.Frames;

namespace Stomp.Net.Examples
{
Expand Down Expand Up @@ -58,7 +59,7 @@ public static async Task ExampleConnector()
using (IStompConnector stompConnector = new Stomp12Connector(tcpClient.GetStream(), virtualHost, login, passcode))
{
//Connect to the STOMP service.
IStompConnection connection = await stompConnector.ConnectAsync();
IStompConnection connection = await stompConnector.ConnectAsync(heartbeat: new Heartbeat(30000, 30000));

// Send a couple of messages with string content.
for (int i = 1; i <= 2; i++)
Expand Down
5 changes: 4 additions & 1 deletion StompNet/IO/Stomp12FrameWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*
*******************************************************************************/

using System;
using System.IO;
using System.Linq;
using System.Text;
Expand Down Expand Up @@ -85,6 +84,10 @@ private async Task WriteFrameAsyncImpl(Frame frame, CancellationToken cancellati
await _writer.WriteAsync(frame.BodyArray, cancellationToken);
await _writer.WriteAsync(StompOctets.EndOfFrameByte, cancellationToken);
}
else if (frame.Command == StompCommands.Heartbeat)
{
await _writer.WriteAsync(StompOctets.LineFeedByte, cancellationToken);
}
else
{
await _writer.WriteAsync(frame.Command, cancellationToken);
Expand Down
3 changes: 3 additions & 0 deletions StompNet/IO/StompClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class StompClient : IStompClient

private readonly IStompFrameObservable _frameObservable;
private readonly IStompFrameWriter _frameWriter;
private readonly StompHeartbeatManager _heartbeatManager;
private readonly ISequenceNumberGenerator _receiptNumberGenerator;
private readonly ISequenceNumberGenerator _subscriptionNumberGenerator;
private readonly ISequenceNumberGenerator _transactionNumberGenerator;
Expand Down Expand Up @@ -71,6 +72,7 @@ internal StompClient(

_frameObservable = new StompFrameObservable(reader);
_frameWriter = new StompFrameWriterWithConfirmation(writer, _frameObservable, retryInterval);
_heartbeatManager = new StompHeartbeatManager(this, _frameObservable);

if (!useRandomNumberGenerator)
{
Expand Down Expand Up @@ -131,6 +133,7 @@ protected virtual void Dispose(bool disposing)
if(_disposed) return;

_frameWriter.Dispose();
_heartbeatManager.Dispose();
if(_cascadeDispose)
{
_reader.Dispose();
Expand Down
6 changes: 6 additions & 0 deletions StompNet/IO/StompFrameWriterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,11 @@ public static Task WriteDisconnectAsync(
Frame frame = StompFrameFactory.CreateDisconnect(receipt, extraHeaders);
return writer.WriteAsync(frame, cancelToken);
}

public static Task WriteHearbeatAsync(this IStompFrameWriter writer, CancellationToken? cancellationToken = null)
{
CancellationToken cancelToken = cancellationToken ?? CancellationToken.None;
return writer.WriteAsync(Frame.HeartbeatFrame, cancelToken);
}
}
}
125 changes: 125 additions & 0 deletions StompNet/IO/StompHeartbeatManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using StompNet.Helpers;
using StompNet.Models;
using StompNet.Models.Frames;

namespace StompNet.IO
{
/// <summary>
/// Observer to manage hearbeat
/// </summary>
public class StompHeartbeatManager : IDisposable
{

private IStompClient client;
private IStompFrameObservable frameObservable;

private CancellationTokenSource keepAliveToken;
private CancellationTokenSource sendHBToken;
private bool shouldStartSendingHeartbeat;

/// <summary>
/// Gets the last activity time (received message)
/// </summary>
public DateTime LastActivityTime { get; private set; } = DateTime.UtcNow;

/// <summary>
/// Gets or sets margin of acceptance to send and receive heartbeat
/// because of timing inaccuracies, the receiver SHOULD be tolerant and take into account an error margin
/// </summary>
public static int MarginOfAcceptance { get; set; } = 1000;

/// <summary>
/// Create an observer to <paramref name="frameObservable"/>
/// </summary>
/// <param name="client"></param>
/// <param name="frameObservable"></param>
public StompHeartbeatManager(IStompClient client, IStompFrameObservable frameObservable)
{
this.client = client;
this.frameObservable = frameObservable;

if (this.frameObservable == null)
throw new ArgumentNullException("frameObservable");

this.frameObservable.SubscribeEx(this.OnNext);
}

private void OnNext(Frame frame)
{
if (frame.Command == StompCommands.Connected)
{
var connectedFrame = StompInterpreter.Interpret(frame) as ConnectedFrame;
if (connectedFrame.Heartbeat.Incoming > 0)
{
CheckKeepAlive(connectedFrame.Heartbeat.Incoming + MarginOfAcceptance);
}

if (connectedFrame.Heartbeat.Outgoing > 0)
{
SendHeartBeat((int)(Math.Max(connectedFrame.Heartbeat.Outgoing - MarginOfAcceptance, MarginOfAcceptance)));
}
}

LastActivityTime = DateTime.UtcNow;
}

/// <summary>
/// Checks if expected server heartbeat is received
/// </summary>
/// <param name="interval"></param>
private async void CheckKeepAlive(int interval)
{
keepAliveToken = new CancellationTokenSource();
while (!keepAliveToken.IsCancellationRequested)
{
var timeToWait = interval - (DateTime.UtcNow - LastActivityTime).TotalMilliseconds;
if (timeToWait < 0)
{
client.Dispose();
}
else
{
await Task.Delay((int)timeToWait, keepAliveToken.Token).ContinueWith(t => { });
}
}
}

/// <summary>
/// Sends heartbeat periodically
/// </summary>
/// <param name="interval"></param>
private async void SendHeartBeat(int interval)
{
sendHBToken = new CancellationTokenSource();
while (!sendHBToken.IsCancellationRequested)
{
if (shouldStartSendingHeartbeat)
{
await client.WriteHearbeatAsync();
}
else
{
shouldStartSendingHeartbeat = true;
}
await Task.Delay(interval, sendHBToken.Token)
.ContinueWith(t => { }); ;
}
}

public void Dispose()
{
if (keepAliveToken != null && !keepAliveToken.IsCancellationRequested)
{
keepAliveToken.Cancel();
}

if (sendHBToken != null && !sendHBToken.IsCancellationRequested)
{
sendHBToken.Cancel();
}
}
}
}
4 changes: 3 additions & 1 deletion StompNet/IStompConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using StompNet.Models.Frames;

namespace StompNet
{
Expand All @@ -36,7 +37,8 @@ public interface IStompConnector : IDisposable
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task representing the connect operation.</returns>
Task<IStompConnection> ConnectAsync(
IEnumerable<KeyValuePair<string, string>> extraHeaders = null,
IEnumerable<KeyValuePair<string, string>> extraHeaders = null,
Heartbeat heartbeat = null,
CancellationToken? cancellationToken = null);
}
}
3 changes: 2 additions & 1 deletion StompNet/Stomp12Connector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ public Stomp12Connector(
/// <returns>A task representing the connect operation.</returns>
public async Task<IStompConnection> ConnectAsync(
IEnumerable<KeyValuePair<string, string>> extraHeaders = null,
Heartbeat heartbeat = null,
CancellationToken? cancellationToken = null)
{
_client.Start(_cts.Token);

await _client.WriteConnectAsync(_host, _user, _password, Heartbeat.NoHeartbeat, extraHeaders, cancellationToken);
await _client.WriteConnectAsync(_host, _user, _password, heartbeat ?? Heartbeat.NoHeartbeat, extraHeaders, cancellationToken);

return new StompConnection(_client);
}
Expand Down
1 change: 1 addition & 0 deletions StompNet/StompNet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<Compile Include="IO\StompFrameObservable.cs" />
<Compile Include="IO\StompFrameWriterExtensions.cs" />
<Compile Include="IO\StompFrameWriterWithConfirmation.cs" />
<Compile Include="IO\StompHeartbeatManager.cs" />
<Compile Include="IO\StompSerialFrameReader.cs" />
<Compile Include="IO\StompSerialFrameWriter.cs" />
<Compile Include="IStompConnection.cs" />
Expand Down