Skip to content

Conversation

@danigiri
Copy link
Contributor

TL;DR this PR brings performance improvements to the surrealdb queue implementation, we are using two queries to delete from queue and return payload (vs N queries previously)

on consume we were looping through the records and deleting/retrieving them individually, it was still within the DB context, but still had some overhead. This change moves to using just two queries for the whole payload, one to delete the queued message entries and the other query to return the payload. Thanks in advance for the review

some performance gains evidence:
Screenshot 2025-08-18 at 10 15 44

  • cleared some code warnings
  • Ran clippy

surrealdb tests

    Finished `test` profile [unoptimized + debuginfo] target(s) in 1.83s
     Running unittests src/lib.rs (debug/deps/broccoli_queue-85d2d0abc0639ceb)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/edge_cases.rs (debug/deps/edge_cases-46dd8ce924dc83d5)

running 8 tests
test test_empty_payload ... ok
test test_invalid_broker_url ... ok
test test_concurrent_consume ... ok
test test_multiple_batch_publish_and_consume ... ok
test test_multiple_batch_publish_and_handler ... ok
test test_ttl_not_implemented ... ok
test test_very_large_payload ... ok
test test_message_ordering ... ok

test result: ok. 8 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 2.15s

     Running tests/fairness.rs (debug/deps/fairness-f0d52a5e238e9873)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/happy_path.rs (debug/deps/happy_path-ce072502e7fa95df)

running 12 tests
test test_batch_publish_and_consume ... ok
test test_message_acknowledgment ... ok
test test_message_auto_ack ... ok
test test_message_cancellation ... ok
test test_message_priority ... ok
test test_message_retry ... ok
test test_process_messages ... ok
test test_process_messages_with_handlers ... ok
test test_publish_and_consume ... ok
test test_delayed_message ... ok
test test_try_consume_batch ... ok
test test_scheduled_message ... ok

test result: ok. 12 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 3.52s

     Running tests/management.rs (debug/deps/management-561a043ed8478c2f)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

   Doc-tests broccoli_queue

running 2 tests
test src/queue.rs - queue::BroccoliQueue::process_messages (line 717) - compile ... ok
test src/queue.rs - queue::BroccoliQueue::process_messages_with_handlers (line 870) - compile ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.13s

Daniel Giribet Farre and others added 6 commits July 4, 2025 14:49
- before we appended in the queue and then we inserted the index,
  inserting the index and appending last to the queue means there
  is less risk of a race condition where we consume and the index is
  not there yet
- also improved error handling  messages in ack transaction

```
     Running unittests src/lib.rs (target/release/deps/broccoli_queue-cb687fcc89d05ff9)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/edge_cases.rs (target/release/deps/edge_cases-b7892b1c580d9924)

running 8 tests
test test_concurrent_consume ... ok
test test_empty_payload ... ok
test test_invalid_broker_url ... ok
test test_message_ordering ... ok
test test_multiple_batch_publish_and_consume ... ok
test test_multiple_batch_publish_and_handler ... ok
test test_ttl_not_implemented ... ok
test test_very_large_payload ... ok

test result: ok. 8 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 2.57s

     Running tests/fairness.rs (target/release/deps/fairness-96b314741ef14cb6)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/happy_path.rs (target/release/deps/happy_path-737e9bc30ee0b476)

running 12 tests
test test_batch_publish_and_consume ... ok
test test_delayed_message ... ok
test test_message_acknowledgment ... ok
test test_message_auto_ack ... ok
test test_message_cancellation ... ok
test test_message_priority ... ok
test test_message_retry ... ok
test test_process_messages ... ok
test test_process_messages_with_handlers ... ok
test test_publish_and_consume ... ok
test test_scheduled_message ... ok
test test_try_consume_batch ... ok

test result: ok. 12 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 6.41s

     Running tests/management.rs (target/release/deps/management-270175c472ff0622)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

   Doc-tests broccoli_queue

running 2 tests
test src/queue.rs - queue::BroccoliQueue::process_messages (line 717) - compile ... ok
test src/queue.rs - queue::BroccoliQueue::process_messages_with_handlers (line 870) - compile ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.33s
```
```
     Running unittests src/lib.rs (release/deps/broccoli_queue-cb687fcc89d05ff9)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/edge_cases.rs (release/deps/edge_cases-b7892b1c580d9924)

running 8 tests
test test_concurrent_consume ... ok
test test_empty_payload ... ok
test test_invalid_broker_url ... ok
test test_message_ordering ... ok
test test_multiple_batch_publish_and_consume ... ok
test test_multiple_batch_publish_and_handler ... ok
test test_ttl_not_implemented ... ok
test test_very_large_payload ... ok

test result: ok. 8 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 2.76s

     Running tests/fairness.rs (release/deps/fairness-96b314741ef14cb6)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/happy_path.rs (release/deps/happy_path-737e9bc30ee0b476)

running 12 tests
test test_batch_publish_and_consume ... ok
test test_delayed_message ... ok
test test_message_acknowledgment ... ok
test test_message_auto_ack ... ok
test test_message_cancellation ... ok
test test_message_priority ... ok
test test_message_retry ... ok
test test_process_messages ... ok
test test_process_messages_with_handlers ... ok
test test_publish_and_consume ... ok
test test_scheduled_message ... ok
test test_try_consume_batch ... ok

test result: ok. 12 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 6.41s

     Running tests/management.rs (release/deps/management-270175c472ff0622)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

   Doc-tests broccoli_queue

running 2 tests
test src/queue.rs - queue::BroccoliQueue::process_messages (line 717) - compile ... ok
test src/queue.rs - queue::BroccoliQueue::process_messages_with_handlers (line 870) - compile ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.50s
```
for easier debugging and diagnostics

```
     Running unittests src/lib.rs (release/deps/broccoli_queue-cb687fcc89d05ff9)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/edge_cases.rs (release/deps/edge_cases-b7892b1c580d9924)

running 8 tests
test test_concurrent_consume ... ok
test test_empty_payload ... ok
test test_invalid_broker_url ... ok
test test_message_ordering ... ok
test test_multiple_batch_publish_and_consume ... ok
test test_multiple_batch_publish_and_handler ... ok
test test_ttl_not_implemented ... ok
test test_very_large_payload ... ok

test result: ok. 8 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 2.63s

     Running tests/fairness.rs (release/deps/fairness-96b314741ef14cb6)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/happy_path.rs (release/deps/happy_path-737e9bc30ee0b476)

running 12 tests
test test_batch_publish_and_consume ... ok
test test_delayed_message ... ok
test test_message_acknowledgment ... ok
test test_message_auto_ack ... ok
test test_message_cancellation ... ok
test test_message_priority ... ok
test test_message_retry ... ok
test test_process_messages ... ok
test test_process_messages_with_handlers ... ok
test test_publish_and_consume ... ok
test test_scheduled_message ... ok
test test_try_consume_batch ... ok

test result: ok. 12 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 6.44s

     Running tests/management.rs (release/deps/management-270175c472ff0622)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

   Doc-tests broccoli_queue

running 2 tests
test src/queue.rs - queue::BroccoliQueue::process_messages (line 717) - compile ... ok
test src/queue.rs - queue::BroccoliQueue::process_messages_with_handlers (line 870) - compile ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.24s
```
```
     Running unittests src/lib.rs (release/deps/broccoli_queue-cb687fcc89d05ff9)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/edge_cases.rs (release/deps/edge_cases-b7892b1c580d9924)

running 8 tests
test test_concurrent_consume ... ok
test test_empty_payload ... ok
test test_invalid_broker_url ... ok
test test_message_ordering ... ok
test test_multiple_batch_publish_and_consume ... ok
test test_multiple_batch_publish_and_handler ... ok
test test_ttl_not_implemented ... ok
test test_very_large_payload ... ok

test result: ok. 8 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 2.61s

     Running tests/fairness.rs (release/deps/fairness-96b314741ef14cb6)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/happy_path.rs (release/deps/happy_path-737e9bc30ee0b476)

running 12 tests
test test_batch_publish_and_consume ... ok
test test_delayed_message ... ok
test test_message_acknowledgment ... ok
test test_message_auto_ack ... ok
test test_message_cancellation ... ok
test test_message_priority ... ok
test test_message_retry ... ok
test test_process_messages ... ok
test test_process_messages_with_handlers ... ok
test test_publish_and_consume ... ok
test test_scheduled_message ... ok
test test_try_consume_batch ... ok

test result: ok. 12 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 6.48s

     Running tests/management.rs (release/deps/management-270175c472ff0622)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

   Doc-tests broccoli_queue

running 2 tests
test src/queue.rs - queue::BroccoliQueue::process_messages (line 717) - compile ... ok
test src/queue.rs - queue::BroccoliQueue::process_messages_with_handlers (line 870) - compile ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.35s
```
we now delete from the queue table and return payload in 2 queries only (vs N)

- pegged surrealdb version to avoid unexpected compilation errors
- fixed auto ack in consume lambda
- consume test looks if there is leftovers in processing
…rre/eng-525-surrealdb-queue-implementation-performance-and-stability

# Conflicts:
#	src/brokers/surrealdb/utils.rs
@danigiri danigiri marked this pull request as draft August 19, 2025 10:22
@danigiri
Copy link
Contributor Author

noticed a suspected performance regression in a realistic env

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant