diff --git a/Cargo.toml b/Cargo.toml index cec49f0..a58864e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pricelevel" -version = "0.3.1" +version = "0.4.0" edition = "2024" authors = ["Joaquin Bejar "] description = "A high-performance, lock-free price level implementation for limit order books in Rust. This library provides the building blocks for creating efficient trading systems with support for multiple order types and concurrent access patterns." @@ -36,6 +36,7 @@ crossbeam = "0.8" uuid = { version = "1.18", features = ["v4", "v5", "serde"] } ulid = { version = "1.2", features = ["serde"] } dashmap = "6.1" +sha2 = "0.10" [dev-dependencies] diff --git a/README.md b/README.md index f414416..eb76e99 100644 --- a/README.md +++ b/README.md @@ -247,6 +247,7 @@ The performance characteristics demonstrate that the `pricelevel` library is sui **Joaquín Béjar García** - Email: jb@taunais.com + - **Telegram**: [@joaquin_bejar](https://t.me/joaquin_bejar) - GitHub: [joaquinbejar](https://github.com/joaquinbejar) We appreciate your interest and look forward to your contributions! diff --git a/README.tpl b/README.tpl index 603e452..7e377f5 100644 --- a/README.tpl +++ b/README.tpl @@ -115,6 +115,7 @@ **Joaquín Béjar García** - Email: jb@taunais.com + - **Telegram**: [@joaquin_bejar](https://t.me/joaquin_bejar) - GitHub: [joaquinbejar](https://github.com/joaquinbejar) We appreciate your interest and look forward to your contributions! diff --git a/src/errors/types.rs b/src/errors/types.rs index 8f452fb..3970bbc 100644 --- a/src/errors/types.rs +++ b/src/errors/types.rs @@ -62,6 +62,26 @@ pub enum PriceLevelError { /// Explanation of why the operation is invalid message: String, }, + + /// Error raised when serialization of internal data structures fails. + SerializationError { + /// Descriptive message with the serialization failure details + message: String, + }, + + /// Error raised when deserialization of external data into internal structures fails. + DeserializationError { + /// Descriptive message with the deserialization failure details + message: String, + }, + + /// Error raised when a checksum validation fails while restoring a snapshot. + ChecksumMismatch { + /// The checksum that was expected according to the serialized payload + expected: String, + /// The checksum that was computed from the provided payload + actual: String, + }, } impl Display for PriceLevelError { fn fmt(&self, f: &mut Formatter<'_>) -> Result { @@ -78,6 +98,15 @@ impl Display for PriceLevelError { PriceLevelError::InvalidOperation { message } => { write!(f, "Invalid operation: {message}") } + PriceLevelError::SerializationError { message } => { + write!(f, "Serialization error: {message}") + } + PriceLevelError::DeserializationError { message } => { + write!(f, "Deserialization error: {message}") + } + PriceLevelError::ChecksumMismatch { expected, actual } => { + write!(f, "Checksum mismatch: expected {expected}, got {actual}") + } } } } @@ -97,6 +126,15 @@ impl Debug for PriceLevelError { PriceLevelError::InvalidOperation { message } => { write!(f, "Invalid operation: {message}") } + PriceLevelError::SerializationError { message } => { + write!(f, "Serialization error: {message}") + } + PriceLevelError::DeserializationError { message } => { + write!(f, "Deserialization error: {message}") + } + PriceLevelError::ChecksumMismatch { expected, actual } => { + write!(f, "Checksum mismatch: expected {expected}, got {actual}") + } } } } diff --git a/src/price_level/level.rs b/src/price_level/level.rs index a724692..5aa4904 100644 --- a/src/price_level/level.rs +++ b/src/price_level/level.rs @@ -5,7 +5,7 @@ use crate::errors::PriceLevelError; use crate::execution::{MatchResult, Transaction}; use crate::orders::{OrderId, OrderType, OrderUpdate}; use crate::price_level::order_queue::OrderQueue; -use crate::price_level::{PriceLevelSnapshot, PriceLevelStatistics}; +use crate::price_level::{PriceLevelSnapshot, PriceLevelSnapshotPackage, PriceLevelStatistics}; use serde::{Deserialize, Serialize}; use std::fmt::Display; use std::str::FromStr; @@ -35,6 +35,39 @@ pub struct PriceLevel { stats: Arc, } +impl PriceLevel { + /// Reconstructs a price level directly from a snapshot. + pub fn from_snapshot(mut snapshot: PriceLevelSnapshot) -> Result { + snapshot.refresh_aggregates(); + + let order_count = snapshot.orders.len(); + let queue = OrderQueue::from(snapshot.orders.clone()); + + Ok(Self { + price: snapshot.price, + visible_quantity: AtomicU64::new(snapshot.visible_quantity), + hidden_quantity: AtomicU64::new(snapshot.hidden_quantity), + order_count: AtomicUsize::new(order_count), + orders: queue, + stats: Arc::new(PriceLevelStatistics::new()), + }) + } + + /// Reconstructs a price level from a checksum-protected snapshot package. + pub fn from_snapshot_package( + package: PriceLevelSnapshotPackage, + ) -> Result { + let snapshot = package.into_snapshot()?; + Self::from_snapshot(snapshot) + } + + /// Restores a price level from its snapshot JSON representation. + pub fn from_snapshot_json(data: &str) -> Result { + let package = PriceLevelSnapshotPackage::from_json(data)?; + Self::from_snapshot_package(package) + } +} + impl PriceLevel { /// Create a new price level pub fn new(price: u64) -> Self { @@ -225,6 +258,16 @@ impl PriceLevel { orders: self.iter_orders(), } } + + /// Serialize the current price level state into a checksum-protected snapshot package. + pub fn snapshot_package(&self) -> Result { + PriceLevelSnapshotPackage::new(self.snapshot()) + } + + /// Serialize the current price level state to JSON, including checksum metadata. + pub fn snapshot_to_json(&self) -> Result { + self.snapshot_package()?.to_json() + } } impl PriceLevel { @@ -437,12 +480,31 @@ impl From<&PriceLevel> for PriceLevelData { orders: price_level .iter_orders() .into_iter() - .map(|order_arc| (*order_arc)) + .map(|order_arc| *order_arc) .collect(), } } } +impl From<&PriceLevelSnapshot> for PriceLevel { + fn from(value: &PriceLevelSnapshot) -> Self { + let mut snapshot = value.clone(); + snapshot.refresh_aggregates(); + + let order_count = snapshot.orders.len(); + let queue = OrderQueue::from(snapshot.orders.clone()); + + Self { + price: snapshot.price, + visible_quantity: AtomicU64::new(snapshot.visible_quantity), + hidden_quantity: AtomicU64::new(snapshot.hidden_quantity), + order_count: AtomicUsize::new(order_count), + orders: queue, + stats: Arc::new(PriceLevelStatistics::new()), + } + } +} + impl TryFrom for PriceLevel { type Error = PriceLevelError; diff --git a/src/price_level/mod.rs b/src/price_level/mod.rs index bc1522a..5c2bd09 100644 --- a/src/price_level/mod.rs +++ b/src/price_level/mod.rs @@ -17,5 +17,5 @@ mod tests; pub use level::{PriceLevel, PriceLevelData}; pub use order_queue::OrderQueue; -pub use snapshot::PriceLevelSnapshot; +pub use snapshot::{PriceLevelSnapshot, PriceLevelSnapshotPackage}; pub use statistics::PriceLevelStatistics; diff --git a/src/price_level/snapshot.rs b/src/price_level/snapshot.rs index 044bf29..126609d 100644 --- a/src/price_level/snapshot.rs +++ b/src/price_level/snapshot.rs @@ -3,6 +3,7 @@ use crate::orders::OrderType; use serde::de::{self, MapAccess, Visitor}; use serde::ser::SerializeStruct; use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use sha2::{Digest, Sha256}; use std::fmt; use std::str::FromStr; use std::sync::Arc; @@ -45,6 +46,106 @@ impl PriceLevelSnapshot { pub fn iter_orders(&self) -> impl Iterator>> { self.orders.iter() } + + /// Recomputes aggregate fields (`visible_quantity`, `hidden_quantity`, and `order_count`) based on current orders. + pub fn refresh_aggregates(&mut self) { + self.order_count = self.orders.len(); + + let mut visible_total: u64 = 0; + let mut hidden_total: u64 = 0; + + for order in &self.orders { + visible_total = visible_total.saturating_add(order.visible_quantity()); + hidden_total = hidden_total.saturating_add(order.hidden_quantity()); + } + + self.visible_quantity = visible_total; + self.hidden_quantity = hidden_total; + } +} + +/// Format version for checksum-enabled price level snapshots. +pub const SNAPSHOT_FORMAT_VERSION: u32 = 1; + +/// Serialized representation of a price level snapshot including checksum validation metadata. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PriceLevelSnapshotPackage { + /// Version of the serialized snapshot schema to support future migrations. + pub version: u32, + /// Captured snapshot data. + pub snapshot: PriceLevelSnapshot, + /// Hex-encoded checksum used to validate the snapshot integrity. + pub checksum: String, +} + +impl PriceLevelSnapshotPackage { + /// Creates a new snapshot package computing the checksum for the provided snapshot. + pub fn new(mut snapshot: PriceLevelSnapshot) -> Result { + snapshot.refresh_aggregates(); + + let checksum = Self::compute_checksum(&snapshot)?; + + Ok(Self { + version: SNAPSHOT_FORMAT_VERSION, + snapshot, + checksum, + }) + } + + /// Serializes the package to JSON. + pub fn to_json(&self) -> Result { + serde_json::to_string(self).map_err(|error| PriceLevelError::SerializationError { + message: error.to_string(), + }) + } + + /// Deserializes a package from JSON. + pub fn from_json(data: &str) -> Result { + serde_json::from_str(data).map_err(|error| PriceLevelError::DeserializationError { + message: error.to_string(), + }) + } + + /// Validates the checksum contained in the package against the serialized snapshot data. + pub fn validate(&self) -> Result<(), PriceLevelError> { + if self.version != SNAPSHOT_FORMAT_VERSION { + return Err(PriceLevelError::InvalidOperation { + message: format!( + "Unsupported snapshot version: {} (expected {})", + self.version, SNAPSHOT_FORMAT_VERSION + ), + }); + } + + let computed = Self::compute_checksum(&self.snapshot)?; + if computed != self.checksum { + return Err(PriceLevelError::ChecksumMismatch { + expected: self.checksum.clone(), + actual: computed, + }); + } + + Ok(()) + } + + /// Consumes the package after validating the checksum and returns the contained snapshot. + pub fn into_snapshot(self) -> Result { + self.validate()?; + Ok(self.snapshot) + } + + fn compute_checksum(snapshot: &PriceLevelSnapshot) -> Result { + let payload = + serde_json::to_vec(snapshot).map_err(|error| PriceLevelError::SerializationError { + message: error.to_string(), + })?; + + let mut hasher = Sha256::new(); + hasher.update(payload); + + let checksum_bytes = hasher.finalize(); + Ok(format!("{:x}", checksum_bytes)) + } } impl Serialize for PriceLevelSnapshot { @@ -60,7 +161,7 @@ impl Serialize for PriceLevelSnapshot { state.serialize_field("order_count", &self.order_count)?; let plain_orders: Vec> = - self.orders.iter().map(|arc_order| (**arc_order)).collect(); + self.orders.iter().map(|arc_order| **arc_order).collect(); state.serialize_field("orders", &plain_orders)?; diff --git a/src/price_level/tests/level.rs b/src/price_level/tests/level.rs index ccb7013..caebe67 100644 --- a/src/price_level/tests/level.rs +++ b/src/price_level/tests/level.rs @@ -3,6 +3,7 @@ mod tests { use crate::errors::PriceLevelError; use crate::orders::{OrderId, OrderType, OrderUpdate, PegReferenceType, Side, TimeInForce}; use crate::price_level::level::{PriceLevel, PriceLevelData}; + use crate::price_level::snapshot::SNAPSHOT_FORMAT_VERSION; use crate::{DEFAULT_RESERVE_REPLENISH_AMOUNT, UuidGenerator}; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; @@ -27,6 +28,129 @@ mod tests { } } + #[test] + fn test_price_level_snapshot_roundtrip() { + let price_level = PriceLevel::new(10000); + price_level.add_order(create_standard_order(1, 10000, 100)); + price_level.add_order(create_iceberg_order(2, 10000, 50, 200)); + + let package = price_level + .snapshot_package() + .expect("Failed to create snapshot package"); + + assert_eq!(package.version, SNAPSHOT_FORMAT_VERSION); + package.validate().expect("Snapshot validation failed"); + + let json = package + .to_json() + .expect("Failed to serialize snapshot package"); + let restored = PriceLevel::from_snapshot_json(&json) + .expect("Failed to restore price level from snapshot JSON"); + + assert_eq!(restored.price(), price_level.price()); + assert_eq!(restored.visible_quantity(), price_level.visible_quantity()); + assert_eq!(restored.hidden_quantity(), price_level.hidden_quantity()); + assert_eq!(restored.order_count(), price_level.order_count()); + + let original_ids: Vec = price_level + .iter_orders() + .into_iter() + .map(|order| order.id()) + .collect(); + let restored_ids: Vec = restored + .iter_orders() + .into_iter() + .map(|order| order.id()) + .collect(); + assert_eq!(restored_ids, original_ids); + } + + #[test] + fn test_price_level_snapshot_checksum_failure() { + let price_level = PriceLevel::new(20000); + price_level.add_order(create_standard_order(1, 20000, 100)); + + let mut package = price_level + .snapshot_package() + .expect("Failed to create snapshot package"); + + package.validate().expect("Snapshot validation should pass"); + + // Corrupt the checksum and ensure validation fails + package.checksum = "deadbeef".to_string(); + let err = PriceLevel::from_snapshot_package(package) + .expect_err("Restoration should fail due to checksum mismatch"); + + assert!(matches!(err, PriceLevelError::ChecksumMismatch { .. })); + } + + #[test] + fn test_price_level_from_snapshot_preserves_order_positions() { + let price_level = PriceLevel::new(15000); + price_level.add_order(create_standard_order(1, 15000, 100)); + price_level.add_order(create_iceberg_order(2, 15000, 40, 120)); + price_level.add_order(create_post_only_order(3, 15000, 60)); + price_level.add_order(create_reserve_order(4, 15000, 30, 90, 15, true, Some(20))); + + let snapshot = price_level.snapshot(); + let restored = PriceLevel::from(&snapshot); + + let original_orders = price_level.iter_orders(); + let restored_orders = restored.iter_orders(); + + assert_eq!(restored_orders.len(), original_orders.len()); + assert_eq!(restored.order_count(), price_level.order_count()); + assert_eq!(restored.visible_quantity(), price_level.visible_quantity()); + assert_eq!(restored.hidden_quantity(), price_level.hidden_quantity()); + + for (index, (expected, actual)) in original_orders + .iter() + .zip(restored_orders.iter()) + .enumerate() + { + assert_eq!( + actual.id(), + expected.id(), + "Order mismatch at position {index}" + ); + assert_eq!(actual.timestamp(), expected.timestamp()); + } + } + + #[test] + fn test_price_level_from_snapshot_package_preserves_order_positions() { + let price_level = PriceLevel::new(17500); + price_level.add_order(create_standard_order(10, 17500, 80)); + price_level.add_order(create_trailing_stop_order(11, 17500, 50)); + price_level.add_order(create_pegged_order(12, 17500, 40)); + price_level.add_order(create_market_to_limit_order(13, 17500, 70)); + + let package = price_level + .snapshot_package() + .expect("Failed to create snapshot package"); + let restored = PriceLevel::from_snapshot_package(package) + .expect("Failed to restore price level from snapshot package"); + + let original_orders = price_level.iter_orders(); + let restored_orders = restored.iter_orders(); + + assert_eq!(restored_orders.len(), original_orders.len()); + assert_eq!(restored.order_count(), price_level.order_count()); + + for (index, (expected, actual)) in original_orders + .iter() + .zip(restored_orders.iter()) + .enumerate() + { + assert_eq!( + actual.id(), + expected.id(), + "Order mismatch at position {index}" + ); + assert_eq!(actual.timestamp(), expected.timestamp()); + } + } + fn create_iceberg_order(id: u64, price: u64, visible: u64, hidden: u64) -> OrderType<()> { let timestamp = TIMESTAMP_COUNTER.fetch_add(1, Ordering::SeqCst); OrderType::IcebergOrder { diff --git a/src/price_level/tests/snapshot.rs b/src/price_level/tests/snapshot.rs index 442b142..3f2d7bb 100644 --- a/src/price_level/tests/snapshot.rs +++ b/src/price_level/tests/snapshot.rs @@ -1,7 +1,10 @@ #[cfg(test)] mod tests { + use crate::errors::PriceLevelError; use crate::orders::{OrderId, OrderType, Side, TimeInForce}; - use crate::price_level::PriceLevelSnapshot; + use crate::price_level::snapshot::SNAPSHOT_FORMAT_VERSION; + use crate::price_level::{PriceLevelSnapshot, PriceLevelSnapshotPackage}; + use serde_json::Value; use std::str::FromStr; use std::sync::Arc; @@ -29,6 +32,68 @@ mod tests { ] } + #[test] + fn test_snapshot_package_roundtrip() { + let mut snapshot = PriceLevelSnapshot::new(42); + snapshot.orders = create_sample_orders(); + snapshot.refresh_aggregates(); + + let package = + PriceLevelSnapshotPackage::new(snapshot.clone()).expect("Failed to create package"); + + assert_eq!(package.version, SNAPSHOT_FORMAT_VERSION); + package.validate().expect("Package validation failed"); + + let json = package.to_json().expect("Failed to serialize package"); + let restored_package = + PriceLevelSnapshotPackage::from_json(&json).expect("Failed to deserialize package"); + + restored_package + .validate() + .expect("Checksum validation should succeed"); + + let restored_snapshot = restored_package + .into_snapshot() + .expect("Snapshot extraction failed"); + + assert_eq!(restored_snapshot.price, snapshot.price); + assert_eq!(restored_snapshot.order_count, snapshot.order_count); + assert_eq!( + restored_snapshot.visible_quantity, + snapshot.visible_quantity + ); + assert_eq!(restored_snapshot.hidden_quantity, snapshot.hidden_quantity); + assert_eq!(restored_snapshot.orders.len(), snapshot.orders.len()); + } + + #[test] + fn test_snapshot_package_checksum_mismatch() { + let mut snapshot = PriceLevelSnapshot::new(99); + snapshot.orders = create_sample_orders(); + snapshot.refresh_aggregates(); + + let package = PriceLevelSnapshotPackage::new(snapshot).expect("Failed to create package"); + let json = package.to_json().expect("Failed to serialize package"); + + let mut value: Value = serde_json::from_str(&json).expect("JSON parsing failed"); + if let Some(obj) = value.as_object_mut() { + obj.insert( + "checksum".to_string(), + Value::String("deadbeef".to_string()), + ); + } + + let tampered_json = serde_json::to_string(&value).expect("JSON serialization failed"); + + let tampered_package = PriceLevelSnapshotPackage::from_json(&tampered_json) + .expect("Deserialization should still succeed"); + + let err = tampered_package + .validate() + .expect_err("Checksum mismatch expected"); + assert!(matches!(err, PriceLevelError::ChecksumMismatch { .. })); + } + #[test] fn test_new() { let snapshot = PriceLevelSnapshot::new(1000);