-
Notifications
You must be signed in to change notification settings - Fork 0
Add publisher confirms to support quorum queues. #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,8 +2,6 @@ name: Downstream | |
|
|
||
| on: | ||
| workflow_dispatch: | ||
| branches: | ||
| - trunk | ||
| push: | ||
| branches: | ||
| - trunk | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| 2.6.6 | ||
| 2.6.9 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,9 @@ def initialize(chan, q) | |
| @argument_errors = [] | ||
| @bad_argument_queue = "acapi.error.middleware.service.bad_arguments" | ||
| @processing_failed_queue = "acapi.error.middleware.service.processing_failed" | ||
| @republish_channel = @channel.connection.create_channel | ||
| @republish_channel.confirm_select | ||
| @republish_queue = @republish_channel.queue(@queue.name, @queue.options) | ||
| @exit_after_work = false | ||
| end | ||
|
|
||
|
|
@@ -102,7 +105,8 @@ def subscribe(opts = {}) | |
| publish_processing_failed(delivery_info, properties, payload, e) | ||
| else | ||
| new_properties = redelivery_properties(existing_retry_count, delivery_info, properties) | ||
| queue.publish(payload, new_properties) | ||
| @republish_queue.publish(payload, new_properties) | ||
| @republish_channel.wait_for_confirms | ||
|
||
| channel.acknowledge(delivery_info.delivery_tag, false) | ||
| end | ||
| rescue => e | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,7 @@ def log(name, started, finished, unique_id, data = {}) | |
| end | ||
| msg = Acapi::Amqp::OutMessage.new(@app_id, name, finished, finished, unique_id, data) | ||
| @exchange.publish(*msg.to_message_properties) | ||
| @p_channel.wait_for_confirms | ||
|
||
| end | ||
|
|
||
| def open_connection_if_needed | ||
|
|
@@ -69,7 +70,9 @@ def open_connection_if_needed | |
| @connection.start | ||
| @channel = @connection.create_channel | ||
| @queue = @channel.queue(QUEUE_NAME, {:durable => true}) | ||
| @exchange = @channel.fanout(EXCHANGE_NAME, {:durable => true}) | ||
| @p_channel = @connection.create_channel | ||
| @p_channel.confirm_select | ||
| @exchange = @p_channel.fanout(EXCHANGE_NAME, {:durable => true}) | ||
| @queue.bind(@exchange, {}) | ||
| end | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The @republish_channel is created during initialization and set up with confirm_select, but there is no corresponding cleanup logic to close this channel. While the connection closing will eventually clean up the channel, consider adding explicit channel cleanup in a cleanup/disconnect method to ensure proper resource management and avoid potential channel leaks in long-running processes.