From 40fa9641cea4101affbbcc68a052022154e30859 Mon Sep 17 00:00:00 2001 From: Clinton Gormley Date: Mon, 29 Aug 2011 15:35:16 +0200 Subject: [PATCH 1/8] Fix a bug when closing a channel that is subscribed to multiple queues --- lib/AnyEvent/RabbitMQ/Channel.pm | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm index d800768..c1876c8 100644 --- a/lib/AnyEvent/RabbitMQ/Channel.pm +++ b/lib/AnyEvent/RabbitMQ/Channel.pm @@ -57,16 +57,19 @@ sub close { return $self if !$self->{_is_open}; - return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}}; + my %cbs = %{ $self->{_consumer_cbs} }; + return $self->_close(%args) unless %cbs; - for my $consumer_tag (keys %{$self->{_consumer_cbs}}) { + for my $consumer_tag ( keys %cbs ) { $self->cancel( consumer_tag => $consumer_tag, on_success => sub { - $self->_close(%args); + delete $cbs{$consumer_tag}; + $self->_close(%args) unless %cbs; }, - on_failure => sub { - $self->_close(%args); + on_failure => sub { + delete $cbs{$consumer_tag}; + $self->_close(%args) unless %cbs; $args{on_failure}->(@_); } ); From 54f402a124bc93a6eab5bafefff0b2f057670f58 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Thu, 5 Jan 2012 02:54:20 +0000 Subject: [PATCH 2/8] Fix memory leaks --- Changes | 3 +++ lib/AnyEvent/RabbitMQ/Channel.pm | 1 + 2 files changed, 4 insertions(+) diff --git a/Changes b/Changes index 6ec4a3b..2449377 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,8 @@ Revision history for Perl extension AnyEvent::RabbitMQ + - Stop leaking all RabbitMQ messages recieved back inside + a closure. + - Allow multiple clients to have independent connections to RabbitMQ, as long as they all use the same spec file. diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm index d800768..2d20666 100644 --- a/lib/AnyEvent/RabbitMQ/Channel.pm +++ b/lib/AnyEvent/RabbitMQ/Channel.pm @@ -654,6 +654,7 @@ sub _push_read_header_and_body { $self->{_content_queue}->get($next_frame); } else { + undef $next_frame; $frame->payload($body_payload); $response->{body} = $frame; $cb->($response); From a14951f58a607e310ea48f3933a7f78fd9717bf0 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Thu, 5 Jan 2012 12:14:14 +0000 Subject: [PATCH 3/8] Fix the ->new_channel ->close race --- Changes | 7 +++++++ lib/AnyEvent/RabbitMQ/Channel.pm | 15 +++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/Changes b/Changes index 2449377..e4325f9 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,12 @@ Revision history for Perl extension AnyEvent::RabbitMQ + - Fix a race condition stopping connections from closing properly. + If you ask to open a channel, and then immediately try to close + the connection then the not yet open channel would never remove + itself from the associated connection, resulting in the connection + never being terminated (as there were still channels associated with + it). + - Stop leaking all RabbitMQ messages recieved back inside a closure. diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm index 2d20666..95419ab 100644 --- a/lib/AnyEvent/RabbitMQ/Channel.pm +++ b/lib/AnyEvent/RabbitMQ/Channel.pm @@ -42,7 +42,9 @@ sub open { $self->{_is_active} = 1; $args{on_success}->(); }, - $args{on_failure}, + sub { + $args{on_failure}->(@_); + }, $self->{id}, ); @@ -55,7 +57,16 @@ sub close { or return; my %args = $connection->_set_cbs(@_); - return $self if !$self->{_is_open}; + # Ensure to remove this channel from the connection even if we're not + # fully open to ensure $rf->close works always. + # FIXME - We can end up racing here so the server thinks the channel is + # open, but we've closed it - a more elegant fix would be to mark that + # the channel is opening, and wait for it to open before closing it + if (!$self->{_is_open}) { + $self->{connection}->delete_channel($self->{id}); + $args{on_success}->($self); + return $self; + } return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}}; From 8cc40abc22ae0fd1e1b8aec01a0f68f69f90ba61 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Thu, 16 Feb 2012 11:25:37 +0000 Subject: [PATCH 4/8] Fix S=>C Connection::Close --- Changes | 4 ++++ lib/AnyEvent/RabbitMQ.pm | 7 +++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Changes b/Changes index e4325f9..20649b9 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,9 @@ Revision history for Perl extension AnyEvent::RabbitMQ + - Fix that fact that the AMQP server can ask us to close the + connection - we should respond with CloseOk and then + not say anything further otherwise we get a SIGPIPE. + - Fix a race condition stopping connections from closing properly. If you ask to open a channel, and then immediately try to close the connection then the not yet open channel would never remove diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm index b00e421..ed7d78a 100644 --- a/lib/AnyEvent/RabbitMQ.pm +++ b/lib/AnyEvent/RabbitMQ.pm @@ -188,11 +188,10 @@ sub _check_close_and_clean { my ($frame, $close_cb,) = @_; return 1 if !$frame->isa('Net::AMQP::Frame::Method'); - my $method_frame = $frame->method_frame; - return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close'); - - $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new()); + return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close') && !$method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk'); + $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new()) + if $method_frame->isa('Net::AMQP::Protocol::Connection::Close'); $self->{_channels} = {}; $self->{_is_open} = 0; $self->_disconnect(); From 05a714ef6f691670651889dc31e616ecf2e39f21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Mic=C3=B3?= Date: Sat, 14 Apr 2012 00:27:27 -0300 Subject: [PATCH 5/8] Do message publish using a single socket write (instead of three). This improves throughput of small messages by 30%. --- Changes | 3 ++ lib/AnyEvent/RabbitMQ.pm | 25 +++++++++++ lib/AnyEvent/RabbitMQ/Channel.pm | 76 ++++++++++++++------------------ 3 files changed, 61 insertions(+), 43 deletions(-) diff --git a/Changes b/Changes index 20649b9..722d417 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,8 @@ Revision history for Perl extension AnyEvent::RabbitMQ + - Do message publish using a single socket write (instead of three). + This improves throughput of small messages by 30%. + - Fix that fact that the AMQP server can ask us to close the connection - we should respond with CloseOk and then not say anything further otherwise we get a SIGPIPE. diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm index ed7d78a..f6367c7 100644 --- a/lib/AnyEvent/RabbitMQ.pm +++ b/lib/AnyEvent/RabbitMQ.pm @@ -474,6 +474,31 @@ sub _push_write { return; } +sub _push_bulk_write { + my $self = shift; + my ($id, @frames) = @_; + + my $raw_data = ''; + + foreach my $output (@frames) { + if ($output->isa('Net::AMQP::Protocol::Base')) { + $output = $output->frame_wrap; + } + $output->channel($id || 0); + + if ($self->{verbose}) { + warn '[C] --> [S] ', Dumper($output); + } + + $raw_data .= $output->to_raw_frame(); + } + + $self->{_handle}->push_write($raw_data) + if $self->{_handle}; # Careful - could have gone (global destruction) + + return; +} + sub _set_cbs { my $self = shift; my %args = @_; diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm index e4dc176..4d0e802 100644 --- a/lib/AnyEvent/RabbitMQ/Channel.pm +++ b/lib/AnyEvent/RabbitMQ/Channel.pm @@ -288,12 +288,11 @@ sub publish { my $body = delete $args{body} || ''; my $return_cb = delete $args{on_return} || sub {}; - $self->_publish( - %args, - )->_header( - $header_args, $body, - )->_body( - $body, + $self->{connection}->_push_bulk_write( + $self->{id}, + $self->_publish(%args), + $self->_header($header_args, $body), + $self->_body($body), ); return $self if !$args{mandatory} && !$args{immediate}; @@ -309,59 +308,50 @@ sub _publish { my $self = shift; my %args = @_; - $self->{connection}->_push_write( - Net::AMQP::Protocol::Basic::Publish->new( - exchange => '', - mandatory => 0, - immediate => 0, - %args, # routing_key - ticket => 0, - ), - $self->{id}, + my $frame = Net::AMQP::Protocol::Basic::Publish->new( + exchange => '', + mandatory => 0, + immediate => 0, + %args, # routing_key + ticket => 0, ); - return $self; + return $frame; } sub _header { my ($self, $args, $body,) = @_; - $self->{connection}->_push_write( - Net::AMQP::Frame::Header->new( - weight => $args->{weight} || 0, - body_size => length($body), - header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new( - content_type => 'application/octet-stream', - content_encoding => undef, - headers => {}, - delivery_mode => 1, - priority => 1, - correlation_id => undef, - expiration => undef, - message_id => undef, - timestamp => time, - type => undef, - user_id => $self->{connection}->login_user, - app_id => undef, - cluster_id => undef, - %$args, - ), + my $frame = Net::AMQP::Frame::Header->new( + weight => $args->{weight} || 0, + body_size => length($body), + header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new( + content_type => 'application/octet-stream', + content_encoding => undef, + headers => {}, + delivery_mode => 1, + priority => 1, + correlation_id => undef, + expiration => undef, + message_id => undef, + timestamp => time, + type => undef, + user_id => $self->{connection}->login_user, + app_id => undef, + cluster_id => undef, + %$args, ), - $self->{id}, ); - return $self; + return $frame; } sub _body { my ($self, $body,) = @_; - $self->{connection}->_push_write( - Net::AMQP::Frame::Body->new(payload => $body), - $self->{id}, - ); + my $frame = Net::AMQP::Frame::Body->new( payload => $body ); - return $self; + return $frame; } sub consume { From e888205306fb49167c0dc9aa4e4dc940cc922d25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Mic=C3=B3?= Date: Sat, 14 Apr 2012 01:01:49 -0300 Subject: [PATCH 6/8] Disable Nagle's algorithm in order to reduce latency. For RPC style applications this algorithm artificially introduces ~50ms of latency for full roundtrip, imposing a limit of only 25 calls/s. This change improves RPC throughput noticeably (even tenfold for fast servers). --- Changes | 5 +++++ lib/AnyEvent/RabbitMQ.pm | 3 +++ 2 files changed, 8 insertions(+) diff --git a/Changes b/Changes index 722d417..a569309 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,10 @@ Revision history for Perl extension AnyEvent::RabbitMQ + - Disable Nagle's algorithm in order to reduce latency. For RPC style + applications this algorithm artificially introduces ~50ms of latency + for full roundtrip, imposing a limit of only 25 calls/s. This change + improves RPC throughput noticeably (even tenfold for fast servers). + - Do message publish using a single socket write (instead of three). This improves throughput of small messages by 30%. diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm index f6367c7..0aa3a10 100644 --- a/lib/AnyEvent/RabbitMQ.pm +++ b/lib/AnyEvent/RabbitMQ.pm @@ -97,6 +97,9 @@ sub connect { sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!) ); + # Disable Nagle's algorithm in order to reduce latency + AnyEvent::Socket::tcp_nodelay( $fh, 1 ); + $self->{_handle} = AnyEvent::Handle->new( fh => $fh, on_error => sub { From fc2dff4264efe8819aa8f15258827403ff65cb6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Mic=C3=B3?= Date: Sat, 14 Apr 2012 01:04:11 -0300 Subject: [PATCH 7/8] Added Clinton Gormley change description --- Changes | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Changes b/Changes index a569309..b002732 100644 --- a/Changes +++ b/Changes @@ -22,6 +22,8 @@ Revision history for Perl extension AnyEvent::RabbitMQ - Stop leaking all RabbitMQ messages recieved back inside a closure. + - Fix a bug when closing a channel that is subscribed to multiple queues. + - Allow multiple clients to have independent connections to RabbitMQ, as long as they all use the same spec file. From 01a26ec38a17bfa727c529469153621bd50fe564 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Mic=C3=B3?= Date: Sat, 14 Apr 2012 01:44:54 -0300 Subject: [PATCH 8/8] Defer finding the path of the included spec file in order to allow programs to either provide a path for it or use another mechanism to load the Net::AMQP protocol (like use Net::AMQP::Protocol::v0_8). Otherwise, AnyEvent::RabbitMQ dies at compile time if the share dir cannot be found, even if the protocol was already loaded. --- Changes | 6 ++++++ Makefile.PL | 1 - lib/AnyEvent/RabbitMQ.pm | 9 +++++---- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/Changes b/Changes index b002732..21bc473 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,11 @@ Revision history for Perl extension AnyEvent::RabbitMQ + - Defer finding the path of the included spec file in order to allow + programs to either provide a path for it or use another mechanism + to load the Net::AMQP protocol (like use Net::AMQP::Protocol::v0_8). + Otherwise, AnyEvent::RabbitMQ dies at compile time if the share dir + cannot be found, even if the protocol was already loaded. + - Disable Nagle's algorithm in order to reduce latency. For RPC style applications this algorithm artificially introduces ~50ms of latency for full roundtrip, imposing a limit of only 25 calls/s. This change diff --git a/Makefile.PL b/Makefile.PL index 2ca2a19..323ab82 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -9,7 +9,6 @@ requires 'AnyEvent'; requires 'Devel::GlobalDestruction'; requires 'namespace::clean'; requires 'File::ShareDir'; -requires 'Readonly' => '1.03'; tests 't/*.t'; author_tests 'xt'; diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm index 0aa3a10..245f7dc 100644 --- a/lib/AnyEvent/RabbitMQ.pm +++ b/lib/AnyEvent/RabbitMQ.pm @@ -9,7 +9,6 @@ use List::MoreUtils qw(none); use Devel::GlobalDestruction; use namespace::clean; use File::ShareDir; -use Readonly; use AnyEvent::Handle; use AnyEvent::Socket; @@ -22,8 +21,7 @@ use AnyEvent::RabbitMQ::LocalQueue; our $VERSION = '1.05'; -Readonly my $DEFAULT_AMQP_SPEC - => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-8.xml'; +my $DEFAULT_AMQP_SPEC; sub new { my $class = shift; @@ -58,7 +56,10 @@ my $_loaded_spec; sub load_xml_spec { my $self = shift; my ($spec) = @_; - $spec ||= $DEFAULT_AMQP_SPEC; + unless ($spec) { + # Use the spec file included into this module package + $spec = $DEFAULT_AMQP_SPEC ||= File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-8.xml'; + } if ($_loaded_spec && $_loaded_spec ne $spec) { croak("Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible"); }