Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions src/flow_queue/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,58 +211,56 @@ impl RabbitmqClient {

match consumer_res {
Ok(consumer) => consumer,
Err(err) => panic!("{}", err),
Err(err) => panic!("Cannot consume messages: {}", err),
}
};

println!("Starting to consume from {}", queue_name);
debug!("Starting to consume from {}", queue_name);

while let Some(delivery) = consumer.next().await {
let delivery = match delivery {
Ok(del) => del,
Err(err) => {
println!("Error receiving message: {}", err);
log::error!("Error receiving message: {}", err);
return Err(err);
}
};

let data = &delivery.data;
let message_str = match std::str::from_utf8(&data) {
Ok(str) => {
println!("Received message: {}", str);
log::info!("Received message: {}", str);
str
}
Err(err) => {
println!("Error decoding message: {}", err);
log::error!("Error decoding message: {}", err);
return Ok(());
}
};
// Parse the message
let inc_message = match serde_json::from_str::<Message>(message_str) {
Ok(mess) => {
println!("Parsed message with telegram_id: {}", mess.message_id);
mess
}
Ok(mess) => mess,
Err(err) => {
println!("Error parsing message: {}", err);
log::error!("Error parsing message: {}", err);
return Ok(());
}
};

let message = match handle_message(inc_message) {
Ok(mess) => {
println!("Handled message with telegram_id: {}", mess.message_id);
mess
}
Ok(mess) => mess,
Err(err) => {
println!("Error handling message: {}", err);
log::error!("Error handling message: {}", err);
return Ok(());
}
};

let message_json = serde_json::to_string(&message).unwrap();

println!("{}", message_json);
let message_json = match serde_json::to_string(&message) {
Ok(json) => json,
Err(err) => {
log::error!("Error serializing message: {}", err);
return Ok(());
}
};

{
let _ = self.send_message(message_json, "recieve_queue").await;
Expand Down
2 changes: 1 addition & 1 deletion src/flow_store/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl FlowStoreServiceBase for FlowStoreService {
match connection.keys("*").await {
Ok(res) => res,
Err(error) => {
print!("Can't retrieve keys from redis. Reason: {error}");
error!("Can't retrieve keys from redis. Reason: {error}");
return Err(error);
}
}
Expand Down