diff --git a/.github/workflows/actions.yml b/.github/workflows/actions.yml new file mode 100644 index 0000000..2d87a72 --- /dev/null +++ b/.github/workflows/actions.yml @@ -0,0 +1,114 @@ +on: [push, pull_request] + +jobs: + integration-test: + needs: build + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: ['ubuntu-latest'] + perl: [ '5.34', '5.32', '5.10', '5.10.0' ] + max-parallel: 2 + name: Integration test ${{ matrix.perl }} on ${{ matrix.os }} + + services: + rabbitmq: + image: rabbitmq:latest + env: + RABBITMQ_DEFAULT_VHOST: "/" + ports: + - 5672:5672/tcp + options: --health-cmd "rabbitmqctl node_health_check" --health-interval 10s --health-timeout 5s --health-retries 5 + + steps: + - name: Grab release tarball + uses: actions/download-artifact@v1 + with: + name: release-tarball + - run: tar xzf release-tarball/ar.tar.gz --strip 1 + - run: rm -rf release-tarball + - name: Set up perl + uses: shogo82148/actions-setup-perl@v1.15.3 + with: + perl-version: ${{ matrix.perl }} + - run: perl -V + - run: cpanm --quiet --notest --installdeps --with-recommends --with-develop . + - run: perl Makefile.PL + - run: make + - run: AUTHOR_TESTING=1 make test + - run: prove -Iblib/lib -r xt/ + unit-test: + needs: build + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: ['ubuntu-latest', 'macos-latest'] + perl: [ '5.34', '5.32', '5.10' ] + max-parallel: 3 + name: Perl unit tests ${{ matrix.perl }} on ${{ matrix.os }} + + steps: + - name: Grab release tarball + uses: actions/download-artifact@v1 + with: + name: release-tarball + - run: tar xzf release-tarball/ar.tar.gz --strip 1 + - name: Set up perl + uses: shogo82148/actions-setup-perl@v1.15.3 + with: + perl-version: ${{ matrix.perl }} + - run: perl -V + - run: cpanm --quiet --notest --installdeps . + - run: perl Makefile.PL + - run: make + - run: make test + + strawberry-unit-test: + needs: build + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: ['windows-latest'] + perl: [ '5.30.2.1', '5.28.2.1' ] + arch: ['64bit'] + max-parallel: 3 + name: Strawberry Perl unit tests on ${{ matrix.os }} with ${{ matrix.arch }} perl ${{ matrix.perl }} + + steps: + - name: Grab release tarball + uses: actions/download-artifact@v1 + with: + name: release-tarball + - run: tar xzf release-tarball/ar.tar.gz --strip 1 + - run: Invoke-WebRequest -Uri http://strawberryperl.com/download/${{ matrix.perl }}/strawberry-perl-${{ matrix.perl }}-${{ matrix.arch }}.msi -OutFile c:\strawberry-perl-${{ matrix.perl }}-${{ matrix.arch }}.msi + - run: Start-Process msiexec.exe -Wait -ArgumentList '/I C:\strawberry-perl-${{ matrix.perl }}-${{ matrix.arch }}.msi /quiet' + - run: perl -V + - run: cpanm --quiet --notest --installdeps . + - run: perl Makefile.PL + - run: make + - run: make test + + build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: ['ubuntu-latest'] + perl: [ '5.30' ] + max-parallel: 2 + name: CPAN build ${{ matrix.perl }} on ${{ matrix.os }} + + steps: + - uses: actions/checkout@v2 + - name: Set up perl + uses: shogo82148/actions-setup-perl@v1.15.3 + with: + perl-version: ${{ matrix.perl }} + - run: perl -V + - run: cpanm --quiet --notest Dist::Zilla + - run: dzil authordeps | cpanm --quiet --notest + - run: dzil build + - run: mv AnyEvent-RabbitMQ-*.tar.gz ar.tar.gz + - uses: actions/upload-artifact@v2 + with: + path: ar.tar.gz + name: release-tarball diff --git a/.gitignore b/.gitignore index e2a4cfe..2d74792 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +AnyEvent-RabbitMQ-* +*.swp +MYMETA.* cover_db META.yml Makefile diff --git a/.shipit b/.shipit index dba08a4..bcb93b5 100644 --- a/.shipit +++ b/.shipit @@ -1,2 +1,2 @@ steps = FindVersion, ChangeVersion, CheckChangeLog, DistTest, Commit, Tag, MakeDist -svk.tagpattern = release-%v +git.tagpattern = %v diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..1edee45 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,20 @@ +language: perl + +perl: + - "5.16" + +services: + - rabbitmq + +before_install: + - sudo apt-get update -qq + - sudo apt-get install -y aspell + +install: + - cpanm --quiet --notest AnyEvent + - cpanm --quiet --notest Devel::GlobalDestruction + - cpanm --quiet --notest Module::Install::AuthorTests + - cpanm --quiet --notest Net::AMQP + - cpanm --quiet --notest Readonly + - cpanm --quiet --notest Test::Perl::Critic + - cpanm --quiet --notest Test::Spelling diff --git a/Changes b/Changes index 6ec4a3b..d2128db 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,137 @@ Revision history for Perl extension AnyEvent::RabbitMQ +1.22 Fri 12 Jun 19:55:03 BST 2020 + - Minor correction to author tests. + - Correct version constraint on version.pm. + +1.21_01 Wed 10 Jun 12:23:48 BST 2020 + - Validate peer when using TLS. (Scott O'Neil) + - Add "nodelay" option to disable Nagle's algorithm. + https://rt.cpan.org/Ticket/Display.html?id=119793 + Also José Micó's e888205306fb49167c0dc9aa4e4dc940cc922d25. + - Assign a version number to all modules in package. + +1.21 Sat 30 May 23:40:54 BST 2020 + - No changes. + +1.20_01 Sat 23 May 23:29:50 BST 2020 + - Add "no_ack" flag to "get" method. (Julio Polo) + - Prevent channelMax from overflowing. + https://rt.cpan.org/Ticket/Display.html?id=97716 (Carl Hörberg) + - Merge in documentation patch from + https://rt.cpan.org/Ticket/Display.html?id=88213 (Rod Taylor) + +1.20 Sun 17 May 14:22:17 BST 2020 + - Downgrade error to a warning. + +1.19_01 Sat 16 May 21:18:27 BST 2020 + - Allow AnyEvent::TLS options to be passed (Nicolas R). + - Correct check when in confirm mode (Ruslan Zakirov). + - Fix @INC breaking perls >= 5.26 (Nicolas R). + - Minor test fixes + +1.19 Sat Mar 21 16:49:24 GMT 2015 + - Add 'no_ack' as an optional argument to the ->consume method + (Dave Mueller). + - Fill in some missing documentation (Moritz Lenz). +1.18 Mon Sep 29 19:36:00 PDT 2014 + - Added the bind_exchange and unbind_exchange methods + for exchange-exchange bindings. + +1.17 Fri Jul 25 14:02:00 PDT 2014 + - Add support for chunking large bodies into multiple AMQP frames, + allowing the sending of large messages. + +1.16 Sat Apr 12 14:42:00 BST 2014 + - Doc fixes (Mark Ellis) + - Fix leak when calling ->close + tests (Peter Haworth) + +1.15 Mon Jul 1 12:35:00 BST 2013 + - Fix paper-bag bug in connection close - calling nonexistent method. + +1.14 Fri Jun 7 08:54:00 BST 2013 + - Fix paper-bag bug in heartbeat - always lost heartbeat even on + active connections + - on channel close, automatically call on_return callbacks for any + publishes that are waiting + - maintain more state around opening and closing to avoid hang/race + when server sends Close after client does (this is possible!) + - cope with AMQP quirk that in confirm mode, returned messages are + *also* acked/nacked + - document $channel->publish + +1.13 Thu May 2 16:48:58 PDT 2013 + - Require Net::AMQP 0.06 to: + + Get consume cancel notifications (e.g. queue deletion) + + Properly encode user-provided header strings that look like numbers + - Fix race between server-sent and client-sent cancellation. + + - Expect server to send heartbeats as promised. If it doesn't, go President + Madagasgar on its ass and SHUT DOWN EVERYTHING. + + - Rearrange many things and weaken many references to eliminate bad circular + references. Some circular refs are actually good, though; leave those. + + - Allow customized client_properties on connection. + + - Make test output clearer. + +1.12 Thu Apr 11 20:45:00 2013 + - Allow AMQP client to adjust tuning, e.g. heartbeat + (Chip Salzenberg) + + - Fix RT#84222, continue reading AMQP packets after a heartbeat. + + - Spontaneously emit hearts as per amqp 0.9.1 spec. + + The AMQP spec says, "The client should start sending heartbeats after + receiving a Connection.Tune method, and start monitoring heartbeats after + receiving Connection.Open." There is no mention of merely responding to + heartbeat packets emitted by the server. (Dave Lambley) + +1.11 Tue Mar 5 22:22:00 2013 + - Fix on_success callback for the Channel->close method (davel). + +1.10 Mon Feb 25 13:48:00 2013 + - Clarify relationship to Net::RabbitFoot. RT#71099 + + - Add TLS connection support. RT#81729 + +1.09 Mon Feb 25 12:03:00 2013 + - Support AMQP heartbeat. + + - Support AMQP 0.9 standard. (Chip Salzenberg) + + - Stop defining a _return_cb value when not using the mandatory + or immediate flags when publishing a message. This means that + if you're not using these flags, but are using an infinite set + of routing keys, then you won't leak infinite RAM. + Currently if you do use these flags and infinitely variable + routing keys, we still have a problem as we leak callbacks. + RT#79511 + +1.08 Mon Aug 27 08:43:00 2012 + - Improve Data::Dumper options for protocol dumps (Chip Salzenberg) + - More thoroughly eliminate memory leaks on incoming messages + (Chip Salzenberg) + - Properly handle channel close: Ensure pending requests fail + immediately (Chip Salzenberg) + +1.07 Tue Aug 21 15:47:00 2012 + - Fix dist by putting missing version numbers back into + all the modules. + +1.06 Tue Aug 21 15:10:00 2012 + - Fix a race condition stopping connections from closing properly. + If you ask to open a channel, and then immediately try to close + the connection then the not yet open channel would never remove + itself from the associated connection, resulting in the connection + never being terminated (as there were still channels associated + with it). + + - Stop leaking all RabbitMQ messages recieved back inside + a closure. + - Allow multiple clients to have independent connections to RabbitMQ, as long as they all use the same spec file. diff --git a/MANIFEST.SKIP b/MANIFEST.SKIP index 3285585..e922f2d 100644 --- a/MANIFEST.SKIP +++ b/MANIFEST.SKIP @@ -1,3 +1,4 @@ +^MYMETA\. \bRCS\b \bCVS\b ^MANIFEST\. @@ -19,3 +20,5 @@ ^[^/]+\.pl$ ^\.shipit$ ^\.gitignore$ +^\.travis.yml$ +^\.github\/ diff --git a/Makefile.PL b/Makefile.PL deleted file mode 100644 index 2ca2a19..0000000 --- a/Makefile.PL +++ /dev/null @@ -1,22 +0,0 @@ -use inc::Module::Install; - -name 'AnyEvent-RabbitMQ'; -all_from 'lib/AnyEvent/RabbitMQ.pm'; - -requires 'List::MoreUtils'; -requires 'Net::AMQP'; -requires 'AnyEvent'; -requires 'Devel::GlobalDestruction'; -requires 'namespace::clean'; -requires 'File::ShareDir'; -requires 'Readonly' => '1.03'; - -tests 't/*.t'; -author_tests 'xt'; -install_share; - -build_requires 'Test::More'; -build_requires 'Test::Exception'; -build_requires 'version'; -auto_install; -WriteAll; diff --git a/dist.ini b/dist.ini new file mode 100644 index 0000000..08325f6 --- /dev/null +++ b/dist.ini @@ -0,0 +1,62 @@ +name = AnyEvent-RabbitMQ +author = Masahito Ikuta +license = Perl_5 +copyright_holder = AnyEvent-RabbitMQ's developers +version = 1.22 + +[@Filter] +-bundle = @Basic +-remove = Readme + +[Metadata] +x_contributors = Tom Doran +x_contributors = Nicolas R +x_contributors = Dave Lambley +x_contributors = Ruslan Zakirov +x_contributors = Masahito Ikuta +x_contributors = Rod Taylor +x_contributors = Carl Hörberg +x_contributors = Julio Polo +x_contributors = A.J. Ragusa +x_contributors = José Micó +x_contributors = Scott O'Neil +x_contributors = Eugen Konkov + +[InstallGuide] +[CPANFile] +[MetaJSON] + +[OurPkgVersion] +underscore_eval_version = 1 +no_critic = 1 + +[MetaResources] +repository.url = git://github.com/bobtfish/AnyEvent-RabbitMQ.git +repository.web = https://github.com/bobtfish/AnyEvent-RabbitMQ +repository.type = git + +[Test::PodSpelling] +stopword = TCP +stopword = ack +stopword = qos +stopword = AMQP + +[Test::Perl::Critic] + +[Prereqs] +List::MoreUtils = 0 +Net::AMQP = 0.06 +AnyEvent = 0 +Devel::GlobalDestruction = 0 +namespace::clean = 0 +File::ShareDir = 0 +Readonly = 1.03 +perl = 5.010 + +[Prereqs / TestRequires] +Test::More = 0 +Test::Exception = 0 +version = 0.77 + +[Prereqs / TestRecommends] +Test::Pod = 0 diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm index b00e421..98bcc8b 100644 --- a/lib/AnyEvent/RabbitMQ.pm +++ b/lib/AnyEvent/RabbitMQ.pm @@ -2,51 +2,91 @@ package AnyEvent::RabbitMQ; use strict; use warnings; - -use Data::Dumper; use Carp qw(confess croak); +use Scalar::Util qw(refaddr); use List::MoreUtils qw(none); use Devel::GlobalDestruction; -use namespace::clean; use File::ShareDir; use Readonly; +use Scalar::Util qw/ weaken /; + +require Data::Dumper; +sub Dumper { + local $Data::Dumper::Terse = 1; + local $Data::Dumper::Indent = 1; + local $Data::Dumper::Useqq = 1; + local $Data::Dumper::Deparse = 1; + local $Data::Dumper::Quotekeys = 0; + local $Data::Dumper::Sortkeys = 1; + &Data::Dumper::Dumper +} use AnyEvent::Handle; use AnyEvent::Socket; -use Net::AMQP; +use Net::AMQP 0.06; use Net::AMQP::Common qw(:all); use AnyEvent::RabbitMQ::Channel; use AnyEvent::RabbitMQ::LocalQueue; -our $VERSION = '1.05'; +use namespace::clean; + +# VERSION + +use constant { + _ST_CLOSED => 0, + _ST_OPENING => 1, + _ST_OPEN => 2, + _ST_CLOSING => 3, +}; Readonly my $DEFAULT_AMQP_SPEC - => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-8.xml'; + => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-9-1.xml'; + +Readonly my $DEFAULT_CHANNEL_MAX => 2**16-1; sub new { my $class = shift; return bless { verbose => 0, @_, - _is_open => 0, + _state => _ST_CLOSED, _queue => AnyEvent::RabbitMQ::LocalQueue->new, + _last_chan_id => 0, _channels => {}, _login_user => '', _server_properties => {}, + _frame_max => undef, + _body_max => undef, + _channel_max => undef, }, $class; } +sub verbose { + my $self = shift; + @_ ? ($self->{verbose} = shift) : $self->{verbose} +} + +sub is_open { + my $self = shift; + $self->{_state} == _ST_OPEN +} + sub channels { my $self = shift; return $self->{_channels}; } -sub delete_channel { +sub _delete_channel { my $self = shift; - my ($id) = @_; - return delete $self->{_channels}->{$id}; + my ($channel,) = @_; + my $c = $self->{_channels}->{$channel->id}; + if (defined($c) && refaddr($c) == refaddr($channel)) { + delete $self->{_channels}->{$channel->id}; + return 1; + } + return 0; } sub login_user { @@ -72,7 +112,7 @@ sub connect { my $self = shift; my %args = $self->_set_cbs(@_); - if ($self->{_is_open}) { + if ($self->{_state} != _ST_CLOSED) { $args{on_failure}->('Connection has already been opened'); return $self; } @@ -82,39 +122,58 @@ sub connect { $args{timeout} ||= 0; for (qw/ host port /) { - confess("No $_ passed to connect to") unless $args{$_}; + $args{$_} or return $args{on_failure}->("No $_ passed to connect"); } if ($self->{verbose}) { warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n"; } - $self->{_connect_guard} = AnyEvent::Socket::tcp_connect( + $self->{_state} = _ST_OPENING; + + weaken(my $weak_self = $self); + my $conn; $conn = AnyEvent::Socket::tcp_connect( $args{host}, $args{port}, sub { - my $fh = shift or return $args{on_failure}->( - sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!) - ); + undef $conn; + my $self = $weak_self or return; + my $fh = shift; + + unless ($fh) { + $self->{_state} = _ST_CLOSED; + return $args{on_failure}->( + sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!) + ); + } + + my $close_cb = $args{on_close}; + my $failure_cb = $args{on_failure}; $self->{_handle} = AnyEvent::Handle->new( fh => $fh, on_error => sub { my ($handle, $fatal, $message) = @_; + my $self = $weak_self or return; - $self->{_channels} = {}; - if (!$self->{_is_open}) { - $args{on_failure}->(@_); + if ($self->is_open) { + $self->_server_closed($close_cb, $message); + } + else { + $failure_cb->(@_); } - $self->{_is_open} = 0; - $self->_disconnect(); - $args{on_close}->($message); }, on_drain => sub { my ($handle) = @_; + my $self = $weak_self or return; + $self->{drain_condvar}->send if exists $self->{drain_condvar}; }, + peername => $args{host}, + $args{tls} ? (tls => 'connect') : (), + $args{tls_ctx} ? ( tls_ctx => $args{tls_ctx} ) : (), + $args{nodelay} ? ( nodelay => $args{nodelay} ) : (), ); $self->_read_loop($args{on_close}, $args{on_read_failure}); $self->_start(%args,); @@ -136,7 +195,9 @@ sub _read_loop { return if !defined $self->{_handle}; # called on_error + weaken(my $weak_self = $self); $self->{_handle}->push_read(chunk => 8, sub { + my $self = $weak_self or return; my $data = $_[1]; my $stack = $_[1]; @@ -154,18 +215,26 @@ sub _read_loop { } $self->{_handle}->push_read(chunk => $length, sub { + my $self = $weak_self or return; $stack .= $_[1]; my ($frame) = Net::AMQP->parse_raw_frames(\$stack); + $self->{_heartbeat_recv} = time if $self->{_heartbeat_timer}; + if ($self->{verbose}) { - warn '[C] <-- [S] ' . Dumper($frame); - warn '-----------', "\n"; + warn '[C] <-- [S] ', Dumper($frame), + '-----------', "\n"; } my $id = $frame->channel; if (0 == $id) { - return if !$self->_check_close_and_clean($frame, $close_cb,); - $self->{_queue}->push($frame); + if ($frame->type_id == 8) { + # Heartbeat, no action needs taking. + } + else { + return unless $self->_check_close_and_clean($frame, $close_cb,); + $self->{_queue}->push($frame); + } } else { my $channel = $self->{_channels}->{$id}; if (defined $channel) { @@ -187,16 +256,35 @@ sub _check_close_and_clean { my $self = shift; my ($frame, $close_cb,) = @_; - return 1 if !$frame->isa('Net::AMQP::Frame::Method'); + my $method_frame = $frame->isa('Net::AMQP::Frame::Method') ? $frame->method_frame : undef; - my $method_frame = $frame->method_frame; - return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close'); + if ($self->{_state} == _ST_CLOSED) { + return $method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk'); + } + + if ($method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::Close')) { + delete $self->{_heartbeat_timer}; + $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new()); + $self->_server_closed($close_cb, $frame); + return; + } - $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new()); + return 1; +} + +sub _server_closed { + my $self = shift; + my ($close_cb, $why,) = @_; + + $self->{_state} = _ST_CLOSING; + for my $channel (values %{ $self->{_channels} }) { + $channel->_closed(ref($why) ? $why : $channel->_close_frame($why)); + } $self->{_channels} = {}; - $self->{_is_open} = 0; - $self->_disconnect(); - $close_cb->($frame); + $self->{_handle}->push_shutdown; + $self->{_state} = _ST_CLOSED; + + $close_cb->($why); return; } @@ -228,10 +316,15 @@ sub _start { $self->_push_write( Net::AMQP::Protocol::Connection::StartOk->new( client_properties => { - platform => 'Perl', - product => __PACKAGE__, - information => 'http://d.hatena.ne.jp/cooldaemon/', - version => __PACKAGE__->VERSION, + platform => 'Perl', + product => __PACKAGE__, + information => 'http://d.hatena.ne.jp/cooldaemon/', + version => Net::AMQP::Value::String->new(__PACKAGE__->VERSION), + capabilities => { + consumer_cancel_notify => Net::AMQP::Value::true, + exchange_exchange_bindings => Net::AMQP::Value::true, + }, + %{ $args{client_properties} || {} }, }, mechanism => 'AMQPLAIN', response => { @@ -254,19 +347,40 @@ sub _tune { my $self = shift; my %args = @_; + weaken(my $weak_self = $self); $self->_push_read_and_valid( 'Connection::Tune', sub { + my $self = $weak_self or return; my $frame = shift; + my %tune; + foreach (qw( channel_max frame_max heartbeat )) { + my $client = $args{tune}{$_} || 0; + my $server = $frame->method_frame->$_ || 0; + + # negotiate with the server such that we cannot request a larger + # value set by the server, unless the server said unlimited + $tune{$_} = ($server == 0 or $client == 0) + ? ($server > $client ? $server : $client) # max + : ($client > $server ? $server : $client); # min + } + + if ($self->{_frame_max} = $tune{frame_max}) { + # calculate how big the body can actually be + $self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN; + } + + $self->{_channel_max} = $tune{channel_max} || $DEFAULT_CHANNEL_MAX; + $self->_push_write( - Net::AMQP::Protocol::Connection::TuneOk->new( - channel_max => $frame->method_frame->channel_max, - frame_max => $frame->method_frame->frame_max, - heartbeat => $frame->method_frame->heartbeat, - ), + Net::AMQP::Protocol::Connection::TuneOk->new(%tune,) ); + if ($tune{heartbeat} > 0) { + $self->_start_heartbeat($tune{heartbeat}, %args,); + } + $self->_open(%args,); }, $args{on_failure}, @@ -275,6 +389,39 @@ sub _tune { return $self; } +sub _start_heartbeat { + my ($self, $interval, %args,) = @_; + + my $close_cb = $args{on_close}; + my $failure_cb = $args{on_read_failure}; + my $last_recv = 0; + my $idle_cycles = 0; + weaken(my $weak_self = $self); + my $timer_cb = sub { + my $self = $weak_self or return; + if ($self->{_heartbeat_recv} != $last_recv) { + $last_recv = $self->{_heartbeat_recv}; + $idle_cycles = 0; + } + elsif (++$idle_cycles > 1) { + delete $self->{_heartbeat_timer}; + $failure_cb->("Heartbeat lost"); + $self->_server_closed($close_cb, "Heartbeat lost"); + return; + } + $self->_push_write(Net::AMQP::Frame::Heartbeat->new()); + }; + + $self->{_heartbeat_recv} = time; + $self->{_heartbeat_timer} = AnyEvent->timer( + after => $interval, + interval => $interval, + cb => $timer_cb, + ); + + return $self; +} + sub _open { my $self = shift; my %args = @_; @@ -283,12 +430,11 @@ sub _open { 'Connection::Open', { virtual_host => $args{vhost}, - capabilities => '', insist => 1, }, - 'Connection::OpenOk', + 'Connection::OpenOk', sub { - $self->{_is_open} = 1; + $self->{_state} = _ST_OPEN; $self->{_login_user} = $args{user}; $args{on_success}->($self); }, @@ -299,66 +445,70 @@ sub _open { } sub close { + return if in_global_destruction; my $self = shift; my %args = $self->_set_cbs(@_); - if (!$self->{_is_open}) { + if ($self->{_state} == _ST_CLOSED) { $args{on_success}->(@_); return $self; } - - my $close_cb = sub { - $self->_close( - sub { - $self->_disconnect(); - $args{on_success}->(@_); - }, - sub { - $self->_disconnect(); - $args{on_failure}->(@_); - } - ); + if ($self->{_state} != _ST_OPEN) { + $args{on_failure}->(($self->{_state} == _ST_OPENING ? "open" : "close") . " already in progress"); return $self; + } + $self->{_state} = _ST_CLOSING; + + my $cv = AE::cv { + delete $self->{_closing}; + $self->_finish_close(%args); }; - if (0 == scalar keys %{$self->{_channels}}) { - return $close_cb->(); + $cv->begin(); + + my @ids = keys %{$self->{_channels}}; + for my $id (@ids) { + my $channel = $self->{_channels}->{$id}; + if ($channel->is_open) { + $cv->begin(); + $channel->close( + on_success => sub { $cv->end() }, + on_failure => sub { $cv->end() }, + ); + } } - for my $id (keys %{$self->{_channels}}) { - my $channel = $self->{_channels}->{$id} - or next; # Could have already gone away on global destruction.. - $channel->close( - on_success => $close_cb, - on_failure => sub { - $close_cb->(); - $args{on_failure}->(@_); - }, - ); - } + $cv->end(); return $self; } -sub _close { +sub _finish_close { my $self = shift; - my ($cb, $failure_cb,) = @_; + my %args = @_; - return $self if !$self->{_is_open} || 0 < scalar keys %{$self->{_channels}}; + if (my @ch = map { $_->id } grep { defined() && $_->is_open } values %{$self->{_channels}}) { + $args{on_failure}->("BUG: closing with channel(s) open: @ch"); + return; + } + + $self->{_state} = _ST_CLOSED; $self->_push_write_and_read( 'Connection::Close', {}, 'Connection::CloseOk', - $cb, $failure_cb, + sub { + # circular ref ok + $self->{_handle}->push_shutdown; + $args{on_success}->(@_); + }, + sub { + # circular ref ok + $self->{_handle}->push_shutdown; + $args{on_failure}->(@_); + }, ); - $self->{_is_open} = 0; - return $self; -} - -sub _disconnect { - my $self = shift; - $self->{_handle}->push_shutdown; - return $self; + return; } sub open_channel { @@ -376,15 +526,19 @@ sub open_channel { } if (!$id) { - for my $candidate_id (1 .. (2**16 - 1)) { - next if defined $self->{_channels}->{$candidate_id}; - $id = $candidate_id; - last; + my $try_id = $self->{_last_chan_id}; + for (1 .. $self->{_channel_max}) { + $try_id = 1 if ++$try_id > $self->{_channel_max}; + unless (defined $self->{_channels}->{$try_id}) { + $id = $try_id; + last; + } } if (!$id) { $args{on_failure}->('Ran out of channel ids'); return $self; } + $self->{_last_chan_id} = $id; } my $channel = AnyEvent::RabbitMQ::Channel->new( @@ -400,7 +554,7 @@ sub open_channel { $args{on_success}->($channel); }, on_failure => sub { - $self->delete_channel($id); + $self->_delete_channel($channel); $args{on_failure}->(@_); }, ); @@ -451,8 +605,9 @@ sub _push_read_and_valid { } $failure_cb->( - 'Method is not ' . join(',', @$exp) . "\n" - . 'Method was ' . ref $method_frame + $method_frame->isa('Net::AMQP::Protocol::Channel::Close') + ? 'Channel closed' + : 'Expected ' . join(',', @$exp) . ' but got ' . ref($method_frame) ); }); } @@ -480,7 +635,7 @@ sub _set_cbs { my %args = @_; $args{on_success} ||= sub {}; - $args{on_failure} ||= sub { return if in_global_destruction; die @_}; + $args{on_failure} ||= sub { die @_ unless in_global_destruction }; return %args; } @@ -489,7 +644,7 @@ sub _check_open { my $self = shift; my ($failure_cb) = @_; - return 1 if $self->{_is_open}; + return 1 if $self->is_open; $failure_cb->('Connection has already been closed'); return 0; @@ -507,14 +662,9 @@ sub drain_writes { delete $self->{drain_timer}; } -my $is_gd; - -END { $is_gd++ }; - sub DESTROY { my $self = shift; - return if $is_gd; - $self->close() if defined $self; + $self->close() unless in_global_destruction; return; } @@ -538,7 +688,12 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client. pass => 'guest', vhost => '/', timeout => 1, + tls => 0, # Or 1 if you'd like SSL + tls_ctx => $anyevent_tls, # or a hash of AnyEvent::TLS options. + tune => { heartbeat => 30, channel_max => $whatever, frame_max => $whatever }, + nodelay => 1, # Reduces latency by disabling Nagle's algorithm on_success => sub { + my $ar = shift; $ar->open_channel( on_success => sub { my $channel = shift; @@ -554,14 +709,24 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client. on_close => sub { my $method_frame = shift->method_frame; die $method_frame->reply_code, $method_frame->reply_text; - } + }, ); }, on_failure => $cv, - on_read_failure => sub {die @_}, + on_read_failure => sub { die @_ }, + on_return => sub { + my $frame = shift; + die "Unable to deliver ", Dumper($frame); + }, on_close => sub { - my $method_frame = shift->method_frame; - die $method_frame->reply_code, $method_frame->reply_text; + my $why = shift; + if (ref($why)) { + my $method_frame = $why->method_frame; + die $method_frame->reply_code, ": ", $method_frame->reply_text; + } + else { + die $why; + } }, ); @@ -575,16 +740,26 @@ You can use AnyEvent::RabbitMQ to - * Declare and delete exchanges * Declare, delete, bind and unbind queues - * Set QoS + * Set QoS and confirm mode * Publish, consume, get, ack, recover and reject messages * Select, commit and rollback transactions -AnyEvnet::RabbitMQ is known to work with RabbitMQ versions 2.5.1 and version 0-8 of the AMQP specification. +Most of these actions can be done through L. +Please see the documentation there for more details. + +AnyEvent::RabbitMQ is known to work with RabbitMQ versions 2.5.1 and versions 0-8 and 0-9-1 of the AMQP specification. + +This client is the non-blocking version, for a blocking version with a similar API, see L. =head1 AUTHOR Masahito Ikuta Ecooldaemon@gmail.comE +=head1 MAINTAINER + +Currently maintained by C<< >> due to the original +author being missing in action. + =head1 COPYRIGHT Copyright (c) 2010, the above named author(s). @@ -597,3 +772,4 @@ This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =cut + diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm index d800768..ab19ef1 100644 --- a/lib/AnyEvent/RabbitMQ/Channel.pm +++ b/lib/AnyEvent/RabbitMQ/Channel.pm @@ -3,24 +3,73 @@ package AnyEvent::RabbitMQ::Channel; use strict; use warnings; -use Scalar::Util qw(weaken); use AnyEvent::RabbitMQ::LocalQueue; +use AnyEvent; +use Scalar::Util qw( looks_like_number weaken ); +use Devel::GlobalDestruction; +use Carp qw(croak cluck); +use POSIX qw(ceil); +BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper } + +# VERSION + +use namespace::clean; + +use constant { + _ST_CLOSED => 0, + _ST_OPENING => 1, + _ST_OPEN => 2, +}; sub new { my $class = shift; + my $self = bless { - @_, # id, connection, on_close - _is_open => 0, - _is_active => 0, + on_close => sub {}, + @_, # id, connection, on_return, on_close, on_inactive, on_active _queue => AnyEvent::RabbitMQ::LocalQueue->new, _content_queue => AnyEvent::RabbitMQ::LocalQueue->new, - _consumer_cbs => {}, - _return_cbs => {}, }, $class; weaken($self->{connection}); + return $self->_reset; +} + +sub _reset { + my $self = shift; + + my %a = ( + _state => _ST_CLOSED, + _is_active => 0, + _is_confirm => 0, + _publish_tag => 0, + _publish_cbs => {}, # values: [on_ack, on_nack, on_return] + _consumer_cbs => {}, # values: [on_consume, on_cancel...] + ); + @$self{keys %a} = values %a; + return $self; } +sub id { + my $self = shift; + return $self->{id}; +} + +sub is_open { + my $self = shift; + return $self->{_state} == _ST_OPEN; +} + +sub is_active { + my $self = shift; + return $self->{_is_active}; +} + +sub is_confirm { + my $self = shift; + return $self->{_is_confirm}; +} + sub queue { my $self = shift; return $self->{_queue}; @@ -30,19 +79,24 @@ sub open { my $self = shift; my %args = @_; - if ($self->{_is_open}) { + if ($self->{_state} != _ST_CLOSED) { $args{on_failure}->('Channel has already been opened'); return $self; } + $self->{_state} = _ST_OPENING; + $self->{connection}->_push_write_and_read( 'Channel::Open', {}, 'Channel::OpenOk', sub { - $self->{_is_open} = 1; + $self->{_state} = _ST_OPEN; $self->{_is_active} = 1; - $args{on_success}->(); + $args{on_success}->($self); + }, + sub { + $self->{_state} = _ST_CLOSED; + $args{on_failure}->($self); }, - $args{on_failure}, $self->{id}, ); @@ -55,47 +109,96 @@ sub close { or return; my %args = $connection->_set_cbs(@_); - return $self if !$self->{_is_open}; + # If open in in progess, wait for it; 1s arbitrary timing. + + weaken(my $wself = $self); + my $t; $t = AE::timer 0, 1, sub { + (my $self = $wself) or undef $t, return; + return if $self->{_state} == _ST_OPENING; + + # No more tests are required + undef $t; + + # Double close is OK + if ($self->{_state} == _ST_CLOSED) { + $args{on_success}->($self); + return; + } + + $connection->_push_write( + $self->_close_frame, + $self->{id}, + ); - return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}}; + # The spec says that after a party sends Channel::Close, it MUST + # discard all frames for that channel. So this channel is dead + # immediately. + $self->_closed(); - for my $consumer_tag (keys %{$self->{_consumer_cbs}}) { - $self->cancel( - consumer_tag => $consumer_tag, - on_success => sub { - $self->_close(%args); + $connection->_push_read_and_valid( + 'Channel::CloseOk', + sub { + $args{on_success}->($self); + $self->_orphan(); }, - on_failure => sub { - $self->_close(%args); + sub { $args{on_failure}->(@_); - } + $self->_orphan(); + }, + $self->{id}, ); - } + }; return $self; } -sub _close { +sub _closed { my $self = shift; - my %args = @_; + my ($frame,) = @_; + $frame ||= $self->_close_frame(); - $self->{connection}->_push_write_and_read( - 'Channel::Close', {}, 'Channel::CloseOk', - sub { - $self->{_is_open} = 0; - $self->{_is_active} = 0; - $self->{connection}->delete_channel($self->{id}); - $args{on_success}->(); - }, - sub { - $self->{_is_open} = 0; - $self->{_is_active} = 0; - $self->{connection}->delete_channel($self->{id}); - $args{on_failure}->(); - }, - $self->{id}, + return if $self->{_state} == _ST_CLOSED; + $self->{_state} = _ST_CLOSED; + + # Perform callbacks for all outstanding commands + $self->{_queue}->_flush($frame); + $self->{_content_queue}->_flush($frame); + + # Fake nacks of all outstanding publishes + $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} }; + + # Report cancelation of all outstanding consumes + my @tags = keys %{ $self->{_consumer_cbs} }; + $self->_canceled($_, $frame) for @tags; + + # Report close to on_close callback + { local $@; + eval { $self->{on_close}->($frame) }; + warn "Error in channel on_close callback, ignored:\n $@ " if $@; } + + # Reset state (partly redundant) + $self->_reset; + + return $self; +} + +sub _close_frame { + my $self = shift; + my ($text,) = @_; + + Net::AMQP::Frame::Method->new( + method_frame => Net::AMQP::Protocol::Channel::Close->new( + reply_text => $text, + ), ); +} +sub _orphan { + my $self = shift; + + if (my $connection = $self->{connection}) { + $connection->_delete_channel($self); + } return $self; } @@ -117,7 +220,51 @@ sub declare_exchange { ticket => 0, nowait => 0, # FIXME }, - 'Exchange::DeclareOk', + 'Exchange::DeclareOk', + $cb, + $failure_cb, + $self->{id}, + ); + + return $self; +} + +sub bind_exchange { + my $self = shift; + my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); + + return $self if !$self->_check_open($failure_cb); + + $self->{connection}->_push_write_and_read( + 'Exchange::Bind', + { + %args, # source, destination, routing_key + ticket => 0, + nowait => 0, # FIXME + }, + 'Exchange::BindOk', + $cb, + $failure_cb, + $self->{id}, + ); + + return $self; +} + +sub unbind_exchange { + my $self = shift; + my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); + + return $self if !$self->_check_open($failure_cb); + + $self->{connection}->_push_write_and_read( + 'Exchange::Unbind', + { + %args, # source, destination, routing_key + ticket => 0, + nowait => 0, # FIXME + }, + 'Exchange::UnbindOk', $cb, $failure_cb, $self->{id}, @@ -140,7 +287,7 @@ sub delete_exchange { ticket => 0, nowait => 0, # FIXME }, - 'Exchange::DeleteOk', + 'Exchange::DeleteOk', $cb, $failure_cb, $self->{id}, @@ -168,7 +315,7 @@ sub declare_queue { ticket => 0, nowait => 0, # FIXME }, - 'Queue::DeclareOk', + 'Queue::DeclareOk', $cb, $failure_cb, $self->{id}, @@ -188,7 +335,7 @@ sub bind_queue { ticket => 0, nowait => 0, # FIXME }, - 'Queue::BindOk', + 'Queue::BindOk', $cb, $failure_cb, $self->{id}, @@ -209,7 +356,7 @@ sub unbind_queue { %args, # queue, exchange, routing_key ticket => 0, }, - 'Queue::UnbindOk', + 'Queue::UnbindOk', $cb, $failure_cb, $self->{id}, @@ -231,7 +378,7 @@ sub purge_queue { ticket => 0, nowait => 0, # FIXME }, - 'Queue::PurgeOk', + 'Queue::PurgeOk', $cb, $failure_cb, $self->{id}, @@ -255,7 +402,7 @@ sub delete_queue { ticket => 0, nowait => 0, # FIXME }, - 'Queue::DeleteOk', + 'Queue::DeleteOk', $cb, $failure_cb, $self->{id}, @@ -268,11 +415,38 @@ sub publish { my $self = shift; my %args = @_; - return $self if !$self->{_is_active}; + # Docs should advise channel-level callback over this, but still, better to give user an out + unless ($self->{_is_active}) { + if (defined $args{on_inactive}) { + $args{on_inactive}->(); + return $self; + } + croak "Can't publish on inactive channel (server flow control); provide on_inactive callback"; + } + + my $header_args = delete $args{header}; + my $body = delete $args{body}; + my $ack_cb = delete $args{on_ack}; + my $nack_cb = delete $args{on_nack}; + my $return_cb = delete $args{on_return}; + + defined($header_args) or $header_args = {}; + defined($body) or $body = ''; + if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) { + cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode" + unless $self->{_is_confirm}; + } - my $header_args = delete $args{header} || {}; - my $body = delete $args{body} || ''; - my $return_cb = delete $args{on_return} || sub {}; + my $tag; + if ($self->{_is_confirm}) { + # yeah, delivery tags in acks are sequential. see Java client + $tag = ++$self->{_publish_tag}; + if ($return_cb) { + $header_args = { %$header_args }; + $header_args->{headers}->{_ar_return} = $tag; # just reuse the same value, why not + } + $self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb]; + } $self->_publish( %args, @@ -282,12 +456,6 @@ sub publish { $body, ); - return $self if !$args{mandatory} && !$args{immediate}; - - $self->{_return_cbs}->{ - ($args{exchange} || '') . '_' . $args{routing_key} - } = $return_cb; - return $self; } @@ -310,16 +478,27 @@ sub _publish { } sub _header { - my ($self, $args, $body,) = @_; + my ($self, $args, $body) = @_; + + my $weight = delete $args->{weight} || 0; + + # user-provided message headers must be strings. protect values that look like numbers. + my $headers = $args->{headers} || {}; + my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers; + if (@prot) { + $headers = { + %$headers, + map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot + }; + } $self->{connection}->_push_write( Net::AMQP::Frame::Header->new( - weight => $args->{weight} || 0, + weight => $weight, body_size => length($body), header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new( content_type => 'application/octet-stream', content_encoding => undef, - headers => {}, delivery_mode => 1, priority => 1, correlation_id => undef, @@ -331,6 +510,7 @@ sub _header { app_id => undef, cluster_id => undef, %$args, + headers => $headers, ), ), $self->{id}, @@ -342,10 +522,16 @@ sub _header { sub _body { my ($self, $body,) = @_; - $self->{connection}->_push_write( - Net::AMQP::Frame::Body->new(payload => $body), - $self->{id}, - ); + my $body_max = $self->{connection}->{_body_max} || length $body; + + # chunk up body into segments measured by $frame_max + while (length $body) { + $self->{connection}->_push_write( + Net::AMQP::Frame::Body->new( + payload => substr($body, 0, $body_max, '')), + $self->{id} + ); + } return $self; } @@ -356,25 +542,27 @@ sub consume { return $self if !$self->_check_open($failure_cb); - my $consumer_cb = delete $args{on_consume} || sub {}; - + my $consumer_cb = delete $args{on_consume} || sub {}; + my $cancel_cb = delete $args{on_cancel} || sub {}; + my $no_ack = delete $args{no_ack} // 1; + $self->{connection}->_push_write_and_read( 'Basic::Consume', { consumer_tag => '', no_local => 0, - no_ack => 1, + no_ack => $no_ack, exclusive => 0, + %args, # queue ticket => 0, nowait => 0, # FIXME }, - 'Basic::ConsumeOk', + 'Basic::ConsumeOk', sub { my $frame = shift; - $self->{_consumer_cbs}->{ - $frame->method_frame->consumer_tag - } = $consumer_cb; + my $tag = $frame->method_frame->consumer_tag; + $self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ]; $cb->($frame); }, $failure_cb, @@ -395,44 +583,54 @@ sub cancel { return $self; } - if (!$self->{_consumer_cbs}->{$args{consumer_tag}}) { + my $cons_cbs = $self->{_consumer_cbs}->{$args{consumer_tag}}; + unless ($cons_cbs) { $failure_cb->('Unknown consumer_tag'); return $self; } + push @$cons_cbs, $cb; - $self->{connection}->_push_write_and_read( - 'Basic::Cancel', - { + $self->{connection}->_push_write( + Net::AMQP::Protocol::Basic::Cancel->new( %args, # consumer_tag nowait => 0, - }, - 'Basic::CancelOk', - sub { - my $frame = shift; - delete $self->{_consumer_cbs}->{$args{consumer_tag}}; - $cb->($frame); - }, - $failure_cb, + ), $self->{id}, ); return $self; } +sub _canceled { + my $self = shift; + my ($tag, $frame,) = @_; + + my $cons_cbs = delete $self->{_consumer_cbs}->{$tag} + or return 0; + + shift @$cons_cbs; # no more deliveries + for my $cb (reverse @$cons_cbs) { + $cb->($frame); + } + return 1; +} + sub get { my $self = shift; my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); + my $no_ack = delete $args{no_ack} // 1; + return $self if !$self->_check_open($failure_cb); $self->{connection}->_push_write_and_read( 'Basic::Get', { - no_ack => 1, + no_ack => $no_ack, %args, # queue ticket => 0, }, - [qw(Basic::GetOk Basic::GetEmpty)], + [qw(Basic::GetOk Basic::GetEmpty)], sub { my $frame = shift; return $cb->({empty => $frame}) @@ -476,11 +674,11 @@ sub qos { 'Basic::Qos', { prefetch_count => 1, - %args, prefetch_size => 0, global => 0, + %args, }, - 'Basic::QosOk', + 'Basic::QosOk', $cb, $failure_cb, $self->{id}, @@ -489,9 +687,37 @@ sub qos { return $self; } +sub confirm { + my $self = shift; + my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); + + return $self if !$self->_check_open($failure_cb); + return $self if !$self->_check_version(0, 9, $failure_cb); + + weaken(my $wself = $self); + + $self->{connection}->_push_write_and_read( + 'Confirm::Select', + { + %args, + nowait => 0, # FIXME + }, + 'Confirm::SelectOk', + sub { + my $me = $wself or return; + $me->{_is_confirm} = 1; + $cb->(); + }, + $failure_cb, + $self->{id}, + ); + + return $self; +} + sub recover { my $self = shift; - my %args = @_; + my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); return $self if !$self->_check_open(sub {}); @@ -503,6 +729,18 @@ sub recover { $self->{id}, ); + if (!$args{nowait} && $self->_check_version(0, 9)) { + $self->{connection}->_push_read_and_valid( + 'Basic::RecoverOk', + $cb, + $failure_cb, + $self->{id}, + ); + } + else { + $cb->(); + } + return $self; } @@ -576,6 +814,9 @@ sub push_queue_or_consume { my $self = shift; my ($frame, $failure_cb,) = @_; + # Note: the spec says that after a party sends Channel::Close, it MUST + # discard all frames for that channel other than Close and CloseOk. + if ($frame->isa('Net::AMQP::Frame::Method')) { my $method_frame = $frame->method_frame; if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) { @@ -583,23 +824,74 @@ sub push_queue_or_consume { Net::AMQP::Protocol::Channel::CloseOk->new(), $self->{id}, ); - $self->{_is_open} = 0; - $self->{_is_active} = 0; - $self->{connection}->delete_channel($self->{id}); - $self->{on_close}->($frame); + $self->_closed($frame); + $self->_orphan(); + return $self; + } elsif ($self->{_state} != _ST_OPEN) { + if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') || + $method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) { + $self->{_queue}->push($frame); + } return $self; } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) { - my $cb = $self->{_consumer_cbs}->{ - $method_frame->consumer_tag - } || sub {}; + my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag}; + my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {}; $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb); return $self; + } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') || + $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) { + # CancelOk means we asked for a cancel. + # Cancel means queue was deleted; it is not AMQP, but RMQ supports it. + if (!$self->_canceled($method_frame->consumer_tag, $frame) + && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) { + $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag); + } + return $self; } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) { - my $cb = $self->{_return_cbs}->{ - $method_frame->exchange . '_' . $method_frame->routing_key - } || sub {}; + weaken(my $wself = $self); + my $cb = sub { + my $ret = shift; + my $me = $wself or return; + my $headers = $ret->{header}->headers || {}; + my $onret_cb; + if (defined(my $tag = $headers->{_ar_return})) { + my $cbs = $me->{_publish_cbs}->{$tag}; + $onret_cb = $cbs->[2] if $cbs; + } + $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {}; # oh well + $onret_cb->($frame); + }; $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb); return $self; + } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') || + $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) { + (my $resp = ref($method_frame)) =~ s/.*:://; + my $cbs; + if (!$self->{_is_confirm}) { + $failure_cb->("Received $resp when not in confirm mode"); + } + else { + my @tags; + if ($method_frame->{multiple}) { + @tags = sort { $a <=> $b } + grep { $_ <= $method_frame->{delivery_tag} } + keys %{$self->{_publish_cbs}}; + } + else { + @tags = ($method_frame->{delivery_tag}); + } + my $cbi = ($resp eq 'Ack') ? 0 : 1; + for my $tag (@tags) { + my $cbs; + if (not $cbs = delete $self->{_publish_cbs}->{$tag}) { + $failure_cb->("Received $resp of unknown delivery tag $tag"); + } + elsif ($cbs->[$cbi]) { + $cbs->[$cbi]->($frame); + } + } + } + return $self; } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) { $self->{_is_active} = $method_frame->active; $self->{connection}->_push_write( @@ -608,6 +900,9 @@ sub push_queue_or_consume { ), $self->{id}, ); + my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive'; + my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {}; + $cb->($frame); return $self; } $self->{_queue}->push($frame); @@ -623,25 +918,11 @@ sub _push_read_header_and_body { my ($type, $frame, $cb, $failure_cb,) = @_; my $response = {$type => $frame}; my $body_size = 0; - - $self->{_content_queue}->get(sub{ - my $frame = shift; - - return $failure_cb->('Received data is not header frame') - if !$frame->isa('Net::AMQP::Frame::Header'); - - my $header_frame = $frame->header_frame; - return $failure_cb->( - 'Header is not Protocol::Basic::ContentHeader' - . 'Header was ' . ref $header_frame - ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader'); - - $response->{header} = $header_frame; - $body_size = $frame->body_size; - }); - my $body_payload = ""; - my $next_frame; $next_frame = sub { + + weaken(my $wcontq = $self->{_content_queue}); + my $w_body_frame; + my $body_frame = sub { my $frame = shift; return $failure_cb->('Received data is not body frame') @@ -651,7 +932,8 @@ sub _push_read_header_and_body { if (length($body_payload) < $body_size) { # More to come - $self->{_content_queue}->get($next_frame); + my $contq = $wcontq or return; + $contq->get($w_body_frame); } else { $frame->payload($body_payload); @@ -659,8 +941,32 @@ sub _push_read_header_and_body { $cb->($response); } }; + $w_body_frame = $body_frame; + weaken($w_body_frame); - $self->{_content_queue}->get($next_frame); + $self->{_content_queue}->get(sub{ + my $frame = shift; + + return $failure_cb->('Received data is not header frame') + if !$frame->isa('Net::AMQP::Frame::Header'); + + my $header_frame = $frame->header_frame; + return $failure_cb->( + 'Header is not Protocol::Basic::ContentHeader' + . 'Header was ' . ref $header_frame + ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader'); + + $response->{header} = $header_frame; + + $body_size = $frame->body_size; + if ( $body_size ) { + my $contq = $wcontq or return; + $contq->get($body_frame); + } else { + $response->{body} = undef; + $cb->($response); + } + }); return $self; } @@ -679,20 +985,31 @@ sub _check_open { my $self = shift; my ($failure_cb) = @_; - return 1 if $self->{_is_open}; + return 1 if $self->is_open(); $failure_cb->('Channel has already been closed'); return 0; } +sub _check_version { + my $self = shift; + my ($major, $minor, $failure_cb) = @_; + + my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR; + my $amin = $Net::AMQP::Protocol::VERSION_MINOR; + + return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor; + + $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb; + return 0; +} + sub DESTROY { my $self = shift; - $self->close() if defined $self; + $self->close() if !in_global_destruction && $self->is_open(); return; } -1; - 1; __END__ @@ -705,7 +1022,29 @@ AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel. my $ch = $rf->open_channel(); $ch->declare_exchange(exchange => 'test_exchange'); -=head1 DESRIPTION +=head1 DESCRIPTION + +A RabbitMQ channel. + +A channel is a light-weight virtual connection within a TCP connection to a +RabbitMQ broker. + +=head1 ARGUMENTS FOR C + +=over + +=item on_close + +Callback invoked when the channel closes. Callback will be passed the +incoming message that caused the close, if any. + +=item on_return + +Callback invoked when a mandatory or immediate message publish fails. +Callback will be passed the incoming message, with accessors +C, C, and C. + +=back =head1 METHODS @@ -747,10 +1086,123 @@ The name of the exchange =back +=head2 bind_exchange + +Binds an exchange to another exchange, with a routing key. + +Arguments: + +=over + +=item source + +The name of the source exchange to bind + +=item destination + +The name of the destination exchange to bind + +=item routing_key + +The routing key to bind with + +=back + +=head2 unbind_exchange + =head2 delete_exchange =head2 declare_queue +Declare a queue (create it if it doesn't exist yet) for publishing messages +to on the server. + + my $done = AnyEvent->condvar; + $channel->declare_queue( + exchange => $queue_exchange, + queue => $queueName, + durable => 0, + auto_delete => 1, + passive => 0, + arguments => { 'x-expires' => 0, }, + on_success => sub { $done->send; }, + on_failure => sub { + say "Unable to create queue $queueName"; + $done->send; + }, + ); + $done->recv; + +Arguments: + +=over + +=item queue + +Name of the queue to be declared. If the queue name is the empty string, +RabbitMQ will create a unique name for the queue. This is useful for +temporary/private reply queues. + +=item on_success + +Callback that is called when the queue was declared successfully. The argument +to the callback is of type L. To get the name of the +Queue (if you declared it with an empty name), you can say + + on_success => sub { + my $method = shift; + my $name = $method->method_frame->queue; + }; + +=item on_failure + +Callback that is called when the declaration of the queue has failed. + +=item auto_delete + +0 or 1, default 0 + +=item passive + +0 or 1, default 0 + +=item durable + +0 or 1, default 0 + +=item exclusive + +0 or 1, default 0 + +=item no_ack + +0 or 1, default 1 + +=item ticket + +default 0 + +=for comment +XXX Is "exchange" a valid parameter? + +=item arguments + +C is a hashref of additional parameters which RabbitMQ extensions +may use. This list is not complete and your RabbitMQ server configuration will +determine which arguments are valid and how they act. + +=over + +=item x-expires + +The queue will automatically be removed after being idle for this many milliseconds. + +Default of 0 disables automatic queue removal. + +=back + +=back + =head2 bind_queue Binds a queue to an exchange, with a routing key. @@ -783,17 +1235,59 @@ Flushes the contents of a queue. Deletes a queue. The queue may not have any active consumers. -=head2 publish +=head2 consume -Publish a message to an exchange +Subscribe to consume messages from a queue. Arguments: =over -=item body +=item queue -The text body of the message to send. +The name of the queue to be consumed from. + +=item on_consume + +Callback called with an argument of the message which has been consumed. + +The message is a hash reference, where the value to key C
is an object +of type L, L is a +L, and C a L. + +=item on_cancel + +Callback called if consumption is cancelled. This may be at client request +or as a side effect of queue deletion. (Notification of queue deletion is a +RabbitMQ extension.) + +=item consumer_tag + +Identifies this consumer, will be auto-generated if you do not provide it, but you must +supply a value if you want to be able to later cancel the subscription. + +=item on_success + +Callback called if the subscription was successful (before the first message is consumed). + +=item on_failure + +Callback called if the subscription fails for any reason. + +=item no_ack + +Pass through the C flag. Defaults to C<1>. If set to C<1>, the server +will not expect messages to be acknowledged. + +=back + +=head2 publish + +Publish a message to an exchange. + +Arguments: + +=over =item exchange @@ -803,32 +1297,47 @@ The name of the exchange to send the message to. The routing key with which to publish the message. -=back +=item header -=head2 consume +Hash of AMQP message header info, including the confusingly similar element "headers", +which may contain arbitrary string key/value pairs. -Subscribe to consume messages from a queue. +=item body -Arguments: +The text body of the message to send. -=over +=item mandatory -=item on_consume +Boolean; if true, then if the message doesn't land in a queue (e.g. the exchange has no +bindings), it will be "returned." (see "on_return") -Callback called with an argument of the message which has been consumed. +=item immediate -=item consumer_tag +Boolean; if true, then if the message cannot be delivered directly to a consumer, it +will be "returned." (see "on_return") -Identifies this consumer, will be auto-generated if you do not provide it, but you must -supply a value if you want to be able to later cancel the subscription. +=item on_ack -=item on_success +Callback called with the frame that acknowledges receipt (if channel is in confirm mode), +typically L. -Callback called if the subscription was successfull (before the first message is consumed). +=item on_nack -=item on_failure +Callback called with the frame that declines receipt (if the channel is in confirm mode), +typically L or L. -Callback called if the subscription fails for any reason. +=item on_return + +In AMQP, a "returned" message is one that cannot be delivered in compliance with the +C or C flags. + +If in confirm mode, this callback will be called with the frame that reports message +return, typically L. If confirm mode is off or +this callback is not provided, then the channel or connection objects' on_return +callbacks (if any), will be called instead. + +NOTE: If confirm mode is on, the on_ack or on_nack callback will be called whether or +not on_return is called first. =back @@ -869,7 +1378,7 @@ Arguments: =item queue -Mandatory. Name of the queue to try to recieve a message from. +Mandatory. Name of the queue to try to receive a message from. =item on_success @@ -878,7 +1387,11 @@ a notification that there was nothing to collect from the queue. =item on_failure -This callback will be called if an error is signaled on this channel. +This callback will be called if an error is signalled on this channel. + +=item no_ack + +0 or 1, default 1 =back @@ -886,6 +1399,11 @@ This callback will be called if an error is signaled on this channel. =head2 qos +=head2 confirm + +Put channel into confirm mode. In confirm mode, publishes are confirmed by +the server, so the on_ack callback of publish works. + =head2 recover =head2 select_tx @@ -899,5 +1417,3 @@ This callback will be called if an error is signaled on this channel. See L for author(s), copyright and license. =cut - - diff --git a/lib/AnyEvent/RabbitMQ/LocalQueue.pm b/lib/AnyEvent/RabbitMQ/LocalQueue.pm index dec6ba4..6e222a8 100644 --- a/lib/AnyEvent/RabbitMQ/LocalQueue.pm +++ b/lib/AnyEvent/RabbitMQ/LocalQueue.pm @@ -3,6 +3,8 @@ package AnyEvent::RabbitMQ::LocalQueue; use strict; use warnings; +# VERSION + sub new { my $class = shift; return bless { @@ -43,5 +45,16 @@ sub _drain_queue { return $self; } +sub _flush { + my ($self, $frame) = @_; + + $self->_drain_queue; + + while (my $cb = shift @{$self->{_drain_code_queue}}) { + local $@; # Flush frames immediately, throwing away errors for on-close + eval { $cb->($frame) }; + } +} + 1; diff --git a/xt/perlcriticrc b/perlcritic.rc similarity index 100% rename from xt/perlcriticrc rename to perlcritic.rc diff --git a/share/README b/share/README new file mode 100644 index 0000000..2731ca7 --- /dev/null +++ b/share/README @@ -0,0 +1,3 @@ +AMQP spec files +fixed_amqp0-8.xml - standard 0.8 spec plus Rabbit extensions +fixed_amqp0-9-1.xml - standard 0.9.1 spec plus Rabbit extensions diff --git a/share/fixed_amqp0-9-1.xml b/share/fixed_amqp0-9-1.xml new file mode 100644 index 0000000..7b42816 --- /dev/null +++ b/share/fixed_amqp0-9-1.xml @@ -0,0 +1,3320 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + Indicates that the method completed successfully. This reply code is + reserved for future use - the current protocol design does not use positive + confirmation and reply codes are sent only in case of an error. + + + + + + The client attempted to transfer content larger than the server could accept + at the present time. The client may retry at a later time. + + + + + + When the exchange cannot deliver to a consumer when the immediate flag is + set. As a result of pending data on the queue or the absence of any + consumers of the queue. + + + + + + An operator intervened to close the connection for some reason. The client + may retry at some later date. + + + + + + The client tried to work with an unknown virtual host. + + + + + + The client attempted to work with a server entity to which it has no + access due to security settings. + + + + + + The client attempted to work with a server entity that does not exist. + + + + + + The client attempted to work with a server entity to which it has no + access because another client is working with it. + + + + + + The client requested a method that was not allowed because some precondition + failed. + + + + + + The sender sent a malformed frame that the recipient could not decode. + This strongly implies a programming error in the sending peer. + + + + + + The sender sent a frame that contained illegal values for one or more + fields. This strongly implies a programming error in the sending peer. + + + + + + The client sent an invalid sequence of frames, attempting to perform an + operation that was considered invalid by the server. This usually implies + a programming error in the client. + + + + + + The client attempted to work with a channel that had not been correctly + opened. This most likely indicates a fault in the client layer. + + + + + + The peer sent a frame that was not expected, usually in the context of + a content header and body. This strongly indicates a fault in the peer's + content processing. + + + + + + The server could not complete the method because it lacked sufficient + resources. This may be due to the client creating too many of some type + of entity. + + + + + + The client tried to work with some entity in a manner that is prohibited + by the server, due to security settings or by some other criteria. + + + + + + The client tried to use functionality that is not implemented in the + server. + + + + + + The server could not complete the method because of an internal error. + The server may require intervention by an operator in order to resume + normal operations. + + + + + + + + + + Identifier for the consumer, valid within the current channel. + + + + + + The server-assigned and channel-specific delivery tag + + + + The delivery tag is valid only within the channel from which the message was + received. I.e. a client MUST NOT receive a message on one channel and then + acknowledge it on another. + + + + + The server MUST NOT use a zero value for delivery tags. Zero is reserved + for client use, meaning "all messages so far received". + + + + + + + The exchange name is a client-selected string that identifies the exchange for + publish methods. + + + + + + + + + + If this field is set the server does not expect acknowledgements for + messages. That is, when a message is delivered to the client the server + assumes the delivery will succeed and immediately dequeues it. This + functionality may increase performance but at the cost of reliability. + Messages can get lost if a client dies before they are delivered to the + application. + + + + + + If the no-local field is set the server will not send messages to the connection that + published them. + + + + + + If set, the server will not respond to the method. The client should not wait + for a reply method. If the server could not complete the method it will raise a + channel or connection exception. + + + + + + Unconstrained. + + + + + + + + This table provides a set of peer properties, used for identification, debugging, + and general information. + + + + + + The queue name identifies the queue within the vhost. In methods where the queue + name may be blank, and that has no specific significance, this refers to the + 'current' queue for the channel, meaning the last queue that the client declared + on the channel. If the client did not declare a queue, and the method needs a + queue name, this will result in a 502 (syntax error) channel exception. + + + + + + + + This indicates that the message has been previously delivered to this or + another client. + + + + The server SHOULD try to signal redelivered messages when it can. When + redelivering a message that was not successfully acknowledged, the server + SHOULD deliver it to the original client if possible. + + + Declare a shared queue and publish a message to the queue. Consume the + message using explicit acknowledgements, but do not acknowledge the + message. Close the connection, reconnect, and consume from the queue + again. The message should arrive with the redelivered flag set. + + + + + The client MUST NOT rely on the redelivered field but should take it as a + hint that the message may already have been processed. A fully robust + client must be able to track duplicate received messages on non-transacted, + and locally-transacted channels. + + + + + + + The number of messages in the queue, which will be zero for newly-declared + queues. This is the number of messages present in the queue, and committed + if the channel on which they were published is transacted, that are not + waiting acknowledgement. + + + + + + The reply code. The AMQ reply codes are defined as constants at the start + of this formal specification. + + + + + + + The localised reply text. This text can be logged as an aid to resolving + issues. + + + + + + + + + + + + + + + + + + + + The connection class provides methods for a client to establish a network connection to + a server, and for both peers to operate the connection thereafter. + + + + connection = open-connection *use-connection close-connection + open-connection = C:protocol-header + S:START C:START-OK + *challenge + S:TUNE C:TUNE-OK + C:OPEN S:OPEN-OK + challenge = S:SECURE C:SECURE-OK + use-connection = *channel + close-connection = C:CLOSE S:CLOSE-OK + / S:CLOSE C:CLOSE-OK + + + + + + + + + + This method starts the connection negotiation process by telling the client the + protocol version that the server proposes, along with a list of security mechanisms + which the client can use for authentication. + + + + + If the server cannot support the protocol specified in the protocol header, + it MUST respond with a valid protocol header and then close the socket + connection. + + + The client sends a protocol header containing an invalid protocol name. + The server MUST respond by sending a valid protocol header and then closing + the connection. + + + + + The server MUST provide a protocol version that is lower than or equal to + that requested by the client in the protocol header. + + + The client requests a protocol version that is higher than any valid + implementation, e.g. 2.0. The server must respond with a protocol header + indicating its supported protocol version, e.g. 1.0. + + + + + If the client cannot handle the protocol version suggested by the server + it MUST close the socket connection without sending any further data. + + + The server sends a protocol version that is lower than any valid + implementation, e.g. 0.1. The client must respond by closing the + connection without sending any further data. + + + + + + + + + The major version number can take any value from 0 to 99 as defined in the + AMQP specification. + + + + + + The minor version number can take any value from 0 to 99 as defined in the + AMQP specification. + + + + + + + The properties SHOULD contain at least these fields: "host", specifying the + server host name or address, "product", giving the name of the server product, + "version", giving the name of the server version, "platform", giving the name + of the operating system, "copyright", if appropriate, and "information", giving + other general information. + + + Client connects to server and inspects the server properties. It checks for + the presence of the required fields. + + + + + + + A list of the security mechanisms that the server supports, delimited by spaces. + + + + + + + A list of the message locales that the server supports, delimited by spaces. The + locale defines the language in which the server will send reply texts. + + + + The server MUST support at least the en_US locale. + + + Client connects to server and inspects the locales field. It checks for + the presence of the required locale(s). + + + + + + + + + This method selects a SASL security mechanism. + + + + + + + + + The properties SHOULD contain at least these fields: "product", giving the name + of the client product, "version", giving the name of the client version, "platform", + giving the name of the operating system, "copyright", if appropriate, and + "information", giving other general information. + + + + + + + A single security mechanisms selected by the client, which must be one of those + specified by the server. + + + + The client SHOULD authenticate using the highest-level security profile it + can handle from the list provided by the server. + + + + + If the mechanism field does not contain one of the security mechanisms + proposed by the server in the Start method, the server MUST close the + connection without sending any further data. + + + Client connects to server and sends an invalid security mechanism. The + server must respond by closing the connection (a socket close, with no + connection close negotiation). + + + + + + + + A block of opaque data passed to the security mechanism. The contents of this + data are defined by the SASL security mechanism. + + + + + + + A single message locale selected by the client, which must be one of those + specified by the server. + + + + + + + + + + The SASL protocol works by exchanging challenges and responses until both peers have + received sufficient information to authenticate each other. This method challenges + the client to provide more information. + + + + + + + + Challenge information, a block of opaque binary data passed to the security + mechanism. + + + + + + + This method attempts to authenticate, passing a block of SASL data for the security + mechanism at the server side. + + + + + + + A block of opaque data passed to the security mechanism. The contents of this + data are defined by the SASL security mechanism. + + + + + + + + + + This method proposes a set of connection configuration values to the client. The + client can accept and/or adjust these. + + + + + + + + + Specifies highest channel number that the server permits. Usable channel numbers + are in the range 1..channel-max. Zero indicates no specified limit. + + + + + + The largest frame size that the server proposes for the connection, including + frame header and end-byte. The client can negotiate a lower value. Zero means + that the server does not impose any specific limit but may reject very large + frames if it cannot allocate resources for them. + + + + Until the frame-max has been negotiated, both peers MUST accept frames of up + to frame-min-size octets large, and the minimum negotiated value for frame-max + is also frame-min-size. + + + Client connects to server and sends a large properties field, creating a frame + of frame-min-size octets. The server must accept this frame. + + + + + + + The delay, in seconds, of the connection heartbeat that the server wants. + Zero means the server does not want a heartbeat. + + + + + + + This method sends the client's connection tuning parameters to the server. + Certain fields are negotiated, others provide capability information. + + + + + + + The maximum total number of channels that the client will use per connection. + + + + If the client specifies a channel max that is higher than the value provided + by the server, the server MUST close the connection without attempting a + negotiated close. The server may report the error in some fashion to assist + implementors. + + + + + + + + + The largest frame size that the client and server will use for the connection. + Zero means that the client does not impose any specific limit but may reject + very large frames if it cannot allocate resources for them. Note that the + frame-max limit applies principally to content frames, where large contents can + be broken into frames of arbitrary size. + + + + Until the frame-max has been negotiated, both peers MUST accept frames of up + to frame-min-size octets large, and the minimum negotiated value for frame-max + is also frame-min-size. + + + + + If the client specifies a frame max that is higher than the value provided + by the server, the server MUST close the connection without attempting a + negotiated close. The server may report the error in some fashion to assist + implementors. + + + + + + + The delay, in seconds, of the connection heartbeat that the client wants. Zero + means the client does not want a heartbeat. + + + + + + + + + This method opens a connection to a virtual host, which is a collection of + resources, and acts to separate multiple application domains within a server. + The server may apply arbitrary limits per virtual host, such as the number + of each type of entity that may be used, per connection and/or in total. + + + + + + + + The name of the virtual host to work with. + + + + If the server supports multiple virtual hosts, it MUST enforce a full + separation of exchanges, queues, and all associated entities per virtual + host. An application, connected to a specific virtual host, MUST NOT be able + to access resources of another virtual host. + + + + + The server SHOULD verify that the client has permission to access the + specified virtual host. + + + + + + + + + + + + This method signals to the client that the connection is ready for use. + + + + + + + + + + + This method indicates that the sender wants to close the connection. This may be + due to internal conditions (e.g. a forced shut-down) or due to an error handling + a specific method, i.e. an exception. When a close is due to an exception, the + sender provides the class and method id of the method which caused the exception. + + + + After sending this method, any received methods except Close and Close-OK MUST + be discarded. The response to receiving a Close after sending Close must be to + send Close-Ok. + + + + + + + + + + + + + When the close is provoked by a method exception, this is the class of the + method. + + + + + + When the close is provoked by a method exception, this is the ID of the method. + + + + + + + This method confirms a Connection.Close method and tells the recipient that it is + safe to release resources for the connection and close the socket. + + + + A peer that detects a socket closure without having received a Close-Ok + handshake method SHOULD log the error. + + + + + + + + + + + + The channel class provides methods for a client to establish a channel to a + server and for both peers to operate the channel thereafter. + + + + channel = open-channel *use-channel close-channel + open-channel = C:OPEN S:OPEN-OK + use-channel = C:FLOW S:FLOW-OK + / S:FLOW C:FLOW-OK + / functional-class + close-channel = C:CLOSE S:CLOSE-OK + / S:CLOSE C:CLOSE-OK + + + + + + + + + + This method opens a channel to the server. + + + + The client MUST NOT use this method on an already-opened channel. + + + Client opens a channel and then reopens the same channel. + + + + + + + + + + + This method signals to the client that the channel is ready for use. + + + + + + + + + + + This method asks the peer to pause or restart the flow of content data sent by + a consumer. This is a simple flow-control mechanism that a peer can use to avoid + overflowing its queues or otherwise finding itself receiving more messages than + it can process. Note that this method is not intended for window control. It does + not affect contents returned by Basic.Get-Ok methods. + + + + + When a new channel is opened, it is active (flow is active). Some applications + assume that channels are inactive until started. To emulate this behaviour a + client MAY open the channel, then pause it. + + + + + + When sending content frames, a peer SHOULD monitor the channel for incoming + methods and respond to a Channel.Flow as rapidly as possible. + + + + + + A peer MAY use the Channel.Flow method to throttle incoming content data for + internal reasons, for example, when exchanging data over a slower connection. + + + + + + The peer that requests a Channel.Flow method MAY disconnect and/or ban a peer + that does not respect the request. This is to prevent badly-behaved clients + from overwhelming a server. + + + + + + + + + + + If 1, the peer starts sending content frames. If 0, the peer stops sending + content frames. + + + + + + + Confirms to the peer that a flow command was received and processed. + + + + + + Confirms the setting of the processed flow method: 1 means the peer will start + sending or continue to send content frames; 0 means it will not. + + + + + + + + + This method indicates that the sender wants to close the channel. This may be due to + internal conditions (e.g. a forced shut-down) or due to an error handling a specific + method, i.e. an exception. When a close is due to an exception, the sender provides + the class and method id of the method which caused the exception. + + + + After sending this method, any received methods except Close and Close-OK MUST + be discarded. The response to receiving a Close after sending Close must be to + send Close-Ok. + + + + + + + + + + + + + When the close is provoked by a method exception, this is the class of the + method. + + + + + + When the close is provoked by a method exception, this is the ID of the method. + + + + + + + This method confirms a Channel.Close method and tells the recipient that it is safe + to release resources for the channel. + + + + A peer that detects a socket closure without having received a Channel.Close-Ok + handshake method SHOULD log the error. + + + + + + + + + + + + Exchanges match and distribute messages across queues. Exchanges can be configured in + the server or declared at runtime. + + + + exchange = C:DECLARE S:DECLARE-OK + / C:DELETE S:DELETE-OK + / C:BIND S:BIND-OK + / C:UNBIND S:UNBIND-OK + + + + + + + + The server MUST implement these standard exchange types: fanout, direct. + + + Client attempts to declare an exchange with each of these standard types. + + + + + The server SHOULD implement these standard exchange types: topic, headers. + + + Client attempts to declare an exchange with each of these standard types. + + + + + The server MUST, in each virtual host, pre-declare an exchange instance + for each standard exchange type that it implements, where the name of the + exchange instance, if defined, is "amq." followed by the exchange type name. + + + The server MUST, in each virtual host, pre-declare at least two direct + exchange instances: one named "amq.direct", the other with no public name + that serves as a default exchange for Publish methods. + + + Client declares a temporary queue and attempts to bind to each required + exchange instance ("amq.fanout", "amq.direct", "amq.topic", and "amq.headers" + if those types are defined). + + + + + The server MUST pre-declare a direct exchange with no public name to act as + the default exchange for content Publish methods and for default queue bindings. + + + Client checks that the default exchange is active by specifying a queue + binding with no exchange name, and publishing a message with a suitable + routing key but without specifying the exchange name, then ensuring that + the message arrives in the queue correctly. + + + + + The server MUST NOT allow clients to access the default exchange except + by specifying an empty exchange name in the Queue.Bind and content Publish + methods. + + + + + The server MAY implement other exchange types as wanted. + + + + + + + + This method creates an exchange if it does not already exist, and if the exchange + exists, verifies that it is of the correct and expected class. + + + + The server SHOULD support a minimum of 16 exchanges per virtual host and + ideally, impose no limit except as defined by available resources. + + + The client declares as many exchanges as it can until the server reports + an error; the number of exchanges successfully declared must be at least + sixteen. + + + + + + + + + + + + + Exchange names starting with "amq." are reserved for pre-declared and + standardised exchanges. The client MAY declare an exchange starting with + "amq." if the passive option is set, or the exchange already exists. + + + The client attempts to declare a non-existing exchange starting with + "amq." and with the passive option set to zero. + + + + + The exchange name consists of a non-empty sequence of these characters: + letters, digits, hyphen, underscore, period, or colon. + + + The client attempts to declare an exchange with an illegal name. + + + + + + + + Each exchange belongs to one of a set of exchange types implemented by the + server. The exchange types define the functionality of the exchange - i.e. how + messages are routed through it. It is not valid or meaningful to attempt to + change the type of an existing exchange. + + + + Exchanges cannot be redeclared with different types. The client MUST not + attempt to redeclare an existing exchange with a different type than used + in the original Exchange.Declare method. + + + TODO. + + + + + The client MUST NOT attempt to declare an exchange with a type that the + server does not support. + + + TODO. + + + + + + + If set, the server will reply with Declare-Ok if the exchange already + exists with the same name, and raise an error if not. The client can + use this to check whether an exchange exists without modifying the + server state. When set, all other method fields except name and no-wait + are ignored. A declare with both passive and no-wait has no effect. + Arguments are compared for semantic equivalence. + + + + If set, and the exchange does not already exist, the server MUST + raise a channel exception with reply code 404 (not found). + + + TODO. + + + + + If not set and the exchange exists, the server MUST check that the + existing exchange has the same values for type, durable, and arguments + fields. The server MUST respond with Declare-Ok if the requested + exchange matches these fields, and MUST raise a channel exception if + not. + + + TODO. + + + + + + + If set when creating a new exchange, the exchange will be marked as durable. + Durable exchanges remain active when a server restarts. Non-durable exchanges + (transient exchanges) are purged if/when a server restarts. + + + + The server MUST support both durable and transient exchanges. + + + TODO. + + + + + + + If set, the exchange is deleted when all queues have + finished using it. + + + + The server SHOULD allow for a reasonable delay between the + point when it determines that an exchange is not being + used (or no longer used), and the point when it deletes + the exchange. At the least it must allow a client to + create an exchange and then bind a queue to it, with a + small but non-zero delay between these two actions. + + + + + The server MUST ignore the auto-delete field if the + exchange already exists. + + + + + + + If set, the exchange may not be used directly by publishers, + but only when bound to other exchanges. Internal exchanges + are used to construct wiring that is not visible to + applications. + + + + + + + + A set of arguments for the declaration. The syntax and semantics of these + arguments depends on the server implementation. + + + + + + + This method confirms a Declare method and confirms the name of the exchange, + essential for automatically-named exchanges. + + + + + + + + + This method deletes an exchange. When an exchange is deleted all queue bindings on + the exchange are cancelled. + + + + + + + + + + + + The client MUST NOT attempt to delete an exchange that does not exist. + + + + + + + + If set, the server will only delete the exchange if it has no queue bindings. If + the exchange has queue bindings the server does not delete it but raises a + channel exception instead. + + + + The server MUST NOT delete an exchange that has bindings on it, if the if-unused + field is true. + + + The client declares an exchange, binds a queue to it, then tries to delete it + setting if-unused to true. + + + + + + + + + This method confirms the deletion of an exchange. + + + + + + + + This method binds an exchange to an exchange. + + + + A server MUST allow and ignore duplicate bindings - that is, + two or more bind methods for a specific exchanges, with + identical arguments - without treating these as an error. + + + A client binds an exchange to an exchange. The client then + repeats the bind (with identical arguments). + + + + + + A server MUST allow cycles of exchange bindings to be + created including allowing an exchange to be bound to + itself. + + + A client declares an exchange and binds it to itself. + + + + + + A server MUST not deliver the same message more than once to + a destination exchange, even if the topology of exchanges + and bindings results in multiple (even infinite) routes to + that exchange. + + + A client declares an exchange and binds it using multiple + bindings to the amq.topic exchange. The client then + publishes a message to the amq.topic exchange that matches + all the bindings. + + + + + + + + + + + + Specifies the name of the destination exchange to bind. + + + A client MUST NOT be allowed to bind a non-existent + destination exchange. + + + A client attempts to bind an undeclared exchange to an + exchange. + + + + + The server MUST accept a blank exchange name to mean the + default exchange. + + + The client declares an exchange and binds a blank exchange + name to it. + + + + + + Specifies the name of the source exchange to bind. + + + A client MUST NOT be allowed to bind a non-existent source + exchange. + + + A client attempts to bind an exchange to an undeclared + exchange. + + + + + The server MUST accept a blank exchange name to mean the + default exchange. + + + The client declares an exchange and binds it to a blank + exchange name. + + + + + + + Specifies the routing key for the binding. The routing key + is used for routing messages depending on the exchange + configuration. Not all exchanges use a routing key - refer + to the specific exchange documentation. + + + + + + + + A set of arguments for the binding. The syntax and semantics + of these arguments depends on the exchange class. + + + + + + This method confirms that the bind was successful. + + + + + + + + This method unbinds an exchange from an exchange. + + If a unbind fails, the server MUST raise a connection exception. + + + + + + + + + Specifies the name of the destination exchange to unbind. + + + The client MUST NOT attempt to unbind an exchange that + does not exist from an exchange. + + + The client attempts to unbind a non-existent exchange from + an exchange. + + + + + The server MUST accept a blank exchange name to mean the + default exchange. + + + The client declares an exchange, binds a blank exchange + name to it, and then unbinds a blank exchange name from + it. + + + + + + Specifies the name of the source exchange to unbind. + + + The client MUST NOT attempt to unbind an exchange from an + exchange that does not exist. + + + The client attempts to unbind an exchange from a + non-existent exchange. + + + + + The server MUST accept a blank exchange name to mean the + default exchange. + + + The client declares an exchange, binds an exchange to a + blank exchange name, and then unbinds an exchange from a + black exchange name. + + + + + + Specifies the routing key of the binding to unbind. + + + + + + Specifies the arguments of the binding to unbind. + + + + + This method confirms that the unbind was successful. + + + + + + + + + + Queues store and forward messages. Queues can be configured in the server or created at + runtime. Queues must be attached to at least one exchange in order to receive messages + from publishers. + + + + queue = C:DECLARE S:DECLARE-OK + / C:BIND S:BIND-OK + / C:UNBIND S:UNBIND-OK + / C:PURGE S:PURGE-OK + / C:DELETE S:DELETE-OK + + + + + + + + + + This method creates or checks a queue. When creating a new queue the client can + specify various properties that control the durability of the queue and its + contents, and the level of sharing for the queue. + + + + + The server MUST create a default binding for a newly-declared queue to the + default exchange, which is an exchange of type 'direct' and use the queue + name as the routing key. + + + Client declares a new queue, and then without explicitly binding it to an + exchange, attempts to send a message through the default exchange binding, + i.e. publish a message to the empty exchange, with the queue name as routing + key. + + + + + + The server SHOULD support a minimum of 256 queues per virtual host and ideally, + impose no limit except as defined by available resources. + + + Client attempts to declare as many queues as it can until the server reports + an error. The resulting count must at least be 256. + + + + + + + + + + + + + The queue name MAY be empty, in which case the server MUST create a new + queue with a unique generated name and return this to the client in the + Declare-Ok method. + + + Client attempts to declare several queues with an empty name. The client then + verifies that the server-assigned names are unique and different. + + + + + Queue names starting with "amq." are reserved for pre-declared and + standardised queues. The client MAY declare a queue starting with + "amq." if the passive option is set, or the queue already exists. + + + The client attempts to declare a non-existing queue starting with + "amq." and with the passive option set to zero. + + + + + The queue name can be empty, or a sequence of these characters: + letters, digits, hyphen, underscore, period, or colon. + + + The client attempts to declare a queue with an illegal name. + + + + + + + If set, the server will reply with Declare-Ok if the queue already + exists with the same name, and raise an error if not. The client can + use this to check whether a queue exists without modifying the + server state. When set, all other method fields except name and no-wait + are ignored. A declare with both passive and no-wait has no effect. + Arguments are compared for semantic equivalence. + + + + The client MAY ask the server to assert that a queue exists without + creating the queue if not. If the queue does not exist, the server + treats this as a failure. + + + Client declares an existing queue with the passive option and expects + the server to respond with a declare-ok. Client then attempts to declare + a non-existent queue with the passive option, and the server must close + the channel with the correct reply-code. + + + + + If not set and the queue exists, the server MUST check that the + existing queue has the same values for durable, exclusive, auto-delete, + and arguments fields. The server MUST respond with Declare-Ok if the + requested queue matches these fields, and MUST raise a channel exception + if not. + + + TODO. + + + + + + + If set when creating a new queue, the queue will be marked as durable. Durable + queues remain active when a server restarts. Non-durable queues (transient + queues) are purged if/when a server restarts. Note that durable queues do not + necessarily hold persistent messages, although it does not make sense to send + persistent messages to a transient queue. + + + + The server MUST recreate the durable queue after a restart. + + + Client declares a durable queue. The server is then restarted. The client + then attempts to send a message to the queue. The message should be successfully + delivered. + + + + + The server MUST support both durable and transient queues. + + A client declares two named queues, one durable and one transient. + + + + + + + Exclusive queues may only be accessed by the current connection, and are + deleted when that connection closes. Passive declaration of an exclusive + queue by other connections are not allowed. + + + + + The server MUST support both exclusive (private) and non-exclusive (shared) + queues. + + + A client declares two named queues, one exclusive and one non-exclusive. + + + + + + The client MAY NOT attempt to use a queue that was declared as exclusive + by another still-open connection. + + + One client declares an exclusive queue. A second client on a different + connection attempts to declare, bind, consume, purge, delete, or declare + a queue of the same name. + + + + + + + If set, the queue is deleted when all consumers have finished using it. The last + consumer can be cancelled either explicitly or because its channel is closed. If + there was no consumer ever on the queue, it won't be deleted. Applications can + explicitly delete auto-delete queues using the Delete method as normal. + + + + + The server MUST ignore the auto-delete field if the queue already exists. + + + Client declares two named queues, one as auto-delete and one explicit-delete. + Client then attempts to declare the two queues using the same names again, + but reversing the value of the auto-delete field in each case. Verify that the + queues still exist with the original auto-delete flag values. + + + + + + + + + A set of arguments for the declaration. The syntax and semantics of these + arguments depends on the server implementation. + + + + + + + This method confirms a Declare method and confirms the name of the queue, essential + for automatically-named queues. + + + + + + + Reports the name of the queue. If the server generated a queue name, this field + contains that name. + + + + + + + + + Reports the number of active consumers for the queue. Note that consumers can + suspend activity (Channel.Flow) in which case they do not appear in this count. + + + + + + + + + This method binds a queue to an exchange. Until a queue is bound it will not + receive any messages. In a classic messaging model, store-and-forward queues + are bound to a direct exchange and subscription queues are bound to a topic + exchange. + + + + + A server MUST allow ignore duplicate bindings - that is, two or more bind + methods for a specific queue, with identical arguments - without treating these + as an error. + + + A client binds a named queue to an exchange. The client then repeats the bind + (with identical arguments). + + + + + + A server MUST not deliver the same message more than once to a queue, even if + the queue has multiple bindings that match the message. + + + A client declares a named queue and binds it using multiple bindings to the + amq.topic exchange. The client then publishes a message that matches all its + bindings. + + + + + + The server MUST allow a durable queue to bind to a transient exchange. + + + A client declares a transient exchange. The client then declares a named durable + queue and then attempts to bind the transient exchange to the durable queue. + + + + + + Bindings of durable queues to durable exchanges are automatically durable + and the server MUST restore such bindings after a server restart. + + + A server declares a named durable queue and binds it to a durable exchange. The + server is restarted. The client then attempts to use the queue/exchange combination. + + + + + + The server SHOULD support at least 4 bindings per queue, and ideally, impose no + limit except as defined by available resources. + + + A client declares a named queue and attempts to bind it to 4 different + exchanges. + + + + + + + + + + + + Specifies the name of the queue to bind. + + + The client MUST either specify a queue name or have previously declared a + queue on the same channel + + + The client opens a channel and attempts to bind an unnamed queue. + + + + + The client MUST NOT attempt to bind a queue that does not exist. + + + The client attempts to bind a non-existent queue. + + + + + + + + A client MUST NOT be allowed to bind a queue to a non-existent exchange. + + + A client attempts to bind an named queue to a undeclared exchange. + + + + + The server MUST accept a blank exchange name to mean the default exchange. + + + The client declares a queue and binds it to a blank exchange name. + + + + + + + Specifies the routing key for the binding. The routing key is used for routing + messages depending on the exchange configuration. Not all exchanges use a + routing key - refer to the specific exchange documentation. If the queue name + is empty, the server uses the last queue declared on the channel. If the + routing key is also empty, the server uses this queue name for the routing + key as well. If the queue name is provided but the routing key is empty, the + server does the binding with that empty routing key. The meaning of empty + routing keys depends on the exchange implementation. + + + + If a message queue binds to a direct exchange using routing key K and a + publisher sends the exchange a message with routing key R, then the message + MUST be passed to the message queue if K = R. + + + + + + + + + A set of arguments for the binding. The syntax and semantics of these arguments + depends on the exchange class. + + + + + + This method confirms that the bind was successful. + + + + + + + + This method unbinds a queue from an exchange. + + If a unbind fails, the server MUST raise a connection exception. + + + + + + + + + Specifies the name of the queue to unbind. + + + The client MUST either specify a queue name or have previously declared a + queue on the same channel + + + The client opens a channel and attempts to unbind an unnamed queue. + + + + + The client MUST NOT attempt to unbind a queue that does not exist. + + + The client attempts to unbind a non-existent queue. + + + + + + The name of the exchange to unbind from. + + + The client MUST NOT attempt to unbind a queue from an exchange that + does not exist. + + + The client attempts to unbind a queue from a non-existent exchange. + + + + + The server MUST accept a blank exchange name to mean the default exchange. + + + The client declares a queue and binds it to a blank exchange name. + + + + + + Specifies the routing key of the binding to unbind. + + + + Specifies the arguments of the binding to unbind. + + + + + This method confirms that the unbind was successful. + + + + + + + + This method removes all messages from a queue which are not awaiting + acknowledgment. + + + + + The server MUST NOT purge messages that have already been sent to a client + but not yet acknowledged. + + + + + + The server MAY implement a purge queue or log that allows system administrators + to recover accidentally-purged messages. The server SHOULD NOT keep purged + messages in the same storage spaces as the live messages since the volumes of + purged messages may get very large. + + + + + + + + + + + + Specifies the name of the queue to purge. + + + The client MUST either specify a queue name or have previously declared a + queue on the same channel + + + The client opens a channel and attempts to purge an unnamed queue. + + + + + The client MUST NOT attempt to purge a queue that does not exist. + + + The client attempts to purge a non-existent queue. + + + + + + + + + This method confirms the purge of a queue. + + + + + + Reports the number of messages purged. + + + + + + + + + This method deletes a queue. When a queue is deleted any pending messages are sent + to a dead-letter queue if this is defined in the server configuration, and all + consumers on the queue are cancelled. + + + + + The server SHOULD use a dead-letter queue to hold messages that were pending on + a deleted queue, and MAY provide facilities for a system administrator to move + these messages back to an active queue. + + + + + + + + + + + + Specifies the name of the queue to delete. + + + The client MUST either specify a queue name or have previously declared a + queue on the same channel + + + The client opens a channel and attempts to delete an unnamed queue. + + + + + The client MUST NOT attempt to delete a queue that does not exist. + + + The client attempts to delete a non-existent queue. + + + + + + + If set, the server will only delete the queue if it has no consumers. If the + queue has consumers the server does does not delete it but raises a channel + exception instead. + + + + The server MUST NOT delete a queue that has consumers on it, if the if-unused + field is true. + + + The client declares a queue, and consumes from it, then tries to delete it + setting if-unused to true. + + + + + + + If set, the server will only delete the queue if it has no messages. + + + + The server MUST NOT delete a queue that has messages on it, if the + if-empty field is true. + + + The client declares a queue, binds it and publishes some messages into it, + then tries to delete it setting if-empty to true. + + + + + + + + + This method confirms the deletion of a queue. + + + + + Reports the number of messages deleted. + + + + + + + + + The Basic class provides methods that support an industry-standard messaging model. + + + + basic = C:QOS S:QOS-OK + / C:CONSUME S:CONSUME-OK + / C:CANCEL S:CANCEL-OK + / C:PUBLISH content + / S:RETURN content + / S:DELIVER content + / C:GET ( S:GET-OK content / S:GET-EMPTY ) + / C:ACK + / S:ACK + / C:REJECT + / C:NACK + / S:NACK + / C:RECOVER-ASYNC + / C:RECOVER S:RECOVER-OK + + + + + + + + The server SHOULD respect the persistent property of basic messages and + SHOULD make a best-effort to hold persistent basic messages on a reliable + storage mechanism. + + + Send a persistent message to queue, stop server, restart server and then + verify whether message is still present. Assumes that queues are durable. + Persistence without durable queues makes no sense. + + + + + + The server MUST NOT discard a persistent basic message in case of a queue + overflow. + + + Declare a queue overflow situation with persistent messages and verify that + messages do not get lost (presumably the server will write them to disk). + + + + + + The server MAY use the Channel.Flow method to slow or stop a basic message + publisher when necessary. + + + Declare a queue overflow situation with non-persistent messages and verify + whether the server responds with Channel.Flow or not. Repeat with persistent + messages. + + + + + + The server MAY overflow non-persistent basic messages to persistent + storage. + + + + + + + The server MAY discard or dead-letter non-persistent basic messages on a + priority basis if the queue size exceeds some configured limit. + + + + + + + The server MUST implement at least 2 priority levels for basic messages, + where priorities 0-4 and 5-9 are treated as two distinct levels. + + + Send a number of priority 0 messages to a queue. Send one priority 9 + message. Consume messages from the queue and verify that the first message + received was priority 9. + + + + + + The server MAY implement up to 10 priority levels. + + + Send a number of messages with mixed priorities to a queue, so that all + priority values from 0 to 9 are exercised. A good scenario would be ten + messages in low-to-high priority. Consume from queue and verify how many + priority levels emerge. + + + + + + The server MUST deliver messages of the same priority in order irrespective of + their individual persistence. + + + Send a set of messages with the same priority but different persistence + settings to a queue. Consume and verify that messages arrive in same order + as originally published. + + + + + + The server MUST support un-acknowledged delivery of Basic content, i.e. + consumers with the no-ack field set to TRUE. + + + + + + The server MUST support explicitly acknowledged delivery of Basic content, + i.e. consumers with the no-ack field set to FALSE. + + + Declare a queue and a consumer using explicit acknowledgements. Publish a + set of messages to the queue. Consume the messages but acknowledge only + half of them. Disconnect and reconnect, and consume from the queue. + Verify that the remaining messages are received. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + This method requests a specific quality of service. The QoS can be specified for the + current channel or for all channels on the connection. The particular properties and + semantics of a qos method always depend on the content class semantics. Though the + qos method could in principle apply to both peers, it is currently meaningful only + for the server. + + + + + + + + The client can request that messages be sent in advance so that when the client + finishes processing a message, the following message is already held locally, + rather than needing to be sent down the channel. Prefetching gives a performance + improvement. This field specifies the prefetch window size in octets. The server + will send a message in advance if it is equal to or smaller in size than the + available prefetch size (and also falls into other prefetch limits). May be set + to zero, meaning "no specific limit", although other prefetch limits may still + apply. The prefetch-size is ignored if the no-ack option is set. + + + + The server MUST ignore this setting when the client is not processing any + messages - i.e. the prefetch size does not limit the transfer of single + messages to a client, only the sending in advance of more messages while + the client still has one or more unacknowledged messages. + + + Define a QoS prefetch-size limit and send a single message that exceeds + that limit. Verify that the message arrives correctly. + + + + + + + Specifies a prefetch window in terms of whole messages. This field may be used + in combination with the prefetch-size field; a message will only be sent in + advance if both prefetch windows (and those at the channel and connection level) + allow it. The prefetch-count is ignored if the no-ack option is set. + + + + The server may send less data in advance than allowed by the client's + specified prefetch windows but it MUST NOT send more. + + + Define a QoS prefetch-size limit and a prefetch-count limit greater than + one. Send multiple messages that exceed the prefetch size. Verify that + no more than one message arrives at once. + + + + + + + By default the QoS settings apply to the current channel only. If this field is + set, they are applied to the entire connection. + + + + + + + This method tells the client that the requested QoS levels could be handled by the + server. The requested QoS applies to all active consumers until a new QoS is + defined. + + + + + + + + + This method asks the server to start a "consumer", which is a transient request for + messages from a specific queue. Consumers last as long as the channel they were + declared on, or until the client cancels them. + + + + + The server SHOULD support at least 16 consumers per queue, and ideally, impose + no limit except as defined by available resources. + + + Declare a queue and create consumers on that queue until the server closes the + connection. Verify that the number of consumers created was at least sixteen + and report the total number. + + + + + + + + + + + Specifies the name of the queue to consume from. + + + + + Specifies the identifier for the consumer. The consumer tag is local to a + channel, so two clients can use the same consumer tags. If this field is + empty the server will generate a unique tag. + + + + The client MUST NOT specify a tag that refers to an existing consumer. + + + Attempt to create two consumers with the same non-empty tag, on the + same channel. + + + + + The consumer tag is valid only within the channel from which the + consumer was created. I.e. a client MUST NOT create a consumer in one + channel and then use it in another. + + + Attempt to create a consumer in one channel, then use in another channel, + in which consumers have also been created (to test that the server uses + unique consumer tags). + + + + + + + + + + + Request exclusive consumer access, meaning only this consumer can access the + queue. + + + + + The client MAY NOT gain exclusive access to a queue that already has + active consumers. + + + Open two connections to a server, and in one connection declare a shared + (non-exclusive) queue and then consume from the queue. In the second + connection attempt to consume from the same queue using the exclusive + option. + + + + + + + + + A set of arguments for the consume. The syntax and semantics of these + arguments depends on the server implementation. + + + + + + + The server provides the client with a consumer tag, which is used by the client + for methods called on the consumer at a later stage. + + + + + Holds the consumer tag specified by the client or provided by the server. + + + + + + + + + This method cancels a consumer. This does not affect already delivered + messages, but it does mean the server will not send any more messages for + that consumer. The client may receive an arbitrary number of messages in + between sending the cancel method and receiving the cancel-ok reply. + + It may also be sent from the server to the client in the event + of the consumer being unexpectedly cancelled (i.e. cancelled + for any reason other than the server receiving the + corresponding basic.cancel from the client). This allows + clients to be notified of the loss of consumers due to events + such as queue deletion. Note that as it is not a MUST for + clients to accept this method from the client, it is advisable + for the broker to be able to identify those clients that are + capable of accepting the method, through some means of + capability negotiation. + + + + + If the queue does not exist the server MUST ignore the cancel method, so + long as the consumer tag is valid for that channel. + + + TODO. + + + + + + + + + + + + + + This method confirms that the cancellation was completed. + + + + + + + + + + + This method publishes a message to a specific exchange. The message will be routed + to queues as defined by the exchange configuration and distributed to any active + consumers when the transaction, if any, is committed. + + + + + + + + + + Specifies the name of the exchange to publish to. The exchange name can be + empty, meaning the default exchange. If the exchange name is specified, and that + exchange does not exist, the server will raise a channel exception. + + + + + The client MUST NOT attempt to publish a content to an exchange that + does not exist. + + + The client attempts to publish a content to a non-existent exchange. + + + + + The server MUST accept a blank exchange name to mean the default exchange. + + + The client declares a queue and binds it to a blank exchange name. + + + + + If the exchange was declared as an internal exchange, the server MUST raise + a channel exception with a reply code 403 (access refused). + + + TODO. + + + + + + The exchange MAY refuse basic content in which case it MUST raise a channel + exception with reply code 540 (not implemented). + + + TODO. + + + + + + + Specifies the routing key for the message. The routing key is used for routing + messages depending on the exchange configuration. + + + + + + This flag tells the server how to react if the message cannot be routed to a + queue. If this flag is set, the server will return an unroutable message with a + Return method. If this flag is zero, the server silently drops the message. + + + + + The server SHOULD implement the mandatory flag. + + + TODO. + + + + + + + This flag tells the server how to react if the message cannot be routed to a + queue consumer immediately. If this flag is set, the server will return an + undeliverable message with a Return method. If this flag is zero, the server + will queue the message, but with no guarantee that it will ever be consumed. + + + + + The server SHOULD implement the immediate flag. + + + TODO. + + + + + + + + This method returns an undeliverable message that was published with the "immediate" + flag set, or an unroutable message published with the "mandatory" flag set. The + reply code and text provide information about the reason that the message was + undeliverable. + + + + + + + + + + Specifies the name of the exchange that the message was originally published + to. May be empty, meaning the default exchange. + + + + + + Specifies the routing key name specified when the message was published. + + + + + + + + + This method delivers a message to the client, via a consumer. In the asynchronous + message delivery model, the client starts a consumer using the Consume method, then + the server responds with Deliver methods as and when messages arrive for that + consumer. + + + + + The server SHOULD track the number of times a message has been delivered to + clients and when a message is redelivered a certain number of times - e.g. 5 + times - without being acknowledged, the server SHOULD consider the message to be + unprocessable (possibly causing client applications to abort), and move the + message to a dead letter queue. + + + TODO. + + + + + + + + + + + + Specifies the name of the exchange that the message was originally published to. + May be empty, indicating the default exchange. + + + + + Specifies the routing key name specified when the message was published. + + + + + + + + This method provides a direct access to the messages in a queue using a synchronous + dialogue that is designed for specific types of application where synchronous + functionality is more important than performance. + + + + + + + + + + + Specifies the name of the queue to get a message from. + + + + + + + This method delivers a message to the client following a get method. A message + delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the + get method. + + + + + + + + + Specifies the name of the exchange that the message was originally published to. + If empty, the message was published to the default exchange. + + + + + Specifies the routing key name specified when the message was published. + + + + + + + + This method tells the client that the queue has no messages available for the + client. + + + + + + + + + + + When sent by the client, this method acknowledges one or more + messages delivered via the Deliver or Get-Ok methods. + + When sent by server, this method acknowledges one or more + messages published with the Publish method on a channel in + confirm mode. + + The acknowledgement can be for a single message or a set of + messages up to and including a specific message. + + + + + + + + + If set to 1, the delivery tag is treated as "up to and + including", so that multiple messages can be acknowledged + with a single method. If set to zero, the delivery tag + refers to a single message. If the multiple field is 1, and + the delivery tag is zero, this indicates acknowledgement of + all outstanding messages. + + + + A message MUST not be acknowledged more than once. The + receiving peer MUST validate that a non-zero delivery-tag + refers to a delivered message, and raise a channel + exception if this is not the case. On a transacted + channel, this check MUST be done immediately and not + delayed until a Tx.Commit. + + + TODO. + + + + + + + + + + This method allows a client to reject a message. It can be used to interrupt and + cancel large incoming messages, or return untreatable messages to their original + queue. + + + + + The server SHOULD be capable of accepting and process the Reject method while + sending message content with a Deliver or Get-Ok method. I.e. the server should + read and process incoming methods while sending output frames. To cancel a + partially-send content, the server sends a content body frame of size 1 (i.e. + with no data except the frame-end octet). + + + + + + The server SHOULD interpret this method as meaning that the client is unable to + process the message at this time. + + + TODO. + + + + + + The client MUST NOT use this method as a means of selecting messages to process. + + + TODO. + + + + + + + + + + If requeue is true, the server will attempt to requeue the message. If requeue + is false or the requeue attempt fails the messages are discarded or dead-lettered. + + + + + The server MUST NOT deliver the message to the same client within the + context of the current channel. The recommended strategy is to attempt to + deliver the message to an alternative consumer, and if that is not possible, + to move the message to a dead-letter queue. The server MAY use more + sophisticated tracking to hold the message on the queue and redeliver it to + the same client at a later stage. + + + TODO. + + + + + + + + + + This method asks the server to redeliver all unacknowledged messages on a + specified channel. Zero or more messages may be redelivered. This method + is deprecated in favour of the synchronous Recover/Recover-Ok. + + + + The server MUST set the redelivered flag on all messages that are resent. + + + TODO. + + + + + + If this field is zero, the message will be redelivered to the original + recipient. If this bit is 1, the server will attempt to requeue the message, + potentially then delivering it to an alternative subscriber. + + + + + + + + + This method asks the server to redeliver all unacknowledged messages on a + specified channel. Zero or more messages may be redelivered. This method + replaces the asynchronous Recover. + + + + The server MUST set the redelivered flag on all messages that are resent. + + + TODO. + + + + + + If this field is zero, the message will be redelivered to the original + recipient. If this bit is 1, the server will attempt to requeue the message, + potentially then delivering it to an alternative subscriber. + + + + + + + This method acknowledges a Basic.Recover method. + + + + + + + This method allows a client to reject one or more incoming messages. It can be + used to interrupt and cancel large incoming messages, or return untreatable + messages to their original queue. + + This method is also used by the server to inform publishers on channels in + confirm mode of unhandled messages. If a publisher receives this method, it + probably needs to republish the offending messages. + + + + + The server SHOULD be capable of accepting and processing the Nack method while + sending message content with a Deliver or Get-Ok method. I.e. the server should + read and process incoming methods while sending output frames. To cancel a + partially-send content, the server sends a content body frame of size 1 (i.e. + with no data except the frame-end octet). + + + + + + The server SHOULD interpret this method as meaning that the client is unable to + process the message at this time. + + + TODO. + + + + + + The client MUST NOT use this method as a means of selecting messages to process. + + + TODO. + + + + + + A client publishing messages to a channel in confirm mode SHOULD be capable of accepting + and somehow handling the Nack method. + + + TODO + + + + + + + + + + + If set to 1, the delivery tag is treated as "up to and + including", so that multiple messages can be rejected + with a single method. If set to zero, the delivery tag + refers to a single message. If the multiple field is 1, and + the delivery tag is zero, this indicates rejection of + all outstanding messages. + + + + A message MUST not be rejected more than once. The + receiving peer MUST validate that a non-zero delivery-tag + refers to an unacknowledged, delivered message, and + raise a channel exception if this is not the case. + + + TODO. + + + + + + + If requeue is true, the server will attempt to requeue the message. If requeue + is false or the requeue attempt fails the messages are discarded or dead-lettered. + Clients receiving the Nack methods should ignore this flag. + + + + + The server MUST NOT deliver the message to the same client within the + context of the current channel. The recommended strategy is to attempt to + deliver the message to an alternative consumer, and if that is not possible, + to move the message to a dead-letter queue. The server MAY use more + sophisticated tracking to hold the message on the queue and redeliver it to + the same client at a later stage. + + + TODO. + + + + + + + + + + + + The Tx class allows publish and ack operations to be batched into atomic + units of work. The intention is that all publish and ack requests issued + within a transaction will complete successfully or none of them will. + Servers SHOULD implement atomic transactions at least where all publish + or ack requests affect a single queue. Transactions that cover multiple + queues may be non-atomic, given that queues can be created and destroyed + asynchronously, and such events do not form part of any transaction. + Further, the behaviour of transactions with respect to the immediate and + mandatory flags on Basic.Publish methods is not defined. + + + + + Applications MUST NOT rely on the atomicity of transactions that + affect more than one queue. + + + + + Applications MUST NOT rely on the behaviour of transactions that + include messages published with the immediate option. + + + + + Applications MUST NOT rely on the behaviour of transactions that + include messages published with the mandatory option. + + + + + tx = C:SELECT S:SELECT-OK + / C:COMMIT S:COMMIT-OK + / C:ROLLBACK S:ROLLBACK-OK + + + + + + + + + + This method sets the channel to use standard transactions. The client must use this + method at least once on a channel before using the Commit or Rollback methods. + + + + + + + + This method confirms to the client that the channel was successfully set to use + standard transactions. + + + + + + + + + This method commits all message publications and acknowledgments performed in + the current transaction. A new transaction starts immediately after a commit. + + + + + + + The client MUST NOT use the Commit method on non-transacted channels. + + + The client opens a channel and then uses Tx.Commit. + + + + + + + This method confirms to the client that the commit succeeded. Note that if a commit + fails, the server raises a channel exception. + + + + + + + + + This method abandons all message publications and acknowledgments performed in + the current transaction. A new transaction starts immediately after a rollback. + Note that unacked messages will not be automatically redelivered by rollback; + if that is required an explicit recover call should be issued. + + + + + + + The client MUST NOT use the Rollback method on non-transacted channels. + + + The client opens a channel and then uses Tx.Rollback. + + + + + + + This method confirms to the client that the rollback succeeded. Note that if an + rollback fails, the server raises a channel exception. + + + + + + + + + + The Confirm class allows publishers to put the channel in + confirm mode and susequently be notified when messages have been + handled by the broker. The intention is that all messages + published on a channel in confirm mode will be acknowledged at + some point. By acknowledging a message the broker assumes + responsibility for it and indicates that it has done something + it deems reasonable with it. + + Unroutable mandatory or immediate messages are acknowledged + right after the Basic.Return method. Messages are acknowledged + when all queues to which the message has been routed + have either delivered the message and received an + acknowledgement (if required), or enqueued the message (and + persisted it if required). + + Published messages are assigned ascending sequence numbers, + starting at 1 with the first Confirm.Select method. The server + confirms messages by sending Basic.Ack methods referring to these + sequence numbers. + + + + + The server MUST acknowledge all messages received after the + channel was put into confirm mode. + + + + + + The server MUST acknowledge a message only after it was + properly handled by all the queues it was delivered to. + + + + + + The server MUST acknowledge an unroutable mandatory or + immediate message only after it sends the Basic.Return. + + + + + + No guarantees are made as to how soon a message is + acknowledged. Applications SHOULD NOT make assumptions about + this. + + + + + confirm = C:SELECT S:SELECT-OK + + + + + + + + + select confirm mode (i.e. enable publisher acknowledgements) + + This method sets the channel to use publisher acknowledgements. + The client can only use this method on a non-transactional + channel. + + + + + do not send a reply method + + If set, the server will not respond to the method. The client should + not wait for a reply method. If the server could not complete the + method it will raise a channel or connection exception. + + + + + + acknowledge confirm mode + + This method confirms to the client that the channel was successfully + set to use publisher acknowledgements. + + + + + + diff --git a/xt/01_podspell.t b/xt/01_podspell.t deleted file mode 100644 index e09d03f..0000000 --- a/xt/01_podspell.t +++ /dev/null @@ -1,13 +0,0 @@ -use Test::More; -eval q{ use Test::Spelling }; -plan skip_all => "Test::Spelling is not installed." if $@; -add_stopwords(map { split /[\s\:\-]/ } ); -set_spell_cmd('aspell list'); -$ENV{LANG} = 'C'; -all_pod_files_spelling_ok('lib'); -__DATA__ -Masahito Ikuta -cooldaemon@gmail.com -AMQP -RabbitMQ -multi diff --git a/xt/02_perlcritic.t b/xt/02_perlcritic.t deleted file mode 100644 index b977df8..0000000 --- a/xt/02_perlcritic.t +++ /dev/null @@ -1,8 +0,0 @@ -use strict; -use Test::More; -eval { - require Test::Perl::Critic; - Test::Perl::Critic->import( -profile => 'xt/perlcriticrc'); -}; -plan skip_all => "Test::Perl::Critic is not installed." if $@; -all_critic_ok('lib'); diff --git a/xt/03_pod.t b/xt/03_pod.t index 437887a..2c01bec 100644 --- a/xt/03_pod.t +++ b/xt/03_pod.t @@ -1,3 +1,6 @@ +# TODO should be replaced by Dist::Zilla::Plugin::PodSyntaxTests, but holding +# off for ease of development on Ubuntu 18.04. + use Test::More; eval "use Test::Pod 1.00"; plan skip_all => "Test::Pod 1.00 required for testing POD" if $@; diff --git a/xt/04_anyevent.t b/xt/04_anyevent.t index ca3de56..fc4be68 100644 --- a/xt/04_anyevent.t +++ b/xt/04_anyevent.t @@ -1,8 +1,9 @@ use Test::More; use Test::Exception; +use Data::Dumper; +use version 0.77; use FindBin; -use version; my %server = ( product => undef, @@ -15,6 +16,7 @@ my %conf = ( user => 'guest', pass => 'guest', vhost => '/', +# verbose => 1, ); eval { @@ -32,19 +34,48 @@ eval { plan skip_all => 'Connection failure: ' . $conf{host} . ':' . $conf{port} if $@; -plan tests => 31; use AnyEvent::RabbitMQ; -my $ar = AnyEvent::RabbitMQ->new(); +my $ar = AnyEvent::RabbitMQ->new(verbose => $conf{verbose}); lives_ok sub { $ar->load_xml_spec() }, 'load xml spec'; +my @nagle = ([], [nodelay => 0], [nodelay => 1]); + +for my $opt (@nagle) { + my $done = AnyEvent->condvar; + my $z = AnyEvent::RabbitMQ->new(verbose => $conf{verbose}); + $z->connect( + (map {$_ => $conf{$_}} qw(host port user pass vhost)), + timeout => 1, + on_success => sub { + my $ar = shift; + isa_ok($ar, 'AnyEvent::RabbitMQ'); + $done->send; + }, + on_failure => failure_cb($done), + on_return => sub { + my $method_frame = shift->method_frame; + die "return: ", $method_frame->reply_code, $method_frame->reply_text + if $method_frame->reply_code; + }, + on_close => sub { + my $method_frame = shift->method_frame; + Carp::confess "close: ", $method_frame->reply_code, $method_frame->reply_text + if $method_frame->reply_code; + }, + @{ $opt }, + ); + $done->recv; +} + my $done = AnyEvent->condvar; $ar->connect( (map {$_ => $conf{$_}} qw(host port user pass vhost)), + tune => { frame_max => 2**17 }, timeout => 1, on_success => sub { my $ar = shift; @@ -54,34 +85,69 @@ $ar->connect( $done->send; }, on_failure => failure_cb($done), + on_return => sub { + my $method_frame = shift->method_frame; + die "return: ", $method_frame->reply_code, $method_frame->reply_text + if $method_frame->reply_code; + }, on_close => sub { my $method_frame = shift->method_frame; - die $method_frame->reply_code, $method_frame->reply_text; + Carp::confess "close: ", $method_frame->reply_code, $method_frame->reply_text + if $method_frame->reply_code; }, ); $done->recv; -$done = AnyEvent->condvar; my $ch; -$ar->open_channel( +$done = AnyEvent->condvar; +open_ch($done); +$done->recv; + +sub open_ch { + my ($cv,) = @_; + $ar->open_channel( + on_success => sub { + $ch = shift; + isa_ok($ch, 'AnyEvent::RabbitMQ::Channel'); + $cv->send; + }, + on_failure => failure_cb($cv), + on_close => sub { + my $method_frame = shift->method_frame; + die $method_frame->reply_code, $method_frame->reply_text + if $method_frame->reply_code; + }, + ); +} + +$done = AnyEvent->condvar; +$ch->declare_exchange( + exchange => 'test_x', on_success => sub { - $ch = shift; - isa_ok($ch, 'AnyEvent::RabbitMQ::Channel'); + pass('declare exchange'); $done->send; }, on_failure => failure_cb($done), - on_close => sub { - my $method_frame = shift->method_frame; - die $method_frame->reply_code, $method_frame->reply_text; - }, ); $done->recv; $done = AnyEvent->condvar; $ch->declare_exchange( - exchange => 'test_x', + exchange => 'test_x_dest', on_success => sub { - pass('declare exchange'); + pass('declare destination exchange'); + $done->send; + }, + on_failure => failure_cb($done), +); +$done->recv; + +$done = AnyEvent->condvar; +$ch->bind_exchange( + source => 'test_x', + destination => 'test_x_dest', + on_success => sub { + pass('bind exchange -> dest'); $done->send; }, on_failure => failure_cb($done), @@ -167,7 +233,7 @@ $ch->get( ); $done->recv; -for my $size (10, 131_064, 200_000, 10, 999_999, 10) { +for my $size (10, 131_064, 10, 140_000) { send_large_size_message($ch, $size); } @@ -257,7 +323,7 @@ $ch->cancel( on_failure => failure_cb($done), ); $done->recv; - + $done = AnyEvent->condvar; my $recover_count = 0; $ch->consume( @@ -292,7 +358,16 @@ pass('recover'); # This only works for RabbitMQ >= 2.0.0 my $can_reject = $server{product} eq 'RabbitMQ' && $server{version} >= version->parse('2.0.0'); SKIP: { - skip 'We need RabbitMQ >= 2.0.0 for the reject test', 1 unless $can_reject; + skip 'We need RabbitMQ >= 2.0.0 for the confirm and reject test', 1 unless $can_reject; + + $done = AnyEvent->condvar; + $ch->confirm( + on_success => sub { $done->send }, + on_failure => failure_cb($done), + ); + $done->recv; + pass('confirm'); + $done = AnyEvent->condvar; my $reject_count = 0; $ch->consume( @@ -323,9 +398,28 @@ SKIP: { }, on_failure => failure_cb($done), ); - publish($ch, 'RabbitMQ is powerful.', $done,); + my $pub_done = AnyEvent->condvar; + publish($ch, 'RabbitMQ is powerful.', $pub_done,); + $pub_done->recv; $done->recv; pass('reject'); + + # reopen because confirm is not compatible with transactions + $done = AnyEvent->condvar; + $ch->close( + on_success => sub { + pass('close2'); + $done->send; + }, + on_failure => failure_cb($done), + ); + $done->recv; + undef $ch; + + $done = AnyEvent->condvar; + open_ch($done); + $done->recv; + pass('open2'); }; $done = AnyEvent->condvar; @@ -389,6 +483,18 @@ $ch->delete_queue( ); $done->recv; +$done = AnyEvent->condvar; +$ch->unbind_exchange( + source => 'test_x', + destination => 'test_x_dest', + on_success => sub { + pass('unbind exchange'); + $done->send; + }, + on_failure => failure_cb($done), +); +$done->recv; + $done = AnyEvent->condvar; $ch->delete_exchange( exchange => 'test_x', @@ -400,10 +506,21 @@ $ch->delete_exchange( ); $done->recv; +$done = AnyEvent->condvar; +$ch->delete_exchange( + exchange => 'test_x_dest', + on_success => sub { + pass('delete destination exchange'); + $done->send; + }, + on_failure => failure_cb($done), +); +$done->recv; + $done = AnyEvent->condvar; $ar->close( on_success => sub { - pass('close'); + pass('close2'); $done->send; }, on_failure => failure_cb($done), @@ -425,6 +542,7 @@ sub publish { exchange => 'test_x', routing_key => 'test_r', body => $message, + on_ack => sub { $cv->send }, on_return => sub { my $response = shift; fail('on_return: ' . Dumper($response)); @@ -434,7 +552,7 @@ sub publish { return; } - + sub send_large_size_message { my ($ch, $size,) = @_; @@ -444,13 +562,13 @@ sub send_large_size_message { queue => 'test_q', on_success => sub { my $response = shift; - is(length($response->{body}->payload), $size, 'get large size'); + is(length($response->{body}->payload), $size, 'get large size: ' . $size); $done->send; }, on_failure => failure_cb($done), ); $done->recv; - return; } +done_testing; diff --git a/xt/05_multi_channel.t b/xt/05_multi_channel.t index bdcaaa6..d814013 100644 --- a/xt/05_multi_channel.t +++ b/xt/05_multi_channel.t @@ -24,7 +24,7 @@ eval { plan skip_all => 'Connection failure: ' . $conf{host} . ':' . $conf{port} if $@; -plan tests => 1; +plan tests => 3; use AnyEvent::RabbitMQ; @@ -36,14 +36,19 @@ my @queues = map { declare_queue($ch, $queue,); my $done = AnyEvent->condvar; + my $cdone = AnyEvent->condvar; consume($ch, $queue, sub { my $response = shift; return if 'stop' ne $response->{body}->payload; $done->send(); + }, sub { + $cdone->send(); }); - {name => $queue, cv => $done}; + {name => $queue, cv => $done, ccv => $cdone}; } (1..5); +pass('queue setup'); + my $ch = open_channel($ar); for my $queue (@queues) { publish($ch, $queue->{name}, 'hello'); @@ -62,6 +67,14 @@ for my $queue (@queues) { delete_queue($ch, $queue->{name}); } +my $ccount = 0; +for my $queue (@queues) { + $queue->{ccv}->recv; + $ccount++; +} + +is($ccount, 5, 'cancel count'); + close_ar($ar); sub connect_ar { @@ -70,7 +83,7 @@ sub connect_ar { (map {$_ => $conf{$_}} qw(host port user pass vhost)), timeout => 1, on_success => sub {$done->send(1)}, - on_failure => sub {$done->send()}, + on_failure => sub { diag @_; $done->send()}, on_close => \&handle_close, ); die 'Connection failure' if !$done->recv; @@ -83,7 +96,7 @@ sub close_ar { my $done = AnyEvent->condvar; $ar->close( on_success => sub {$done->send(1)}, - on_failure => sub {$done->send()}, + on_failure => sub { diag @_; $done->send()}, ); die 'Close failure' if !$done->recv; @@ -97,6 +110,7 @@ sub open_channel { $ar->open_channel( on_success => sub {$done->send(shift)}, on_failure => sub {$done->send()}, + on_return => sub {die 'Receive return'}, on_close => \&handle_close, ); my $ch = $done->recv; @@ -134,7 +148,7 @@ sub delete_queue { } sub consume { - my ($ch, $queue, $handle_consume,) = @_; + my ($ch, $queue, $handle_consume, $handle_cancel,) = @_; my $done = AnyEvent->condvar; $ch->consume( @@ -142,6 +156,7 @@ sub consume { on_success => sub {$done->send(1)}, on_failure => sub {$done->send()}, on_consume => $handle_consume, + on_cancel => $handle_cancel, ); die 'Consume failure' if !$done->recv; @@ -155,7 +170,6 @@ sub publish { routing_key => $queue, body => $message, mandatory => 1, - on_return => sub {die 'Receive return'}, ); return; @@ -163,6 +177,7 @@ sub publish { sub handle_close { my $method_frame = shift->method_frame; - die $method_frame->reply_code, $method_frame->reply_text; + die $method_frame->reply_code, $method_frame->reply_text + if $method_frame->reply_code; } diff --git a/xt/06_close.t b/xt/06_close.t new file mode 100644 index 0000000..c78fe22 --- /dev/null +++ b/xt/06_close.t @@ -0,0 +1,109 @@ +use Test::More; +use Test::Exception; + +my %conf = ( + host => 'localhost', + port => 5672, + user => 'guest', + pass => 'guest', + vhost => '/', +); + +eval { + use IO::Socket::INET; + + my $socket = IO::Socket::INET->new( + Proto => 'tcp', + PeerAddr => $conf{host}, + PeerPort => $conf{port}, + Timeout => 1, + ) or die 'Error connecting to AMQP Server!'; + + close $socket; +}; + +plan skip_all => 'Connection failure: ' + . $conf{host} . ':' . $conf{port} if $@; +plan tests => 2; + +use AnyEvent::RabbitMQ; + +subtest 'No channels', sub { + my $ar = connect_ar(); + ok $ar->is_open, 'connection is open'; + is channel_count($ar), 0, 'no channels open'; + + close_ar($ar); + ok !$ar->is_open, 'connection closed'; + is channel_count($ar), 0, 'no channels open'; +}; + +subtest 'channels', sub { + my $ar = connect_ar(); + ok $ar->is_open, 'connection is open'; + is channel_count($ar), 0, 'no channels open'; + + my $ch = open_channel($ar); + ok $ch->is_open, 'channel is open'; + is channel_count($ar), 1, 'no channels open'; + + close_ar($ar); + ok !$ar->is_open, 'connection closed'; + is channel_count($ar), 0, 'no channels open'; + ok !$ch->is_open, 'channel closed'; +}; + +sub connect_ar { + my $done = AnyEvent->condvar; + my $ar = AnyEvent::RabbitMQ->new()->load_xml_spec()->connect( + (map {$_ => $conf{$_}} qw(host port user pass vhost)), + timeout => 1, + on_success => sub {$done->send(1)}, + on_failure => sub { diag @_; $done->send()}, + on_close => \&handle_close, + ); + die 'Connection failure' if !$done->recv; + return $ar; +} + +sub close_ar { + my ($ar,) = @_; + + my $done = AnyEvent->condvar; + $ar->close( + on_success => sub {$done->send(1)}, + on_failure => sub { diag @_; $done->send()}, + ); + die 'Close failure' if !$done->recv; + + return; +} + +sub channel_count { + my ($ar,) = @_; + + return scalar keys %{$ar->channels}; +} + +sub open_channel { + my ($ar,) = @_; + + my $done = AnyEvent->condvar; + $ar->open_channel( + on_success => sub {$done->send(shift)}, + on_failure => sub {$done->send()}, + on_return => sub {die 'Receive return'}, + on_close => \&handle_close, + ); + my $ch = $done->recv; + die 'Open channel failure' if !$ch; + + return $ch; +} + +sub handle_close { + my $method_frame = shift->method_frame; + die $method_frame->reply_code, $method_frame->reply_text + if $method_frame->reply_code; +} +