A STOMP-over-WebSocket client library for Rust, built on top of awc and async-stomp.
This crate provides a simple client to connect to a STOMP-enabled WebSocket server (like RabbitMQ over Web-STOMP, or ActiveMQ). It handles the WebSocket connection, STOMP frame encoding/decoding, and WebSocket heartbeat (ping/pong) for you.
- Connects to STOMP servers over WebSocket using
awc. - Handles all STOMP protocol encoding and decoding via
async-stomp. - Manages WebSocket ping/pong heartbeats automatically in a background task.
- Provides a simple
tokio::mpscchannel-based API ([WStompClient]) for sending and receiving STOMP frames. - Connection helpers for various authentication methods:
- [
connect]: Anonymous connection. - [
connect_with_pass]: Login and passcode authentication. - [
connect_with_token]: Authentication using an authorization token header.
- [
- Optional
rustlsfeature for SSL connections, with helpers that force HTTP/1.1 for compatibility with servers like SockJS.
Add this to your Cargo.toml:
[dependencies]
wstomp = "0.1.0" # Replace with the actual version
actix-rt = "2.0"For SSL support, enable the rustls feature:
[dependencies]
wstomp = { version = "0.1.0", features = ["rustls"] }Here is a basic example of connecting, subscribing to a topic, and receiving messages.
use wstomp::{
connect_with_pass,
stomp::{FromServer, Message, ToServer, client::Subscriber},
WStompEvent, WStompError,
};
#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = "ws://127.0.0.1:15674/ws/websocket"; // Example: RabbitMQ Web-STOMP (Note the "/websocket" suffix)
let login = "guest".to_string();
let passcode = "guest".to_string();
// 1. Connect to the server
println!("Connecting to {}...", url);
let mut client = connect_with_pass(url, login, passcode)
.await
.expect("Failed to connect");
println!("Connected! Subscribing...");
// 2. Create a SUBSCRIBE frame
let subscribe_frame = Subscriber::builder()
.destination("queue.test")
.id("subscription-1")
.subscribe();
// 3. Send the SUBSCRIBE frame
client.send(subscribe_frame).await?;
println!("Subscribed! Waiting for messages...");
// 4. Listen for incoming messages
while let Some(event) = client.recv().await {
match event {
// Receive messages from the server
WStompEvent::Message(msg) => {
match msg.content {
FromServer::Message {
destination,
message_id,
subscription,
body,
..
} => {
println!("\n--- NEW MESSAGE ---");
println!("Destination: {}", destination);
println!("Subscription: {}", subscription);
println!("Message ID: {}", message_id);
if let Some(body_bytes) = body {
println!("Body: {}", String::from_utf8_lossy(&body_bytes));
}
}
FromServer::Receipt { receipt_id } => {
println!("Received receipt: {}", receipt_id);
}
FromServer::Connected { .. } => {
println!("Received CONNECTED frame (usually the first message)");
}
FromServer::Error { message, body, .. } => {
println!("Received ERROR frame: {}", message.unwrap_or_default());
if let Some(body_bytes) = body {
println!("Error Body: {}", String::from_utf8_lossy(&body_bytes));
}
break;
}
// IncompleteStompFrame is a warning, not a hard error
other => println!("Received other frame: {:?}", other),
}
}
// Handle errors
WStompEvent::Error(err) => {
match err {
WStompError::IncompleteStompFrame => {
// This is a warning, you can choose to ignore it or log it
eprintln!("Warning: Dropped incomplete STOMP frame.");
}
other_err => {
// These are more serious errors
eprintln!("Connection error: {}", other_err);
break;
}
}
}
}
}
Ok(())
}If you need to pass an Authorization header (or any custom header):
use wstomp::connect_with_token;
#[actix_rt::main]
async fn main() {
let url = "ws://my-server.com/ws/websocket";
let token = "my-secret-jwt-token";
let client = connect_with_token(url, token)
.await
.expect("Failed to connect");
// ... use client
}If you are connecting to a wss:// endpoint and need SSL, use the rustls feature and the connect_ssl_* helpers.
These helpers are specially configured to force HTTP/1.1, which can be necessary for compatibility with some WebSocket servers (like those using SockJS).
// Make sure to enable the "rustls" feature in Cargo.toml
use wstomp::connect_ssl_with_pass;
#[actix_rt::main]
async fn main() {
let url = "wss://secure-server.com/ws";
let login = "user".to_string();
let passcode = "pass".to_string();
let client = connect_ssl_with_pass(url, login, passcode)
.await
.expect("Failed to connect with SSL");
println!("Connected securely!");
// ... use client
}Use [WStompConfig::build_and_connect_with_reconnection_cb] method to automatically perform a full reconnect upon errors.
use wstomp::{WStompClient, WStompConfig, WStompConnectError};
#[actix_rt::main]
async fn main() {
let url = "wss://secure-server.com/ws";
let session_token = "session_token";
let cb = {
move |wstomp_client_res: Result<WStompClient, WStompConnectError>| {
async move {
// Unwrap wstomp client here or react to an error.
// Upon an error you can return from the callback to make wstomp library a re-connection attempt
}
}
};
let res = WStompConfig::new(url)
.ssl()
.auth_token(session_token)
.build_and_connect_with_reconnection_cb(cb);
// ... do different stuff here, but don't exit immediately as this will terminate wstomp loop.
}The connection functions ([connect], [connect_ssl], etc.) return a Result<WStompClient, WStompConnectError>.
Once connected, the WStompClient::rx channel produces [WStompEvent] items, it may be a message or [WStompError].
-
WStompConnectError: An error that occurs during the initial WebSocket and STOMPCONNECThandshake. -
WStompError: An error that occurs after a successful connection.WsReceive/WsSend: A WebSocket protocol error.StompDecoding/StompEncoding: A STOMP frame decoding/encoding error.IncompleteStompFrame: A warning indicating that data was received but was not enough to form a complete STOMP frame. The client has dropped this data. This is often safe to ignore or log as a warning.WebsocketClosed: WebSocket was closed, possibly a reason fromawclibrary is inside.PingFailed: Couldn't send ping through the WebSocket protocol.PingTimeout: There was no pong for last ping.
This crate is licensed under "MIT" or "Apache-2.0".