From e05e99a72204458dfb9e2b1f68294296399260c4 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Tue, 5 Nov 2024 23:14:46 +0100 Subject: [PATCH 1/4] :sparkles: Add timestamp, id and checksum info for a received message --- src/receive_message.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/receive_message.rs b/src/receive_message.rs index 006ace9..69224ae 100644 --- a/src/receive_message.rs +++ b/src/receive_message.rs @@ -35,6 +35,27 @@ impl ReceiveMessage { pub fn offset(&self) -> u64 { self.inner.offset } + + /// Retrieves the timestamp of the received message. + /// + /// The timestamp represents the time of the message within its topic. + pub fn timestamp(&self) -> u64 { + self.inner.timestamp + } + + /// Retrieves the id of the received message. + /// + /// The id represents unique identifier of the message within its topic. + pub fn id(&self) -> u128 { + self.inner.id + } + + /// Retrieves the checksum of the received message. + /// + /// The checksum represents the integrity of the message within its topic. + pub fn checksum(&self) -> u64 { + self.inner.checksum + } } #[derive(Clone, Copy)] From 914d4e941643613bc2fc878dcdd0cc6a9eaa96a2 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Thu, 7 Nov 2024 21:49:49 +0100 Subject: [PATCH 2/4] :Construction: WIP Fix checksum and add length and state. TODO Add headers --- src/receive_message.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/receive_message.rs b/src/receive_message.rs index 69224ae..c543d1d 100644 --- a/src/receive_message.rs +++ b/src/receive_message.rs @@ -1,5 +1,6 @@ use iggy::messages::poll_messages::PollingStrategy as RustPollingStrategy; use iggy::models::messages::PolledMessage as RustReceiveMessage; +use iggy::models::messages::MessageState as RustMessageState; use pyo3::prelude::*; use pyo3::types::PyBytes; @@ -53,9 +54,27 @@ impl ReceiveMessage { /// Retrieves the checksum of the received message. /// /// The checksum represents the integrity of the message within its topic. - pub fn checksum(&self) -> u64 { + pub fn checksum(&self) -> u32 { self.inner.checksum } + + /// Retrieves the Message's state of the received message. + /// + /// State represents the state of the response. + pub fn state(&self) -> String { + match self.inner.state { + RustMessageState::Available => "Available".to_string(), + RustMessageState::Unavailable => "Unavailable".to_string(), + RustMessageState::Poisoned => "Poisoned".to_string(), + RustMessageState::MarkedForDeletion => "Marked for Deletion".to_string() + } + + /// Retrieves the length of the received message. + /// + /// The length represents the length of the payload. + pub fn length(&self) -> u32 { + self.inner.length + } } #[derive(Clone, Copy)] From f1a209fc51d8e83eb9414539ad95033088a9908d Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Fri, 8 Nov 2024 20:52:50 +0100 Subject: [PATCH 3/4] Forgot closing bracket --- src/receive_message.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/receive_message.rs b/src/receive_message.rs index c543d1d..ec61921 100644 --- a/src/receive_message.rs +++ b/src/receive_message.rs @@ -67,6 +67,7 @@ impl ReceiveMessage { RustMessageState::Unavailable => "Unavailable".to_string(), RustMessageState::Poisoned => "Poisoned".to_string(), RustMessageState::MarkedForDeletion => "Marked for Deletion".to_string() + } } /// Retrieves the length of the received message. From 6dbf47691aaf0637b052ba4e19fd205bff09aa99 Mon Sep 17 00:00:00 2001 From: Maciej Modzelewski Date: Wed, 29 Jan 2025 21:00:16 +0100 Subject: [PATCH 4/4] implement enum for message state --- Cargo.lock | 4 ++-- src/lib.rs | 3 ++- src/receive_message.rs | 23 ++++++++++++++++------- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d5cfce..a0c136e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1205,7 +1205,7 @@ dependencies = [ [[package]] name = "iggy-py" -version = "0.2.5" +version = "0.2.6" dependencies = [ "bytes", "iggy", diff --git a/src/lib.rs b/src/lib.rs index 0ac0648..b2ead09 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ mod topic; use client::IggyClient; use pyo3::prelude::*; -use receive_message::{PollingStrategy, ReceiveMessage}; +use receive_message::{MessageState, PollingStrategy, ReceiveMessage}; use send_message::SendMessage; use stream::StreamDetails; use topic::TopicDetails; @@ -20,5 +20,6 @@ fn iggy_py(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/receive_message.rs b/src/receive_message.rs index ec61921..9c231c8 100644 --- a/src/receive_message.rs +++ b/src/receive_message.rs @@ -1,6 +1,6 @@ use iggy::messages::poll_messages::PollingStrategy as RustPollingStrategy; -use iggy::models::messages::PolledMessage as RustReceiveMessage; use iggy::models::messages::MessageState as RustMessageState; +use iggy::models::messages::PolledMessage as RustReceiveMessage; use pyo3::prelude::*; use pyo3::types::PyBytes; @@ -21,6 +21,15 @@ impl ReceiveMessage { } } +#[pyclass(eq, eq_int)] +#[derive(PartialEq)] +pub enum MessageState { + Available, + Unavailable, + Poisoned, + MarkedForDeletion, +} + #[pymethods] impl ReceiveMessage { /// Retrieves the payload of the received message. @@ -58,15 +67,15 @@ impl ReceiveMessage { self.inner.checksum } - /// Retrieves the Message's state of the received message. + /// Retrieves the Message's state of the received message. /// /// State represents the state of the response. - pub fn state(&self) -> String { + pub fn state(&self) -> MessageState { match self.inner.state { - RustMessageState::Available => "Available".to_string(), - RustMessageState::Unavailable => "Unavailable".to_string(), - RustMessageState::Poisoned => "Poisoned".to_string(), - RustMessageState::MarkedForDeletion => "Marked for Deletion".to_string() + RustMessageState::Available => MessageState::Available, + RustMessageState::Unavailable => MessageState::Unavailable, + RustMessageState::Poisoned => MessageState::Poisoned, + RustMessageState::MarkedForDeletion => MessageState::MarkedForDeletion, } }