Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crates/hotfix-web/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod session_controller;
use crate::endpoints::build_api_router;
use crate::session_controller::{HttpSessionController, SessionController};
use axum::Router;
use hotfix::message::FixMessage;
use hotfix::message::OutboundMessage;
use hotfix::session::SessionHandle;

#[derive(Clone)]
Expand All @@ -21,13 +21,13 @@ pub struct RouterConfig {
}

/// Build a router with default configuration (admin endpoints disabled)
pub fn build_router<M: FixMessage>(session_handle: SessionHandle<M>) -> Router {
pub fn build_router<Outbound: OutboundMessage>(session_handle: SessionHandle<Outbound>) -> Router {
build_router_with_config(session_handle, RouterConfig::default())
}

/// Build a router with custom configuration
pub fn build_router_with_config<M: FixMessage>(
session_handle: SessionHandle<M>,
pub fn build_router_with_config<Outbound: OutboundMessage>(
session_handle: SessionHandle<Outbound>,
config: RouterConfig,
) -> Router {
let controller = HttpSessionController { session_handle };
Expand Down
20 changes: 11 additions & 9 deletions crates/hotfix-web/src/session_controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use hotfix::message::FixMessage;
use hotfix::message::OutboundMessage;
use hotfix::session::{SessionHandle, SessionInfo};

/// Controller for session operations, providing both read access and administrative actions
Expand All @@ -11,12 +11,12 @@ pub trait SessionController: Clone + Send + Sync {

/// HTTP session controller implementation that wraps a SessionHandle
#[derive(Clone)]
pub struct HttpSessionController<M> {
pub(crate) session_handle: SessionHandle<M>,
pub struct HttpSessionController<Outbound> {
pub(crate) session_handle: SessionHandle<Outbound>,
}

#[async_trait::async_trait]
impl<M: FixMessage> SessionController for HttpSessionController<M> {
impl<Outbound: OutboundMessage> SessionController for HttpSessionController<Outbound> {
async fn get_session_info(&self) -> anyhow::Result<SessionInfo> {
self.session_handle.get_session_info().await
}
Expand All @@ -34,7 +34,9 @@ impl<M: FixMessage> SessionController for HttpSessionController<M> {
// Note: We can't use a blanket impl due to Rust's orphan rules (can't impl foreign trait for generic type)
#[cfg(feature = "ui")]
#[async_trait::async_trait]
impl<M: FixMessage> hotfix_web_ui::SessionInfoProvider for HttpSessionController<M> {
impl<Outbound: OutboundMessage> hotfix_web_ui::SessionInfoProvider
for HttpSessionController<Outbound>
{
async fn get_session_info(&self) -> anyhow::Result<SessionInfo> {
// Reuse the SessionController implementation
SessionController::get_session_info(self).await
Expand All @@ -43,12 +45,12 @@ impl<M: FixMessage> hotfix_web_ui::SessionInfoProvider for HttpSessionController

// Allow extracting HttpSessionController from AppState for hotfix-web-ui
#[cfg(feature = "ui")]
impl<M> axum::extract::FromRef<crate::AppState<HttpSessionController<M>>>
for HttpSessionController<M>
impl<Outbound> axum::extract::FromRef<crate::AppState<HttpSessionController<Outbound>>>
for HttpSessionController<Outbound>
where
M: FixMessage,
Outbound: OutboundMessage,
{
fn from_ref(state: &crate::AppState<HttpSessionController<M>>) -> Self {
fn from_ref(state: &crate::AppState<HttpSessionController<Outbound>>) -> Self {
state.controller.clone()
}
}
6 changes: 3 additions & 3 deletions crates/hotfix/src/application.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#[async_trait::async_trait]
/// The application users of HotFIX can implement to hook into the engine.
pub trait Application<M>: Send + Sync + 'static {
pub trait Application<Inbound, Outbound>: Send + Sync + 'static {
/// Called when a message is sent to the engine to be sent to the counterparty.
///
/// This is invoked before the raw message is persisted in the message store.
async fn on_outbound_message(&self, msg: &M) -> OutboundDecision;
async fn on_outbound_message(&self, msg: &Outbound) -> OutboundDecision;
/// Called when a message is received from the counterparty.
///
/// This is invoked after the message is verified and parsed into a typed message.
async fn on_inbound_message(&self, msg: M) -> InboundDecision;
async fn on_inbound_message(&self, msg: Inbound) -> InboundDecision;
/// Called when the session is logged out.
async fn on_logout(&mut self, reason: &str);
/// Called when the session is logged on.
Expand Down
20 changes: 10 additions & 10 deletions crates/hotfix/src/initiator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ use tracing::{debug, warn};

use crate::application::Application;
use crate::config::SessionConfig;
use crate::message::FixMessage;
use crate::message::{InboundMessage, OutboundMessage};
use crate::session::{InternalSessionRef, SessionHandle};
use crate::store::MessageStore;
use crate::transport::connect;

#[derive(Clone)]
pub struct Initiator<M> {
pub struct Initiator<Outbound> {
pub config: SessionConfig,
session_handle: SessionHandle<M>,
session_handle: SessionHandle<Outbound>,
completion_rx: watch::Receiver<bool>,
}

impl<M: FixMessage> Initiator<M> {
pub async fn start(
impl<Outbound: OutboundMessage> Initiator<Outbound> {
pub async fn start<Inbound: InboundMessage>(
config: SessionConfig,
application: impl Application<M>,
application: impl Application<Inbound, Outbound>,
store: impl MessageStore + Send + Sync + 'static,
) -> Self {
let session_ref = InternalSessionRef::new(config.clone(), application, store);
Expand All @@ -47,7 +47,7 @@ impl<M: FixMessage> Initiator<M> {
}
}

pub async fn send_message(&self, msg: M) -> anyhow::Result<()> {
pub async fn send_message(&self, msg: Outbound) -> anyhow::Result<()> {
self.session_handle.send_message(msg).await?;

Ok(())
Expand All @@ -57,7 +57,7 @@ impl<M: FixMessage> Initiator<M> {
self.config.sender_comp_id == sender_comp_id && self.config.target_comp_id == target_comp_id
}

pub fn session_handle(&self) -> SessionHandle<M> {
pub fn session_handle(&self) -> SessionHandle<Outbound> {
self.session_handle.clone()
}

Expand Down Expand Up @@ -85,9 +85,9 @@ impl<M: FixMessage> Initiator<M> {
}
}

async fn establish_connection<M: FixMessage>(
async fn establish_connection<Outbound: OutboundMessage>(
config: SessionConfig,
session_ref: InternalSessionRef<M>,
session_ref: InternalSessionRef<Outbound>,
completion_tx: watch::Sender<bool>,
) {
loop {
Expand Down
12 changes: 4 additions & 8 deletions crates/hotfix/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ pub mod verification;
pub use parser::RawFixMessage;
pub use resend_request::ResendRequest;

pub trait FixMessage: Clone + Send + 'static {
pub trait OutboundMessage: Clone + Send + 'static {
fn write(&self, msg: &mut Message);

fn message_type(&self) -> &str;
}

pub trait InboundMessage: Clone + Send + 'static {
fn parse(message: &Message) -> Self;
}

Expand All @@ -31,7 +33,7 @@ pub fn generate_message(
sender_comp_id: &str,
target_comp_id: &str,
msg_seq_num: u64,
message: impl FixMessage,
message: impl OutboundMessage,
) -> Result<Vec<u8>, EncodeError> {
let mut msg = Message::new(begin_string, message.message_type());
msg.set(SENDER_COMP_ID, sender_comp_id);
Expand All @@ -43,9 +45,3 @@ pub fn generate_message(

msg.encode(&Config::default())
}

pub trait WriteMessage {
fn write(&self, msg: &mut Message);

fn message_type(&self) -> &str;
}
6 changes: 4 additions & 2 deletions crates/hotfix/src/message/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::message::FixMessage;
use crate::message::{InboundMessage, OutboundMessage};
use hotfix_message::Part;
use hotfix_message::message::Message;
use hotfix_message::session_fields::TEST_REQ_ID;
Expand All @@ -16,7 +16,7 @@ impl Heartbeat {
}
}

impl FixMessage for Heartbeat {
impl OutboundMessage for Heartbeat {
fn write(&self, msg: &mut Message) {
if let Some(req_id) = &self.test_req_id {
msg.set(TEST_REQ_ID, req_id.as_str());
Expand All @@ -26,7 +26,9 @@ impl FixMessage for Heartbeat {
fn message_type(&self) -> &str {
"0"
}
}

impl InboundMessage for Heartbeat {
fn parse(_message: &Message) -> Self {
// TODO: this needs to be implemented properly when we're implementing Test Requests
Heartbeat { test_req_id: None }
Expand Down
8 changes: 2 additions & 6 deletions crates/hotfix/src/message/logon.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::message::FixMessage;
use crate::message::OutboundMessage;
use hotfix_message::message::Message;
use hotfix_message::session_fields::{
ENCRYPT_METHOD, HEART_BT_INT, NEXT_EXPECTED_MSG_SEQ_NUM, RESET_SEQ_NUM_FLAG,
Expand Down Expand Up @@ -33,7 +33,7 @@ impl Logon {
}
}

impl FixMessage for Logon {
impl OutboundMessage for Logon {
fn write(&self, msg: &mut Message) {
msg.set(ENCRYPT_METHOD, self.encrypt_method);
msg.set(HEART_BT_INT, self.heartbeat_interval);
Expand All @@ -47,10 +47,6 @@ impl FixMessage for Logon {
fn message_type(&self) -> &str {
"A"
}

fn parse(_message: &Message) -> Self {
todo!()
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, FieldType)]
Expand Down
8 changes: 2 additions & 6 deletions crates/hotfix/src/message/logout.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::message::FixMessage;
use crate::message::OutboundMessage;
use hotfix_message::Part;
use hotfix_message::message::Message;
use hotfix_message::session_fields::TEXT;
Expand All @@ -14,7 +14,7 @@ impl Logout {
}
}

impl FixMessage for Logout {
impl OutboundMessage for Logout {
fn write(&self, msg: &mut Message) {
if let Some(value) = &self.text {
msg.set(TEXT, value.as_str());
Expand All @@ -24,8 +24,4 @@ impl FixMessage for Logout {
fn message_type(&self) -> &str {
"5"
}

fn parse(_message: &Message) -> Self {
unimplemented!()
}
}
6 changes: 4 additions & 2 deletions crates/hotfix/src/message/reject.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::message::FixMessage;
use crate::message::{InboundMessage, OutboundMessage};
use hotfix_message::Part;
use hotfix_message::message::Message;
use hotfix_message::session_fields::{
Expand Down Expand Up @@ -52,7 +52,7 @@ impl Reject {
}
}

impl FixMessage for Reject {
impl OutboundMessage for Reject {
fn write(&self, msg: &mut Message) {
msg.set(REF_SEQ_NUM, self.ref_seq_num);

Expand All @@ -73,7 +73,9 @@ impl FixMessage for Reject {
fn message_type(&self) -> &str {
"3"
}
}

impl InboundMessage for Reject {
fn parse(message: &Message) -> Self {
Self {
ref_seq_num: message.get(REF_SEQ_NUM).unwrap(),
Expand Down
8 changes: 2 additions & 6 deletions crates/hotfix/src/message/resend_request.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::message::FixMessage;
use crate::message::OutboundMessage;
use hotfix_message::Part;
use hotfix_message::message::Message;
use hotfix_message::session_fields::{BEGIN_SEQ_NO, END_SEQ_NO};
Expand All @@ -18,7 +18,7 @@ impl ResendRequest {
}
}

impl FixMessage for ResendRequest {
impl OutboundMessage for ResendRequest {
fn write(&self, msg: &mut Message) {
msg.set(BEGIN_SEQ_NO, self.begin_seq_no);
msg.set(END_SEQ_NO, self.end_seq_no);
Expand All @@ -27,8 +27,4 @@ impl FixMessage for ResendRequest {
fn message_type(&self) -> &str {
"2"
}

fn parse(_message: &Message) -> Self {
todo!()
}
}
8 changes: 2 additions & 6 deletions crates/hotfix/src/message/sequence_reset.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::message::FixMessage;
use crate::message::OutboundMessage;
use hotfix_message::Part;
use hotfix_message::field_types::Timestamp;
use hotfix_message::message::Message;
Expand All @@ -12,7 +12,7 @@ pub struct SequenceReset {
pub new_seq_no: u64,
}

impl FixMessage for SequenceReset {
impl OutboundMessage for SequenceReset {
fn write(&self, msg: &mut Message) {
msg.set(GAP_FILL_FLAG, self.gap_fill);
msg.set(NEW_SEQ_NO, self.new_seq_no);
Expand All @@ -24,8 +24,4 @@ impl FixMessage for SequenceReset {
fn message_type(&self) -> &str {
"4"
}

fn parse(_message: &Message) -> Self {
todo!()
}
}
8 changes: 2 additions & 6 deletions crates/hotfix/src/message/test_request.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::message::FixMessage;
use crate::message::OutboundMessage;
use hotfix_message::Part;
use hotfix_message::message::Message;
use hotfix_message::session_fields::TEST_REQ_ID;
Expand All @@ -14,16 +14,12 @@ impl TestRequest {
}
}

impl FixMessage for TestRequest {
impl OutboundMessage for TestRequest {
fn write(&self, msg: &mut Message) {
msg.set(TEST_REQ_ID, self.test_req_id.as_str());
}

fn message_type(&self) -> &str {
"1"
}

fn parse(_message: &Message) -> Self {
unimplemented!()
}
}
Loading