diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml index c26c7e9..35a1f31 100644 --- a/.github/workflows/downstream.yml +++ b/.github/workflows/downstream.yml @@ -2,8 +2,6 @@ name: Downstream on: workflow_dispatch: - branches: - - trunk push: branches: - trunk diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 10a7a7e..d5d0eee 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,14 +9,14 @@ jobs: - uses: actions/checkout@v1 - uses: ruby/setup-ruby@v1 with: - ruby-version: 2.6.3 + ruby-version: 2.6.9 - name: Cache Gems - uses: actions/cache@v1 + uses: actions/cache@v4 with: path: vendor/bundle - key: ${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }} + key: ${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }} restore-keys: | - ${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }} + ${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }} - name: Install Gems run: | sudo gem install bundler -v '1.17.3' diff --git a/.ruby-version b/.ruby-version index 338a5b5..d48d370 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -2.6.6 +2.6.9 diff --git a/Gemfile.lock b/Gemfile.lock index 0d80874..cd3e940 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -55,7 +55,7 @@ GEM timers (~> 4.0.0) coderay (1.1.0) concurrent-ruby (1.1.6) - crass (1.0.4) + crass (1.0.6) diff-lcs (1.3) erubis (2.7.0) ffi (1.10.0) @@ -85,20 +85,21 @@ GEM celluloid (~> 0.16.0) rb-fsevent (>= 0.9.3) rb-inotify (>= 0.9) - loofah (2.2.2) + loofah (2.25.0) crass (~> 1.0.2) - nokogiri (>= 1.5.9) + nokogiri (>= 1.12.0) lumberjack (1.0.9) mail (2.6.3) mime-types (>= 1.16, < 3) method_source (0.8.2) mime-types (2.99.3) - mini_portile2 (2.4.0) + mini_portile2 (2.8.9) minitest (5.14.0) multi_json (1.13.1) nenv (0.2.0) - nokogiri (1.9.1) - mini_portile2 (~> 2.4.0) + nokogiri (1.13.10) + mini_portile2 (~> 2.8.0) + racc (~> 1.4) notiffany (0.0.6) nenv (~> 0.1) shellany (~> 0.0) @@ -111,6 +112,7 @@ GEM pry-remote (0.1.8) pry (~> 0.9) slop (~> 3.0) + racc (1.8.1) rack (1.6.9) rack-test (0.6.3) rack (>= 1.0) diff --git a/lib/acapi/amqp/client.rb b/lib/acapi/amqp/client.rb index 2adade5..f4e4f92 100644 --- a/lib/acapi/amqp/client.rb +++ b/lib/acapi/amqp/client.rb @@ -12,6 +12,9 @@ def initialize(chan, q) @argument_errors = [] @bad_argument_queue = "acapi.error.middleware.service.bad_arguments" @processing_failed_queue = "acapi.error.middleware.service.processing_failed" + @republish_channel = @channel.connection.create_channel + @republish_channel.confirm_select + @republish_queue = @republish_channel.queue(@queue.name, @queue.options) @exit_after_work = false end @@ -102,7 +105,8 @@ def subscribe(opts = {}) publish_processing_failed(delivery_info, properties, payload, e) else new_properties = redelivery_properties(existing_retry_count, delivery_info, properties) - queue.publish(payload, new_properties) + @republish_queue.publish(payload, new_properties) + @republish_channel.wait_for_confirms channel.acknowledge(delivery_info.delivery_tag, false) end rescue => e diff --git a/lib/acapi/amqp/requestor.rb b/lib/acapi/amqp/requestor.rb index 8c6173c..51ef1e1 100644 --- a/lib/acapi/amqp/requestor.rb +++ b/lib/acapi/amqp/requestor.rb @@ -11,10 +11,13 @@ def initialize(conn) def request(properties, payload, timeout = 15) delivery_info, r_props, r_payload = [nil, nil, nil] channel = @connection.create_channel + p_channel = @connection.create_channel temp_queue = channel.queue("", :exclusive => true) channel.prefetch(1) - request_exchange = channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true) + p_channel.confirm_select + request_exchange = p_channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true) request_exchange.publish(payload, properties.dup.merge({ :reply_to => temp_queue.name, :persistent => true })) + p_channel.wait_for_confirms delivery_info, r_props, r_payload = [nil, nil, nil] begin Timeout::timeout(timeout) do @@ -26,6 +29,7 @@ def request(properties, payload, timeout = 15) end ensure temp_queue.delete + p_channel.close channel.close end [delivery_info, r_props, r_payload] diff --git a/lib/acapi/amqp/responder.rb b/lib/acapi/amqp/responder.rb index dae0ac4..f964c95 100644 --- a/lib/acapi/amqp/responder.rb +++ b/lib/acapi/amqp/responder.rb @@ -3,8 +3,10 @@ module Amqp module Responder def with_response_exchange(connection) channel = connection.create_channel + channel.confirm_select publish_exchange = channel.default_exchange yield publish_exchange + channel.wait_for_confirms channel.close end end diff --git a/lib/acapi/local_amqp_publisher.rb b/lib/acapi/local_amqp_publisher.rb index 3fc38fb..a9a4cc6 100644 --- a/lib/acapi/local_amqp_publisher.rb +++ b/lib/acapi/local_amqp_publisher.rb @@ -61,6 +61,7 @@ def log(name, started, finished, unique_id, data = {}) end msg = Acapi::Amqp::OutMessage.new(@app_id, name, finished, finished, unique_id, data) @exchange.publish(*msg.to_message_properties) + @p_channel.wait_for_confirms end def open_connection_if_needed @@ -69,7 +70,9 @@ def open_connection_if_needed @connection.start @channel = @connection.create_channel @queue = @channel.queue(QUEUE_NAME, {:durable => true}) - @exchange = @channel.fanout(EXCHANGE_NAME, {:durable => true}) + @p_channel = @connection.create_channel + @p_channel.confirm_select + @exchange = @p_channel.fanout(EXCHANGE_NAME, {:durable => true}) @queue.bind(@exchange, {}) end diff --git a/lib/acapi/requestor.rb b/lib/acapi/requestor.rb index 3df0216..11c8d4f 100644 --- a/lib/acapi/requestor.rb +++ b/lib/acapi/requestor.rb @@ -32,10 +32,9 @@ def request(req_name, payload,timeout=1) end def open_connection_for_request - if !@connection - @connection = Bunny.new(@uri, :heartbeat => 15) - @connection.start - end + return if @connection.present? && @connection.connected? + @connection = Bunny.new(@uri, :heartbeat => 15) + @connection.start end def reconnect! diff --git a/spec/lib/acapi/local_amqp_publisher_spec.rb b/spec/lib/acapi/local_amqp_publisher_spec.rb index 0502b70..5bb5b8f 100644 --- a/spec/lib/acapi/local_amqp_publisher_spec.rb +++ b/spec/lib/acapi/local_amqp_publisher_spec.rb @@ -33,6 +33,8 @@ allow(session).to receive(:create_channel).and_return(channel) allow(channel).to receive(:queue).with(forwarding_queue_name, {:durable => true}).and_return(queue) allow(channel).to receive(:fanout).with(forwarding_exchange_name, {:durable => true}).and_return(exchange) + allow(channel).to receive(:confirm_select) + allow(channel).to receive(:wait_for_confirms) allow(queue).to receive(:bind).with(exchange, {}) end @@ -123,10 +125,12 @@ it "supports reconnection for after_fork" do #publish to force the connection allow(exchange).to receive(:publish) + allow(channel).to receive(:confirm_select) + allow(channel).to receive(:wait_for_confirms) expect(session).to receive(:close) expect(Bunny).to receive(:new).and_return(session) expect(session).to receive(:start) - expect(session).to receive(:create_channel).and_return(channel) + allow(session).to receive(:create_channel).and_return(channel) expect(channel).to receive(:queue).with(forwarding_queue_name, {:durable=> true}).and_return(queue) expect(channel).to receive(:fanout).with(forwarding_exchange_name, {:durable => true}).and_return(exchange) expect(queue).to receive(:bind).with(exchange, {})