Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ name: Downstream

on:
workflow_dispatch:
branches:
- trunk
push:
branches:
- trunk
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ jobs:
- uses: actions/checkout@v1
- uses: ruby/setup-ruby@v1
with:
ruby-version: 2.6.3
ruby-version: 2.6.9
- name: Cache Gems
uses: actions/cache@v1
uses: actions/cache@v4
with:
path: vendor/bundle
key: ${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }}
key: ${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }}
restore-keys: |
${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }}
${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }}
- name: Install Gems
run: |
sudo gem install bundler -v '1.17.3'
Expand Down
2 changes: 1 addition & 1 deletion .ruby-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.6.6
2.6.9
14 changes: 8 additions & 6 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ GEM
timers (~> 4.0.0)
coderay (1.1.0)
concurrent-ruby (1.1.6)
crass (1.0.4)
crass (1.0.6)
diff-lcs (1.3)
erubis (2.7.0)
ffi (1.10.0)
Expand Down Expand Up @@ -85,20 +85,21 @@ GEM
celluloid (~> 0.16.0)
rb-fsevent (>= 0.9.3)
rb-inotify (>= 0.9)
loofah (2.2.2)
loofah (2.25.0)
crass (~> 1.0.2)
nokogiri (>= 1.5.9)
nokogiri (>= 1.12.0)
lumberjack (1.0.9)
mail (2.6.3)
mime-types (>= 1.16, < 3)
method_source (0.8.2)
mime-types (2.99.3)
mini_portile2 (2.4.0)
mini_portile2 (2.8.9)
minitest (5.14.0)
multi_json (1.13.1)
nenv (0.2.0)
nokogiri (1.9.1)
mini_portile2 (~> 2.4.0)
nokogiri (1.13.10)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
notiffany (0.0.6)
nenv (~> 0.1)
shellany (~> 0.0)
Expand All @@ -111,6 +112,7 @@ GEM
pry-remote (0.1.8)
pry (~> 0.9)
slop (~> 3.0)
racc (1.8.1)
rack (1.6.9)
rack-test (0.6.3)
rack (>= 1.0)
Expand Down
6 changes: 5 additions & 1 deletion lib/acapi/amqp/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ def initialize(chan, q)
@argument_errors = []
@bad_argument_queue = "acapi.error.middleware.service.bad_arguments"
@processing_failed_queue = "acapi.error.middleware.service.processing_failed"
@republish_channel = @channel.connection.create_channel
@republish_channel.confirm_select
@republish_queue = @republish_channel.queue(@queue.name, @queue.options)
Comment on lines +15 to +17
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @republish_channel is created during initialization and set up with confirm_select, but there is no corresponding cleanup logic to close this channel. While the connection closing will eventually clean up the channel, consider adding explicit channel cleanup in a cleanup/disconnect method to ensure proper resource management and avoid potential channel leaks in long-running processes.

Copilot uses AI. Check for mistakes.
@exit_after_work = false
end

Expand Down Expand Up @@ -102,7 +105,8 @@ def subscribe(opts = {})
publish_processing_failed(delivery_info, properties, payload, e)
else
new_properties = redelivery_properties(existing_retry_count, delivery_info, properties)
queue.publish(payload, new_properties)
@republish_queue.publish(payload, new_properties)
@republish_channel.wait_for_confirms
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wait_for_confirms call doesn't check the return value or handle potential exceptions. wait_for_confirms returns false if any messages were not confirmed and can raise exceptions. The outer rescue block will catch exceptions, but the false return value indicating failed confirmations would be silently ignored, potentially leading to message loss without notification. Consider explicitly checking the return value.

Copilot uses AI. Check for mistakes.
channel.acknowledge(delivery_info.delivery_tag, false)
end
rescue => e
Expand Down
6 changes: 5 additions & 1 deletion lib/acapi/amqp/requestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ def initialize(conn)
def request(properties, payload, timeout = 15)
delivery_info, r_props, r_payload = [nil, nil, nil]
channel = @connection.create_channel
p_channel = @connection.create_channel
temp_queue = channel.queue("", :exclusive => true)
channel.prefetch(1)
request_exchange = channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true)
p_channel.confirm_select
request_exchange = p_channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true)
request_exchange.publish(payload, properties.dup.merge({ :reply_to => temp_queue.name, :persistent => true }))
p_channel.wait_for_confirms
delivery_info, r_props, r_payload = [nil, nil, nil]
begin
Timeout::timeout(timeout) do
Expand All @@ -26,6 +29,7 @@ def request(properties, payload, timeout = 15)
end
ensure
temp_queue.delete
p_channel.close
channel.close
end
[delivery_info, r_props, r_payload]
Expand Down
2 changes: 2 additions & 0 deletions lib/acapi/amqp/responder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ module Amqp
module Responder
def with_response_exchange(connection)
channel = connection.create_channel
channel.confirm_select
publish_exchange = channel.default_exchange
yield publish_exchange
channel.wait_for_confirms
channel.close
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/acapi/local_amqp_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def log(name, started, finished, unique_id, data = {})
end
msg = Acapi::Amqp::OutMessage.new(@app_id, name, finished, finished, unique_id, data)
@exchange.publish(*msg.to_message_properties)
@p_channel.wait_for_confirms
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wait_for_confirms call doesn't check the return value or handle potential exceptions. wait_for_confirms returns false if any messages were not confirmed and can raise exceptions like Bunny::NotConnectedError. Consider checking the return value or wrapping this call in error handling to ensure message delivery failures are properly detected and handled.

Copilot uses AI. Check for mistakes.
end

def open_connection_if_needed
Expand All @@ -69,7 +70,9 @@ def open_connection_if_needed
@connection.start
@channel = @connection.create_channel
@queue = @channel.queue(QUEUE_NAME, {:durable => true})
@exchange = @channel.fanout(EXCHANGE_NAME, {:durable => true})
@p_channel = @connection.create_channel
@p_channel.confirm_select
@exchange = @p_channel.fanout(EXCHANGE_NAME, {:durable => true})
@queue.bind(@exchange, {})
end

Expand Down
7 changes: 3 additions & 4 deletions lib/acapi/requestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ def request(req_name, payload,timeout=1)
end

def open_connection_for_request
if !@connection
@connection = Bunny.new(@uri, :heartbeat => 15)
@connection.start
end
return if @connection.present? && @connection.connected?
@connection = Bunny.new(@uri, :heartbeat => 15)
@connection.start
end

def reconnect!
Expand Down
6 changes: 5 additions & 1 deletion spec/lib/acapi/local_amqp_publisher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
allow(session).to receive(:create_channel).and_return(channel)
allow(channel).to receive(:queue).with(forwarding_queue_name, {:durable => true}).and_return(queue)
allow(channel).to receive(:fanout).with(forwarding_exchange_name, {:durable => true}).and_return(exchange)
allow(channel).to receive(:confirm_select)
allow(channel).to receive(:wait_for_confirms)
allow(queue).to receive(:bind).with(exchange, {})
end

Expand Down Expand Up @@ -123,10 +125,12 @@
it "supports reconnection for after_fork" do
#publish to force the connection
allow(exchange).to receive(:publish)
allow(channel).to receive(:confirm_select)
allow(channel).to receive(:wait_for_confirms)
expect(session).to receive(:close)
expect(Bunny).to receive(:new).and_return(session)
expect(session).to receive(:start)
expect(session).to receive(:create_channel).and_return(channel)
allow(session).to receive(:create_channel).and_return(channel)
expect(channel).to receive(:queue).with(forwarding_queue_name, {:durable=> true}).and_return(queue)
expect(channel).to receive(:fanout).with(forwarding_exchange_name, {:durable => true}).and_return(exchange)
expect(queue).to receive(:bind).with(exchange, {})
Expand Down