From 16eb9007363ce999c0b17211bb661788dde2ca9d Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 16:05:14 +0100 Subject: [PATCH 01/11] Introduce new traits for outbound and inbound message to eventually deprecate the FixMessage trait --- crates/hotfix/src/message.rs | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/crates/hotfix/src/message.rs b/crates/hotfix/src/message.rs index f512c5e..553db31 100644 --- a/crates/hotfix/src/message.rs +++ b/crates/hotfix/src/message.rs @@ -22,10 +22,34 @@ pub trait FixMessage: Clone + Send + 'static { fn write(&self, msg: &mut Message); fn message_type(&self) -> &str; + fn parse(message: &Message) -> Self; +} + +pub trait OutboundMessage: Clone + Send + 'static { + fn write(&self, msg: &mut Message); + + fn message_type(&self) -> &str; +} +impl OutboundMessage for M { + fn write(&self, msg: &mut Message) { + M::write(self, msg) + } + + fn message_type(&self) -> &str { + M::message_type(self) + } +} +pub trait InboundMessage: Clone + Send + 'static { fn parse(message: &Message) -> Self; } +impl InboundMessage for M { + fn parse(message: &Message) -> Self { + M::parse(message) + } +} + pub fn generate_message( begin_string: &str, sender_comp_id: &str, @@ -43,9 +67,3 @@ pub fn generate_message( msg.encode(&Config::default()) } - -pub trait WriteMessage { - fn write(&self, msg: &mut Message); - - fn message_type(&self) -> &str; -} From fc67b906dde43224a5499e9c5bf0e54739e0d2e4 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 16:35:21 +0100 Subject: [PATCH 02/11] Replace FixMessage with OutboundMessage in outbound messages channel --- crates/hotfix/src/application.rs | 4 +-- crates/hotfix/src/initiator.rs | 20 +++++------ crates/hotfix/src/message.rs | 2 +- crates/hotfix/src/session.rs | 35 ++++++++++++------- crates/hotfix/src/session/session_handle.rs | 8 ++--- crates/hotfix/src/session/session_ref.rs | 14 ++++---- crates/hotfix/src/transport/socket.rs | 10 +++--- .../src/transport/socket/socket_reader.rs | 8 ++--- .../tests/common/fakes/fake_application.rs | 2 +- examples/load-testing/src/application.rs | 2 +- examples/simple-new-order/src/application.rs | 2 +- 11 files changed, 59 insertions(+), 48 deletions(-) diff --git a/crates/hotfix/src/application.rs b/crates/hotfix/src/application.rs index e60bc7a..d160c7a 100644 --- a/crates/hotfix/src/application.rs +++ b/crates/hotfix/src/application.rs @@ -1,10 +1,10 @@ #[async_trait::async_trait] /// The application users of HotFIX can implement to hook into the engine. -pub trait Application: Send + Sync + 'static { +pub trait Application: Send + Sync + 'static { /// Called when a message is sent to the engine to be sent to the counterparty. /// /// This is invoked before the raw message is persisted in the message store. - async fn on_outbound_message(&self, msg: &M) -> OutboundDecision; + async fn on_outbound_message(&self, msg: &O) -> OutboundDecision; /// Called when a message is received from the counterparty. /// /// This is invoked after the message is verified and parsed into a typed message. diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index a7b1e76..8cfef4e 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -13,22 +13,22 @@ use tracing::{debug, warn}; use crate::application::Application; use crate::config::SessionConfig; -use crate::message::FixMessage; +use crate::message::{FixMessage, OutboundMessage}; use crate::session::{InternalSessionRef, SessionHandle}; use crate::store::MessageStore; use crate::transport::connect; #[derive(Clone)] -pub struct Initiator { +pub struct Initiator { pub config: SessionConfig, - session_handle: SessionHandle, + session_handle: SessionHandle, completion_rx: watch::Receiver, } -impl Initiator { - pub async fn start( +impl Initiator { + pub async fn start( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + Send + Sync + 'static, ) -> Self { let session_ref = InternalSessionRef::new(config.clone(), application, store); @@ -47,7 +47,7 @@ impl Initiator { } } - pub async fn send_message(&self, msg: M) -> anyhow::Result<()> { + pub async fn send_message(&self, msg: Outbound) -> anyhow::Result<()> { self.session_handle.send_message(msg).await?; Ok(()) @@ -57,7 +57,7 @@ impl Initiator { self.config.sender_comp_id == sender_comp_id && self.config.target_comp_id == target_comp_id } - pub fn session_handle(&self) -> SessionHandle { + pub fn session_handle(&self) -> SessionHandle { self.session_handle.clone() } @@ -85,9 +85,9 @@ impl Initiator { } } -async fn establish_connection( +async fn establish_connection( config: SessionConfig, - session_ref: InternalSessionRef, + session_ref: InternalSessionRef, completion_tx: watch::Sender, ) { loop { diff --git a/crates/hotfix/src/message.rs b/crates/hotfix/src/message.rs index 553db31..5f70185 100644 --- a/crates/hotfix/src/message.rs +++ b/crates/hotfix/src/message.rs @@ -55,7 +55,7 @@ pub fn generate_message( sender_comp_id: &str, target_comp_id: &str, msg_seq_num: u64, - message: impl FixMessage, + message: impl OutboundMessage, ) -> Result, EncodeError> { let mut msg = Message::new(begin_string, message.message_type()); msg.set(SENDER_COMP_ID, sender_comp_id); diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index adce28f..1131f31 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -6,11 +6,11 @@ pub mod session_ref; mod state; use crate::config::SessionConfig; -use crate::message::FixMessage; use crate::message::generate_message; use crate::message::heartbeat::Heartbeat; use crate::message::logon::{Logon, ResetSeqNumConfig}; use crate::message::parser::RawFixMessage; +use crate::message::{FixMessage, OutboundMessage}; use crate::store::MessageStore; use crate::transport::writer::WriterRef; use anyhow::{Result, anyhow}; @@ -54,7 +54,7 @@ pub use crate::session::session_handle::SessionHandle; const SCHEDULE_CHECK_INTERVAL: u64 = 1; -struct Session { +struct Session { message_config: MessageConfig, config: SessionConfig, schedule: SessionSchedule, @@ -64,11 +64,21 @@ struct Session { store: S, schedule_check_timer: Pin>, reset_on_next_logon: bool, - _phantom: std::marker::PhantomData M>, + _phantom: std::marker::PhantomData (M, O)>, } -impl, M: FixMessage, S: MessageStore> Session { - fn new(config: SessionConfig, application: A, store: S) -> Session { +impl Session +where + App: Application, + Outbound: OutboundMessage, + M: FixMessage, + Store: MessageStore, +{ + fn new( + config: SessionConfig, + application: App, + store: Store, + ) -> Session { let schedule_check_timer = sleep(Duration::from_secs(SCHEDULE_CHECK_INTERVAL)); let dictionary = Self::get_data_dictionary(&config); @@ -732,7 +742,7 @@ impl, M: FixMessage, S: MessageStore> Session { .reset_peer_timer(self.config.heartbeat_interval, test_request_id); } - async fn send_app_message(&mut self, message: M) { + async fn send_app_message(&mut self, message: Outbound) { match self.application.on_outbound_message(&message).await { OutboundDecision::Send => { self.send_message(message).await; @@ -747,7 +757,7 @@ impl, M: FixMessage, S: MessageStore> Session { } } - async fn send_message(&mut self, message: impl FixMessage) { + async fn send_message(&mut self, message: impl OutboundMessage) { let seq_num = self.store.next_sender_seq_number(); self.store.increment_sender_seq_number().await.unwrap(); @@ -869,7 +879,7 @@ impl, M: FixMessage, S: MessageStore> Session { } } - async fn handle_outbound_message(&mut self, message: M) { + async fn handle_outbound_message(&mut self, message: Outbound) { self.send_app_message(message).await; } @@ -964,14 +974,15 @@ impl, M: FixMessage, S: MessageStore> Session { } } -async fn run_session( - mut session: Session, +async fn run_session( + mut session: Session, mut event_receiver: mpsc::Receiver, - mut outbound_message_receiver: mpsc::Receiver, + mut outbound_message_receiver: mpsc::Receiver, mut admin_request_receiver: mpsc::Receiver, ) where - A: Application, + A: Application, M: FixMessage, + O: OutboundMessage, S: MessageStore + Send + 'static, { loop { diff --git a/crates/hotfix/src/session/session_handle.rs b/crates/hotfix/src/session/session_handle.rs index 213ab38..6dfe823 100644 --- a/crates/hotfix/src/session/session_handle.rs +++ b/crates/hotfix/src/session/session_handle.rs @@ -10,12 +10,12 @@ use tokio::sync::{mpsc, oneshot}; /// such as inbound message processing and disconnects, [`SessionHandle`] is public /// and only exposes APIs intended for consumers of the engine. #[derive(Clone, Debug)] -pub struct SessionHandle { - outbound_message_sender: mpsc::Sender, +pub struct SessionHandle { + outbound_message_sender: mpsc::Sender, admin_request_sender: mpsc::Sender, } -impl SessionHandle { +impl SessionHandle { pub async fn get_session_info(&self) -> anyhow::Result { let (sender, receiver) = oneshot::channel::(); self.admin_request_sender @@ -24,7 +24,7 @@ impl SessionHandle { Ok(receiver.await?) } - pub async fn send_message(&self, msg: M) -> anyhow::Result<()> { + pub async fn send_message(&self, msg: Outbound) -> anyhow::Result<()> { self.outbound_message_sender .send(msg) .await diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index bc16443..0360bc4 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -1,5 +1,5 @@ use crate::config::SessionConfig; -use crate::message::{FixMessage, RawFixMessage}; +use crate::message::{FixMessage, OutboundMessage, RawFixMessage}; use crate::session::Session; use crate::session::admin_request::AdminRequest; use crate::session::event::{AwaitingActiveSessionResponse, SessionEvent}; @@ -10,20 +10,20 @@ use tokio::sync::{mpsc, oneshot}; use tracing::debug; #[derive(Clone)] -pub struct InternalSessionRef { +pub struct InternalSessionRef { pub(crate) event_sender: mpsc::Sender, - pub(crate) outbound_message_sender: mpsc::Sender, + pub(crate) outbound_message_sender: mpsc::Sender, pub(crate) admin_request_sender: mpsc::Sender, } -impl InternalSessionRef { - pub fn new( +impl InternalSessionRef { + pub fn new( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + Send + Sync + 'static, ) -> Self { let (event_sender, event_receiver) = mpsc::channel::(100); - let (outbound_message_sender, outbound_message_receiver) = mpsc::channel::(10); + let (outbound_message_sender, outbound_message_receiver) = mpsc::channel::(10); let (admin_request_sender, admin_request_receiver) = mpsc::channel::(10); let session = Session::new(config, application, store); tokio::spawn(session::run_session( diff --git a/crates/hotfix/src/transport/socket.rs b/crates/hotfix/src/transport/socket.rs index d1c1750..4bcf4b8 100644 --- a/crates/hotfix/src/transport/socket.rs +++ b/crates/hotfix/src/transport/socket.rs @@ -6,10 +6,10 @@ pub mod tls; use std::io; use tokio::io::{AsyncRead, AsyncWrite}; +use crate::message::OutboundMessage; use crate::session::InternalSessionRef; use crate::{ config::SessionConfig, - message::FixMessage, transport::{ FixConnection, socket_reader::spawn_socket_reader, socket_writer::spawn_socket_writer, tcp::create_tcp_connection, tls::create_tcp_over_tls_connection, @@ -19,7 +19,7 @@ use crate::{ /// Connect over TCP/TLS and return a FixConnection pub async fn connect( config: &SessionConfig, - session_ref: InternalSessionRef, + session_ref: InternalSessionRef, ) -> io::Result { let use_tls = config.tls_config.is_some(); @@ -34,12 +34,12 @@ pub async fn connect( Ok(conn) } -async fn _create_io_refs( - session_ref: InternalSessionRef, +async fn _create_io_refs( + session_ref: InternalSessionRef, stream: Stream, ) -> FixConnection where - M: FixMessage, + Outbound: OutboundMessage, Stream: AsyncRead + AsyncWrite + Send + 'static, { let (reader, writer) = tokio::io::split(stream); diff --git a/crates/hotfix/src/transport/socket/socket_reader.rs b/crates/hotfix/src/transport/socket/socket_reader.rs index 407f102..484cdbb 100644 --- a/crates/hotfix/src/transport/socket/socket_reader.rs +++ b/crates/hotfix/src/transport/socket/socket_reader.rs @@ -2,14 +2,14 @@ use tokio::io::{AsyncRead, AsyncReadExt, ReadHalf}; use tokio::sync::oneshot; use tracing::debug; -use crate::message::FixMessage; +use crate::message::OutboundMessage; use crate::message::parser::Parser; use crate::session::InternalSessionRef; use crate::transport::reader::ReaderRef; pub fn spawn_socket_reader( reader: ReadHalf, - session_ref: InternalSessionRef, + session_ref: InternalSessionRef, ) -> ReaderRef { let (dc_sender, dc_receiver) = oneshot::channel(); let actor = ReaderActor::new(reader, session_ref, dc_sender); @@ -38,9 +38,9 @@ impl ReaderActor { } } -async fn run_reader(mut actor: ReaderActor) +async fn run_reader(mut actor: ReaderActor) where - M: FixMessage, + Outbound: OutboundMessage, R: AsyncRead, { let mut parser = Parser::default(); diff --git a/crates/hotfix/tests/common/fakes/fake_application.rs b/crates/hotfix/tests/common/fakes/fake_application.rs index 68ab7bb..06c0218 100644 --- a/crates/hotfix/tests/common/fakes/fake_application.rs +++ b/crates/hotfix/tests/common/fakes/fake_application.rs @@ -13,7 +13,7 @@ impl FakeApplication { } #[async_trait::async_trait] -impl Application for FakeApplication { +impl Application for FakeApplication { async fn on_outbound_message(&self, _msg: &TestMessage) -> OutboundDecision { OutboundDecision::Send } diff --git a/examples/load-testing/src/application.rs b/examples/load-testing/src/application.rs index 330d9d3..831ad2d 100644 --- a/examples/load-testing/src/application.rs +++ b/examples/load-testing/src/application.rs @@ -15,7 +15,7 @@ impl LoadTestingApplication { } #[async_trait::async_trait] -impl Application for LoadTestingApplication { +impl Application for LoadTestingApplication { async fn on_outbound_message(&self, _msg: &Message) -> OutboundDecision { OutboundDecision::Send } diff --git a/examples/simple-new-order/src/application.rs b/examples/simple-new-order/src/application.rs index 3bbf24c..e9d5dea 100644 --- a/examples/simple-new-order/src/application.rs +++ b/examples/simple-new-order/src/application.rs @@ -7,7 +7,7 @@ use tracing::info; pub struct TestApplication {} #[async_trait::async_trait] -impl Application for TestApplication { +impl Application for TestApplication { async fn on_outbound_message(&self, _msg: &Message) -> OutboundDecision { OutboundDecision::Send } From d421c2dad5edd3ba760f67badbc073097d42880f Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 16:41:41 +0100 Subject: [PATCH 03/11] Constrain inbound messages to InboundMessage rather than FixMessage in the application --- crates/hotfix/src/application.rs | 6 ++--- crates/hotfix/src/initiator.rs | 6 ++--- crates/hotfix/src/session.rs | 32 ++++++++++++------------ crates/hotfix/src/session/session_ref.rs | 6 ++--- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/crates/hotfix/src/application.rs b/crates/hotfix/src/application.rs index d160c7a..40f2f3d 100644 --- a/crates/hotfix/src/application.rs +++ b/crates/hotfix/src/application.rs @@ -1,14 +1,14 @@ #[async_trait::async_trait] /// The application users of HotFIX can implement to hook into the engine. -pub trait Application: Send + Sync + 'static { +pub trait Application: Send + Sync + 'static { /// Called when a message is sent to the engine to be sent to the counterparty. /// /// This is invoked before the raw message is persisted in the message store. - async fn on_outbound_message(&self, msg: &O) -> OutboundDecision; + async fn on_outbound_message(&self, msg: &Outbound) -> OutboundDecision; /// Called when a message is received from the counterparty. /// /// This is invoked after the message is verified and parsed into a typed message. - async fn on_inbound_message(&self, msg: M) -> InboundDecision; + async fn on_inbound_message(&self, msg: Inbound) -> InboundDecision; /// Called when the session is logged out. async fn on_logout(&mut self, reason: &str); /// Called when the session is logged on. diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index 8cfef4e..a9360e7 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -13,7 +13,7 @@ use tracing::{debug, warn}; use crate::application::Application; use crate::config::SessionConfig; -use crate::message::{FixMessage, OutboundMessage}; +use crate::message::{InboundMessage, OutboundMessage}; use crate::session::{InternalSessionRef, SessionHandle}; use crate::store::MessageStore; use crate::transport::connect; @@ -26,9 +26,9 @@ pub struct Initiator { } impl Initiator { - pub async fn start( + pub async fn start( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + Send + Sync + 'static, ) -> Self { let session_ref = InternalSessionRef::new(config.clone(), application, store); diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 1131f31..2b6f77a 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -6,11 +6,11 @@ pub mod session_ref; mod state; use crate::config::SessionConfig; -use crate::message::generate_message; +use crate::message::OutboundMessage; use crate::message::heartbeat::Heartbeat; use crate::message::logon::{Logon, ResetSeqNumConfig}; use crate::message::parser::RawFixMessage; -use crate::message::{FixMessage, OutboundMessage}; +use crate::message::{InboundMessage, generate_message}; use crate::store::MessageStore; use crate::transport::writer::WriterRef; use anyhow::{Result, anyhow}; @@ -54,7 +54,7 @@ pub use crate::session::session_handle::SessionHandle; const SCHEDULE_CHECK_INTERVAL: u64 = 1; -struct Session { +struct Session { message_config: MessageConfig, config: SessionConfig, schedule: SessionSchedule, @@ -64,21 +64,21 @@ struct Session { store: S, schedule_check_timer: Pin>, reset_on_next_logon: bool, - _phantom: std::marker::PhantomData (M, O)>, + _phantom: std::marker::PhantomData (I, O)>, } -impl Session +impl Session where - App: Application, + App: Application, + Inbound: InboundMessage, Outbound: OutboundMessage, - M: FixMessage, Store: MessageStore, { fn new( config: SessionConfig, application: App, store: Store, - ) -> Session { + ) -> Session { let schedule_check_timer = sleep(Duration::from_secs(SCHEDULE_CHECK_INTERVAL)); let dictionary = Self::get_data_dictionary(&config); @@ -229,7 +229,7 @@ where async fn process_app_message(&mut self, message: &Message) -> Result<()> { match self.verify_message(message, true) { Ok(_) => { - let parsed_message = M::parse(message); + let parsed_message = Inbound::parse(message); if matches!( self.application.on_inbound_message(parsed_message).await, InboundDecision::TerminateSession @@ -974,16 +974,16 @@ where } } -async fn run_session( - mut session: Session, +async fn run_session( + mut session: Session, mut event_receiver: mpsc::Receiver, - mut outbound_message_receiver: mpsc::Receiver, + mut outbound_message_receiver: mpsc::Receiver, mut admin_request_receiver: mpsc::Receiver, ) where - A: Application, - M: FixMessage, - O: OutboundMessage, - S: MessageStore + Send + 'static, + App: Application, + Inbound: InboundMessage, + Outbound: OutboundMessage, + Store: MessageStore + Send + 'static, { loop { select! { diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index 0360bc4..8a449d8 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -1,5 +1,5 @@ use crate::config::SessionConfig; -use crate::message::{FixMessage, OutboundMessage, RawFixMessage}; +use crate::message::{InboundMessage, OutboundMessage, RawFixMessage}; use crate::session::Session; use crate::session::admin_request::AdminRequest; use crate::session::event::{AwaitingActiveSessionResponse, SessionEvent}; @@ -17,9 +17,9 @@ pub struct InternalSessionRef { } impl InternalSessionRef { - pub fn new( + pub fn new( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + Send + Sync + 'static, ) -> Self { let (event_sender, event_receiver) = mpsc::channel::(100); From f2bdb5368f75d0d9842cd2dc74eecbe0a0afaecd Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 16:45:36 +0100 Subject: [PATCH 04/11] Remove references to FixMessage trait in admin messages --- crates/hotfix/src/message/heartbeat.rs | 6 ++++-- crates/hotfix/src/message/logon.rs | 8 ++------ crates/hotfix/src/message/logout.rs | 8 ++------ crates/hotfix/src/message/reject.rs | 6 ++++-- crates/hotfix/src/message/resend_request.rs | 8 ++------ crates/hotfix/src/message/sequence_reset.rs | 8 ++------ crates/hotfix/src/message/test_request.rs | 8 ++------ 7 files changed, 18 insertions(+), 34 deletions(-) diff --git a/crates/hotfix/src/message/heartbeat.rs b/crates/hotfix/src/message/heartbeat.rs index 7c1ec22..9d4c639 100644 --- a/crates/hotfix/src/message/heartbeat.rs +++ b/crates/hotfix/src/message/heartbeat.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::{InboundMessage, OutboundMessage}; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::TEST_REQ_ID; @@ -16,7 +16,7 @@ impl Heartbeat { } } -impl FixMessage for Heartbeat { +impl OutboundMessage for Heartbeat { fn write(&self, msg: &mut Message) { if let Some(req_id) = &self.test_req_id { msg.set(TEST_REQ_ID, req_id.as_str()); @@ -26,7 +26,9 @@ impl FixMessage for Heartbeat { fn message_type(&self) -> &str { "0" } +} +impl InboundMessage for Heartbeat { fn parse(_message: &Message) -> Self { // TODO: this needs to be implemented properly when we're implementing Test Requests Heartbeat { test_req_id: None } diff --git a/crates/hotfix/src/message/logon.rs b/crates/hotfix/src/message/logon.rs index 8f34609..8571618 100644 --- a/crates/hotfix/src/message/logon.rs +++ b/crates/hotfix/src/message/logon.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::message::Message; use hotfix_message::session_fields::{ ENCRYPT_METHOD, HEART_BT_INT, NEXT_EXPECTED_MSG_SEQ_NUM, RESET_SEQ_NUM_FLAG, @@ -33,7 +33,7 @@ impl Logon { } } -impl FixMessage for Logon { +impl OutboundMessage for Logon { fn write(&self, msg: &mut Message) { msg.set(ENCRYPT_METHOD, self.encrypt_method); msg.set(HEART_BT_INT, self.heartbeat_interval); @@ -47,10 +47,6 @@ impl FixMessage for Logon { fn message_type(&self) -> &str { "A" } - - fn parse(_message: &Message) -> Self { - todo!() - } } #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, FieldType)] diff --git a/crates/hotfix/src/message/logout.rs b/crates/hotfix/src/message/logout.rs index 4b4d55f..12141af 100644 --- a/crates/hotfix/src/message/logout.rs +++ b/crates/hotfix/src/message/logout.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::TEXT; @@ -14,7 +14,7 @@ impl Logout { } } -impl FixMessage for Logout { +impl OutboundMessage for Logout { fn write(&self, msg: &mut Message) { if let Some(value) = &self.text { msg.set(TEXT, value.as_str()); @@ -24,8 +24,4 @@ impl FixMessage for Logout { fn message_type(&self) -> &str { "5" } - - fn parse(_message: &Message) -> Self { - unimplemented!() - } } diff --git a/crates/hotfix/src/message/reject.rs b/crates/hotfix/src/message/reject.rs index 7eec3d6..ddf99df 100644 --- a/crates/hotfix/src/message/reject.rs +++ b/crates/hotfix/src/message/reject.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::{InboundMessage, OutboundMessage}; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::{ @@ -52,7 +52,7 @@ impl Reject { } } -impl FixMessage for Reject { +impl OutboundMessage for Reject { fn write(&self, msg: &mut Message) { msg.set(REF_SEQ_NUM, self.ref_seq_num); @@ -73,7 +73,9 @@ impl FixMessage for Reject { fn message_type(&self) -> &str { "3" } +} +impl InboundMessage for Reject { fn parse(message: &Message) -> Self { Self { ref_seq_num: message.get(REF_SEQ_NUM).unwrap(), diff --git a/crates/hotfix/src/message/resend_request.rs b/crates/hotfix/src/message/resend_request.rs index 3eddc96..1aca0ce 100644 --- a/crates/hotfix/src/message/resend_request.rs +++ b/crates/hotfix/src/message/resend_request.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::{BEGIN_SEQ_NO, END_SEQ_NO}; @@ -18,7 +18,7 @@ impl ResendRequest { } } -impl FixMessage for ResendRequest { +impl OutboundMessage for ResendRequest { fn write(&self, msg: &mut Message) { msg.set(BEGIN_SEQ_NO, self.begin_seq_no); msg.set(END_SEQ_NO, self.end_seq_no); @@ -27,8 +27,4 @@ impl FixMessage for ResendRequest { fn message_type(&self) -> &str { "2" } - - fn parse(_message: &Message) -> Self { - todo!() - } } diff --git a/crates/hotfix/src/message/sequence_reset.rs b/crates/hotfix/src/message/sequence_reset.rs index 2f4a5d4..152d9ed 100644 --- a/crates/hotfix/src/message/sequence_reset.rs +++ b/crates/hotfix/src/message/sequence_reset.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::field_types::Timestamp; use hotfix_message::message::Message; @@ -12,7 +12,7 @@ pub struct SequenceReset { pub new_seq_no: u64, } -impl FixMessage for SequenceReset { +impl OutboundMessage for SequenceReset { fn write(&self, msg: &mut Message) { msg.set(GAP_FILL_FLAG, self.gap_fill); msg.set(NEW_SEQ_NO, self.new_seq_no); @@ -24,8 +24,4 @@ impl FixMessage for SequenceReset { fn message_type(&self) -> &str { "4" } - - fn parse(_message: &Message) -> Self { - todo!() - } } diff --git a/crates/hotfix/src/message/test_request.rs b/crates/hotfix/src/message/test_request.rs index afb2381..0dac034 100644 --- a/crates/hotfix/src/message/test_request.rs +++ b/crates/hotfix/src/message/test_request.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::TEST_REQ_ID; @@ -14,7 +14,7 @@ impl TestRequest { } } -impl FixMessage for TestRequest { +impl OutboundMessage for TestRequest { fn write(&self, msg: &mut Message) { msg.set(TEST_REQ_ID, self.test_req_id.as_str()); } @@ -22,8 +22,4 @@ impl FixMessage for TestRequest { fn message_type(&self) -> &str { "1" } - - fn parse(_message: &Message) -> Self { - unimplemented!() - } } From 24b38828a1ec8ebfd371419ed4295c693d70276e Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 16:50:27 +0100 Subject: [PATCH 05/11] Replace references to FixMessage in tests --- crates/hotfix/src/message.rs | 1 + .../src/transport/socket/socket_reader.rs | 8 ++------ crates/hotfix/tests/common/actions.rs | 6 +++--- .../tests/common/fakes/fake_counterparty.rs | 19 +++++++++++-------- crates/hotfix/tests/common/test_messages.rs | 13 +++++-------- .../session_test_cases/business_tests.rs | 2 +- .../tests/session_test_cases/resend_tests.rs | 2 +- 7 files changed, 24 insertions(+), 27 deletions(-) diff --git a/crates/hotfix/src/message.rs b/crates/hotfix/src/message.rs index 5f70185..b1be004 100644 --- a/crates/hotfix/src/message.rs +++ b/crates/hotfix/src/message.rs @@ -22,6 +22,7 @@ pub trait FixMessage: Clone + Send + 'static { fn write(&self, msg: &mut Message); fn message_type(&self) -> &str; + fn parse(message: &Message) -> Self; } diff --git a/crates/hotfix/src/transport/socket/socket_reader.rs b/crates/hotfix/src/transport/socket/socket_reader.rs index 484cdbb..cc28b10 100644 --- a/crates/hotfix/src/transport/socket/socket_reader.rs +++ b/crates/hotfix/src/transport/socket/socket_reader.rs @@ -78,7 +78,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::message::{FixMessage, Message}; + use crate::message::Message; use crate::session::admin_request::AdminRequest; use crate::session::event::SessionEvent; use tokio::io::{AsyncWriteExt, duplex}; @@ -87,16 +87,12 @@ mod tests { #[derive(Clone, Debug, PartialEq)] struct TestMessage; - impl FixMessage for TestMessage { + impl OutboundMessage for TestMessage { fn write(&self, _msg: &mut Message) {} fn message_type(&self) -> &str { "TEST" } - - fn parse(_message: &Message) -> Self { - TestMessage - } } /// Creates a test InternalSessionRef that captures events for verification diff --git a/crates/hotfix/tests/common/actions.rs b/crates/hotfix/tests/common/actions.rs index 1c3cf73..a862f0b 100644 --- a/crates/hotfix/tests/common/actions.rs +++ b/crates/hotfix/tests/common/actions.rs @@ -1,6 +1,6 @@ use crate::common::fakes::{FakeCounterparty, SessionSpy}; use crate::common::test_messages::TestMessage; -use hotfix::message::FixMessage; +use hotfix::message::OutboundMessage; use std::time::Duration; pub struct When { @@ -26,7 +26,7 @@ impl When<&SessionSpy> { } impl When<&mut FakeCounterparty> { - pub async fn has_previously_sent(&mut self, message: impl FixMessage) { + pub async fn has_previously_sent(&mut self, message: impl OutboundMessage) { self.target.push_previously_sent_message(message).await; } @@ -38,7 +38,7 @@ impl When<&mut FakeCounterparty> { self.target.resend_message(sequence_number, true).await; } - pub async fn sends_message(&mut self, message: impl FixMessage) { + pub async fn sends_message(&mut self, message: impl OutboundMessage) { self.target.send_message(message).await; } diff --git a/crates/hotfix/tests/common/fakes/fake_counterparty.rs b/crates/hotfix/tests/common/fakes/fake_counterparty.rs index 4621592..33cb643 100644 --- a/crates/hotfix/tests/common/fakes/fake_counterparty.rs +++ b/crates/hotfix/tests/common/fakes/fake_counterparty.rs @@ -1,7 +1,7 @@ use hotfix::config::SessionConfig; use hotfix::message::logon::{Logon, ResetSeqNumConfig}; use hotfix::message::sequence_reset::SequenceReset; -use hotfix::message::{FixMessage, RawFixMessage, generate_message}; +use hotfix::message::{OutboundMessage, RawFixMessage, generate_message}; use hotfix::session::InternalSessionRef; use hotfix::transport::FixConnection; use hotfix::transport::reader::ReaderRef; @@ -14,11 +14,11 @@ use std::time::Duration; use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, oneshot}; -pub struct FakeCounterparty { +pub struct FakeCounterparty { receiver: Receiver, received_messages: Vec, sent_messages: Vec>, - session_ref: InternalSessionRef, + session_ref: InternalSessionRef, session_config: SessionConfig, message_builder: MessageBuilder, message_config: MessageConfig, @@ -26,11 +26,14 @@ pub struct FakeCounterparty { _dc_sender: oneshot::Sender<()>, } -impl FakeCounterparty +impl FakeCounterparty where - M: FixMessage, + Outbound: OutboundMessage, { - pub async fn start(session_ref: InternalSessionRef, session_config: SessionConfig) -> Self { + pub async fn start( + session_ref: InternalSessionRef, + session_config: SessionConfig, + ) -> Self { let (writer_ref, receiver) = Self::create_writer(); let (reader_ref, dc_sender) = Self::create_reader(); let connection = FixConnection::new(writer_ref, reader_ref); @@ -68,7 +71,7 @@ where } } - pub async fn push_previously_sent_message(&mut self, message: impl FixMessage) { + pub async fn push_previously_sent_message(&mut self, message: impl OutboundMessage) { let raw_message = generate_message( &self.session_config.begin_string, &self.session_config.sender_comp_id, @@ -148,7 +151,7 @@ where self.send_message(logon).await; } - pub async fn send_message(&mut self, message: impl FixMessage) { + pub async fn send_message(&mut self, message: impl OutboundMessage) { let raw_message = generate_message( &self.session_config.begin_string, &self.session_config.sender_comp_id, diff --git a/crates/hotfix/tests/common/test_messages.rs b/crates/hotfix/tests/common/test_messages.rs index b90a917..f17e605 100644 --- a/crates/hotfix/tests/common/test_messages.rs +++ b/crates/hotfix/tests/common/test_messages.rs @@ -1,7 +1,7 @@ use crate::common::setup::{COUNTERPARTY_COMP_ID, OUR_COMP_ID}; use chrono::TimeDelta; use hotfix::Message as HotfixMessage; -use hotfix::message::{FixMessage, generate_message}; +use hotfix::message::{InboundMessage, OutboundMessage, generate_message}; use hotfix_message::dict::{FieldLocation, FixDatatype}; use hotfix_message::field_types::Timestamp; use hotfix_message::message::{Config, Message}; @@ -63,7 +63,7 @@ impl TestMessage { } } -impl FixMessage for TestMessage { +impl OutboundMessage for TestMessage { fn write(&self, msg: &mut HotfixMessage) { match self { TestMessage::ExecutionReport { @@ -109,7 +109,9 @@ impl FixMessage for TestMessage { TestMessage::NewOrderSingle { .. } => "D", } } +} +impl InboundMessage for TestMessage { fn parse(msg: &HotfixMessage) -> Self { let msg_type: &str = msg.header().get(fix44::MSG_TYPE).unwrap(); match msg_type { @@ -188,7 +190,7 @@ impl Default for ExecutionReportWithInvalidField { } } -impl FixMessage for ExecutionReportWithInvalidField { +impl OutboundMessage for ExecutionReportWithInvalidField { fn write(&self, msg: &mut Message) { msg.set(fix44::ORDER_ID, self.order_id.as_str()); msg.set(fix44::EXEC_ID, self.exec_id.as_str()); @@ -206,11 +208,6 @@ impl FixMessage for ExecutionReportWithInvalidField { fn message_type(&self) -> &str { "D" } - - fn parse(_message: &Message) -> Self { - // we never parse this message - unimplemented!() - } } pub const CUSTOM_FIELD: &HardCodedFixFieldDefinition = &HardCodedFixFieldDefinition { diff --git a/crates/hotfix/tests/session_test_cases/business_tests.rs b/crates/hotfix/tests/session_test_cases/business_tests.rs index d4f785b..f480199 100644 --- a/crates/hotfix/tests/session_test_cases/business_tests.rs +++ b/crates/hotfix/tests/session_test_cases/business_tests.rs @@ -3,7 +3,7 @@ use crate::common::assertions::then; use crate::common::cleanup::finally; use crate::common::setup::given_an_active_session; use crate::common::test_messages::TestMessage; -use hotfix::message::FixMessage; +use hotfix::message::{InboundMessage, OutboundMessage}; use hotfix_message::{FieldType, fix44::MsgType}; #[tokio::test] diff --git a/crates/hotfix/tests/session_test_cases/resend_tests.rs b/crates/hotfix/tests/session_test_cases/resend_tests.rs index 865652c..82278cf 100644 --- a/crates/hotfix/tests/session_test_cases/resend_tests.rs +++ b/crates/hotfix/tests/session_test_cases/resend_tests.rs @@ -5,7 +5,7 @@ use crate::common::setup::{HEARTBEAT_INTERVAL, given_an_active_session}; use crate::common::test_messages::{ TestMessage, build_execution_report_with_incorrect_body_length, build_invalid_resend_request, }; -use hotfix::message::{FixMessage, ResendRequest}; +use hotfix::message::{OutboundMessage, ResendRequest}; use hotfix::session::Status; use hotfix_message::fix44::{GAP_FILL_FLAG, MsgType, NEW_SEQ_NO}; use hotfix_message::{FieldType, Part}; From 14630fa53592bd317ba1df34a5cbd5e6915b5d3e Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 16:57:44 +0100 Subject: [PATCH 06/11] Replace references to FixMessage in session controller --- crates/hotfix-web/src/session_controller.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/crates/hotfix-web/src/session_controller.rs b/crates/hotfix-web/src/session_controller.rs index 32b1dd0..c2d99f2 100644 --- a/crates/hotfix-web/src/session_controller.rs +++ b/crates/hotfix-web/src/session_controller.rs @@ -1,4 +1,4 @@ -use hotfix::message::FixMessage; +use hotfix::message::OutboundMessage; use hotfix::session::{SessionHandle, SessionInfo}; /// Controller for session operations, providing both read access and administrative actions @@ -11,12 +11,12 @@ pub trait SessionController: Clone + Send + Sync { /// HTTP session controller implementation that wraps a SessionHandle #[derive(Clone)] -pub struct HttpSessionController { - pub(crate) session_handle: SessionHandle, +pub struct HttpSessionController { + pub(crate) session_handle: SessionHandle, } #[async_trait::async_trait] -impl SessionController for HttpSessionController { +impl SessionController for HttpSessionController { async fn get_session_info(&self) -> anyhow::Result { self.session_handle.get_session_info().await } @@ -34,7 +34,9 @@ impl SessionController for HttpSessionController { // Note: We can't use a blanket impl due to Rust's orphan rules (can't impl foreign trait for generic type) #[cfg(feature = "ui")] #[async_trait::async_trait] -impl hotfix_web_ui::SessionInfoProvider for HttpSessionController { +impl hotfix_web_ui::SessionInfoProvider + for HttpSessionController +{ async fn get_session_info(&self) -> anyhow::Result { // Reuse the SessionController implementation SessionController::get_session_info(self).await @@ -43,12 +45,12 @@ impl hotfix_web_ui::SessionInfoProvider for HttpSessionController // Allow extracting HttpSessionController from AppState for hotfix-web-ui #[cfg(feature = "ui")] -impl axum::extract::FromRef>> - for HttpSessionController +impl axum::extract::FromRef>> + for HttpSessionController where - M: FixMessage, + Outbound: OutboundMessage, { - fn from_ref(state: &crate::AppState>) -> Self { + fn from_ref(state: &crate::AppState>) -> Self { state.controller.clone() } } From 051e6ec00e9a8dd0d1dfcaf0b36f28b4323a039a Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 16:59:12 +0100 Subject: [PATCH 07/11] Replace references to FixMessage in web wrapper --- crates/hotfix-web/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/hotfix-web/src/lib.rs b/crates/hotfix-web/src/lib.rs index a0123ed..4d5d62b 100644 --- a/crates/hotfix-web/src/lib.rs +++ b/crates/hotfix-web/src/lib.rs @@ -5,7 +5,7 @@ mod session_controller; use crate::endpoints::build_api_router; use crate::session_controller::{HttpSessionController, SessionController}; use axum::Router; -use hotfix::message::FixMessage; +use hotfix::message::OutboundMessage; use hotfix::session::SessionHandle; #[derive(Clone)] @@ -21,13 +21,13 @@ pub struct RouterConfig { } /// Build a router with default configuration (admin endpoints disabled) -pub fn build_router(session_handle: SessionHandle) -> Router { +pub fn build_router(session_handle: SessionHandle) -> Router { build_router_with_config(session_handle, RouterConfig::default()) } /// Build a router with custom configuration -pub fn build_router_with_config( - session_handle: SessionHandle, +pub fn build_router_with_config( + session_handle: SessionHandle, config: RouterConfig, ) -> Router { let controller = HttpSessionController { session_handle }; From a4347b6a14b7248498c0057e206271d553dbfc43 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 17:01:03 +0100 Subject: [PATCH 08/11] Replace uses of FixMessage in example apps --- examples/load-testing/src/messages.rs | 6 ++++-- examples/simple-new-order/src/messages.rs | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/examples/load-testing/src/messages.rs b/examples/load-testing/src/messages.rs index d3f2feb..d9b9245 100644 --- a/examples/load-testing/src/messages.rs +++ b/examples/load-testing/src/messages.rs @@ -2,7 +2,7 @@ use hotfix::Message as HotfixMessage; use hotfix::field_types::{Date, Timestamp}; use hotfix::fix44; use hotfix::fix44::{OrdStatus, OrdType, Side}; -use hotfix::message::{FixMessage, Part, RepeatingGroup}; +use hotfix::message::{InboundMessage, OutboundMessage, Part, RepeatingGroup}; #[derive(Debug, Clone)] #[allow(dead_code)] @@ -62,7 +62,7 @@ impl Message { } } -impl FixMessage for Message { +impl OutboundMessage for Message { fn write(&self, msg: &mut HotfixMessage) { match self { Self::NewOrderSingle(order) => { @@ -93,7 +93,9 @@ impl FixMessage for Message { _ => unimplemented!(), } } +} +impl InboundMessage for Message { fn parse(message: &HotfixMessage) -> Self { let message_type: &str = message.header().get(fix44::MSG_TYPE).unwrap(); if message_type == "8" { diff --git a/examples/simple-new-order/src/messages.rs b/examples/simple-new-order/src/messages.rs index 3fa899d..ded1cc5 100644 --- a/examples/simple-new-order/src/messages.rs +++ b/examples/simple-new-order/src/messages.rs @@ -1,7 +1,7 @@ use hotfix::Message as HotfixMessage; use hotfix::field_types::{Date, Timestamp}; use hotfix::fix44; -use hotfix::message::{FixMessage, Part, RepeatingGroup}; +use hotfix::message::{InboundMessage, OutboundMessage, Part, RepeatingGroup}; #[derive(Debug, Clone)] pub struct NewOrderSingle { @@ -26,7 +26,7 @@ pub enum Message { UnimplementedMessage(Vec), } -impl FixMessage for Message { +impl OutboundMessage for Message { fn write(&self, msg: &mut HotfixMessage) { match self { Self::NewOrderSingle(order) => { @@ -56,7 +56,9 @@ impl FixMessage for Message { _ => unimplemented!(), } } +} +impl InboundMessage for Message { fn parse(message: &HotfixMessage) -> Self { let message_type: &str = message.header().get(fix44::MSG_TYPE).unwrap(); Self::UnimplementedMessage(message_type.as_bytes().to_vec()) From fea9b677cc9d5ae215c618a4b501dbb1b505ec87 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 17:01:48 +0100 Subject: [PATCH 09/11] Completely deprecate and remove FixMessage trait --- crates/hotfix/src/message.rs | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/crates/hotfix/src/message.rs b/crates/hotfix/src/message.rs index b1be004..8395f17 100644 --- a/crates/hotfix/src/message.rs +++ b/crates/hotfix/src/message.rs @@ -18,39 +18,16 @@ pub mod verification; pub use parser::RawFixMessage; pub use resend_request::ResendRequest; -pub trait FixMessage: Clone + Send + 'static { - fn write(&self, msg: &mut Message); - - fn message_type(&self) -> &str; - - fn parse(message: &Message) -> Self; -} - pub trait OutboundMessage: Clone + Send + 'static { fn write(&self, msg: &mut Message); fn message_type(&self) -> &str; } -impl OutboundMessage for M { - fn write(&self, msg: &mut Message) { - M::write(self, msg) - } - - fn message_type(&self) -> &str { - M::message_type(self) - } -} pub trait InboundMessage: Clone + Send + 'static { fn parse(message: &Message) -> Self; } -impl InboundMessage for M { - fn parse(message: &Message) -> Self { - M::parse(message) - } -} - pub fn generate_message( begin_string: &str, sender_comp_id: &str, From f43625d9cd74eb0983be71e30ddc259e6e6f08fd Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 17:21:13 +0100 Subject: [PATCH 10/11] Split inbound and outbound messages in load testing app --- examples/load-testing/src/application.rs | 15 ++++++--------- examples/load-testing/src/main.rs | 15 +++++++-------- examples/load-testing/src/messages.rs | 20 +++++++++++--------- 3 files changed, 24 insertions(+), 26 deletions(-) diff --git a/examples/load-testing/src/application.rs b/examples/load-testing/src/application.rs index 831ad2d..981c42e 100644 --- a/examples/load-testing/src/application.rs +++ b/examples/load-testing/src/application.rs @@ -1,4 +1,4 @@ -use crate::messages::{ExecutionReport, Message}; +use crate::messages::{ExecutionReport, InboundMsg, OutboundMsg}; use hotfix::Application; use hotfix::application::{InboundDecision, OutboundDecision}; use tokio::sync::mpsc::UnboundedSender; @@ -15,17 +15,14 @@ impl LoadTestingApplication { } #[async_trait::async_trait] -impl Application for LoadTestingApplication { - async fn on_outbound_message(&self, _msg: &Message) -> OutboundDecision { +impl Application for LoadTestingApplication { + async fn on_outbound_message(&self, _msg: &OutboundMsg) -> OutboundDecision { OutboundDecision::Send } - async fn on_inbound_message(&self, msg: Message) -> InboundDecision { + async fn on_inbound_message(&self, msg: InboundMsg) -> InboundDecision { match msg { - Message::NewOrderSingle(_) => { - unimplemented!("we should not receive orders"); - } - Message::Unimplemented(data) => { + InboundMsg::Unimplemented(data) => { let pretty_bytes: Vec = data .iter() .map(|b| if *b == b'\x01' { b'|' } else { *b }) @@ -33,7 +30,7 @@ impl Application for LoadTestingApplication { let s = std::str::from_utf8(&pretty_bytes).unwrap_or("invalid characters"); info!("received message: {:?}", s); } - Message::ExecutionReport(report) => { + InboundMsg::ExecutionReport(report) => { if self.sender.send(report).is_err() { return InboundDecision::TerminateSession; } diff --git a/examples/load-testing/src/main.rs b/examples/load-testing/src/main.rs index 12c8b5f..52001de 100644 --- a/examples/load-testing/src/main.rs +++ b/examples/load-testing/src/main.rs @@ -2,7 +2,7 @@ mod application; mod messages; use crate::application::LoadTestingApplication; -use crate::messages::{ExecutionReport, Message, NewOrderSingle}; +use crate::messages::{ExecutionReport, NewOrderSingle, OutboundMsg}; use clap::{Parser, ValueEnum}; use hotfix::config::SessionConfig; use hotfix::field_types::{Date, Timestamp}; @@ -91,7 +91,7 @@ async fn start_session( session_config: SessionConfig, db_config: Database, app: LoadTestingApplication, -) -> Initiator { +) -> Initiator { match db_config { Database::Memory => { let store = hotfix::store::in_memory::InMemoryMessageStore::default(); @@ -110,16 +110,16 @@ async fn start_session( } } -async fn submit_messages(session_handle: SessionHandle, message_count: u32) { +async fn submit_messages(session_handle: SessionHandle, message_count: u32) { for _ in 0..message_count { submit_message(&session_handle).await; } } -async fn submit_message(session_handle: &SessionHandle) { +async fn submit_message(session_handle: &SessionHandle) { let mut order_id = format!("{}", uuid::Uuid::new_v4()); order_id.truncate(12); - let order = NewOrderSingle { + let order = OutboundMsg::NewOrderSingle(NewOrderSingle { transact_time: Timestamp::utc_now(), symbol: "EUR/USD".to_string(), cl_ord_id: order_id, @@ -131,11 +131,10 @@ async fn submit_message(session_handle: &SessionHandle) { number_of_allocations: 1, allocation_account: "acc1".to_string(), allocation_quantity: 230, - }; - let msg = Message::NewOrderSingle(order); + }); session_handle - .send_message(msg) + .send_message(order) .await .expect("session to accept message"); } diff --git a/examples/load-testing/src/messages.rs b/examples/load-testing/src/messages.rs index d9b9245..02ad607 100644 --- a/examples/load-testing/src/messages.rs +++ b/examples/load-testing/src/messages.rs @@ -38,13 +38,17 @@ pub struct NewOrderSingle { } #[derive(Debug, Clone)] -pub enum Message { +pub enum InboundMsg { ExecutionReport(ExecutionReport), - NewOrderSingle(NewOrderSingle), Unimplemented(Vec), } -impl Message { +#[derive(Debug, Clone)] +pub enum OutboundMsg { + NewOrderSingle(NewOrderSingle), +} + +impl InboundMsg { fn parse_execution_report_ack(message: &HotfixMessage) -> Self { let report = ExecutionReport { order_id: message.get::<&str>(fix44::ORDER_ID).unwrap().to_string(), @@ -62,10 +66,10 @@ impl Message { } } -impl OutboundMessage for Message { +impl OutboundMessage for OutboundMsg { fn write(&self, msg: &mut HotfixMessage) { match self { - Self::NewOrderSingle(order) => { + OutboundMsg::NewOrderSingle(order) => { // order details msg.set(fix44::TRANSACT_TIME, order.transact_time.clone()); msg.set(fix44::SYMBOL, order.symbol.as_str()); @@ -83,19 +87,17 @@ impl OutboundMessage for Message { allocation.set(fix44::ALLOC_QTY, order.allocation_quantity); msg.set_groups(vec![allocation]); } - _ => unimplemented!(), } } fn message_type(&self) -> &str { match self { - Self::NewOrderSingle(_) => "D", - _ => unimplemented!(), + OutboundMsg::NewOrderSingle(_) => "D", } } } -impl InboundMessage for Message { +impl InboundMessage for InboundMsg { fn parse(message: &HotfixMessage) -> Self { let message_type: &str = message.header().get(fix44::MSG_TYPE).unwrap(); if message_type == "8" { From e2360c927d4cdb6b2db5c893e1a79a9647950972 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Mon, 12 Jan 2026 20:28:46 +0100 Subject: [PATCH 11/11] Split up message types into outbound and inbound messages in simple new order example --- examples/simple-new-order/src/application.rs | 13 +++++-------- examples/simple-new-order/src/main.rs | 12 ++++++------ examples/simple-new-order/src/messages.rs | 20 +++++++++++--------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/examples/simple-new-order/src/application.rs b/examples/simple-new-order/src/application.rs index e9d5dea..9f0219f 100644 --- a/examples/simple-new-order/src/application.rs +++ b/examples/simple-new-order/src/application.rs @@ -1,4 +1,4 @@ -use crate::messages::Message; +use crate::messages::{InboundMsg, OutboundMsg}; use hotfix::Application; use hotfix::application::{InboundDecision, OutboundDecision}; use tracing::info; @@ -7,17 +7,14 @@ use tracing::info; pub struct TestApplication {} #[async_trait::async_trait] -impl Application for TestApplication { - async fn on_outbound_message(&self, _msg: &Message) -> OutboundDecision { +impl Application for TestApplication { + async fn on_outbound_message(&self, _msg: &OutboundMsg) -> OutboundDecision { OutboundDecision::Send } - async fn on_inbound_message(&self, msg: Message) -> InboundDecision { + async fn on_inbound_message(&self, msg: InboundMsg) -> InboundDecision { match msg { - Message::NewOrderSingle(_) => { - unimplemented!("we should not receive orders"); - } - Message::UnimplementedMessage(data) => { + InboundMsg::Unimplemented(data) => { let pretty_bytes: Vec = data .iter() .map(|b| if *b == b'\x01' { b'|' } else { *b }) diff --git a/examples/simple-new-order/src/main.rs b/examples/simple-new-order/src/main.rs index bd782b5..e09c707 100644 --- a/examples/simple-new-order/src/main.rs +++ b/examples/simple-new-order/src/main.rs @@ -2,7 +2,7 @@ mod application; mod messages; use crate::application::TestApplication; -use crate::messages::{Message, NewOrderSingle}; +use crate::messages::{NewOrderSingle, OutboundMsg}; use clap::{Parser, ValueEnum}; use hotfix::config::Config; use hotfix::field_types::{Date, Timestamp}; @@ -79,7 +79,7 @@ async fn main() { .expect("graceful shutdown to succeed"); } -async fn user_loop(session: &Initiator) { +async fn user_loop(session: &Initiator) { loop { println!("(q) to quit, (s) to send message"); @@ -105,7 +105,7 @@ async fn user_loop(session: &Initiator) { } } -async fn send_message(session: &Initiator) { +async fn send_message(session: &Initiator) { let mut order_id = format!("{}", uuid::Uuid::new_v4()); order_id.truncate(12); let order = NewOrderSingle { @@ -120,7 +120,7 @@ async fn send_message(session: &Initiator) { allocation_account: "acc1".to_string(), allocation_quantity: 230, }; - let msg = Message::NewOrderSingle(order); + let msg = OutboundMsg::NewOrderSingle(order); session.send_message(msg).await.unwrap(); } @@ -129,7 +129,7 @@ async fn start_session( config_path: &str, db_config: &Database, app: TestApplication, -) -> Initiator { +) -> Initiator { let mut config = Config::load_from_path(config_path); let session_config = config.sessions.pop().expect("config to include a session"); @@ -154,7 +154,7 @@ async fn start_session( } async fn start_web_service( - session_handle: SessionHandle, + session_handle: SessionHandle, cancellation_token: CancellationToken, ) { let config = RouterConfig { diff --git a/examples/simple-new-order/src/messages.rs b/examples/simple-new-order/src/messages.rs index ded1cc5..dbee98f 100644 --- a/examples/simple-new-order/src/messages.rs +++ b/examples/simple-new-order/src/messages.rs @@ -21,15 +21,19 @@ pub struct NewOrderSingle { } #[derive(Debug, Clone)] -pub enum Message { +pub enum InboundMsg { + Unimplemented(Vec), +} + +#[derive(Debug, Clone)] +pub enum OutboundMsg { NewOrderSingle(NewOrderSingle), - UnimplementedMessage(Vec), } -impl OutboundMessage for Message { +impl OutboundMessage for OutboundMsg { fn write(&self, msg: &mut HotfixMessage) { match self { - Self::NewOrderSingle(order) => { + OutboundMsg::NewOrderSingle(order) => { // order details msg.set(fix44::TRANSACT_TIME, order.transact_time.clone()); msg.set(fix44::SYMBOL, order.symbol.as_str()); @@ -46,21 +50,19 @@ impl OutboundMessage for Message { allocation.set(fix44::ALLOC_QTY, order.allocation_quantity); msg.set_groups(vec![allocation]); } - _ => unimplemented!(), } } fn message_type(&self) -> &str { match self { - Self::NewOrderSingle(_) => "D", - _ => unimplemented!(), + OutboundMsg::NewOrderSingle(_) => "D", } } } -impl InboundMessage for Message { +impl InboundMessage for InboundMsg { fn parse(message: &HotfixMessage) -> Self { let message_type: &str = message.header().get(fix44::MSG_TYPE).unwrap(); - Self::UnimplementedMessage(message_type.as_bytes().to_vec()) + Self::Unimplemented(message_type.as_bytes().to_vec()) } }