diff --git a/Changes b/Changes index 6ec4a3b..21bc473 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,35 @@ Revision history for Perl extension AnyEvent::RabbitMQ + - Defer finding the path of the included spec file in order to allow + programs to either provide a path for it or use another mechanism + to load the Net::AMQP protocol (like use Net::AMQP::Protocol::v0_8). + Otherwise, AnyEvent::RabbitMQ dies at compile time if the share dir + cannot be found, even if the protocol was already loaded. + + - Disable Nagle's algorithm in order to reduce latency. For RPC style + applications this algorithm artificially introduces ~50ms of latency + for full roundtrip, imposing a limit of only 25 calls/s. This change + improves RPC throughput noticeably (even tenfold for fast servers). + + - Do message publish using a single socket write (instead of three). + This improves throughput of small messages by 30%. + + - Fix that fact that the AMQP server can ask us to close the + connection - we should respond with CloseOk and then + not say anything further otherwise we get a SIGPIPE. + + - Fix a race condition stopping connections from closing properly. + If you ask to open a channel, and then immediately try to close + the connection then the not yet open channel would never remove + itself from the associated connection, resulting in the connection + never being terminated (as there were still channels associated with + it). + + - Stop leaking all RabbitMQ messages recieved back inside + a closure. + + - Fix a bug when closing a channel that is subscribed to multiple queues. + - Allow multiple clients to have independent connections to RabbitMQ, as long as they all use the same spec file. diff --git a/Makefile.PL b/Makefile.PL index 2ca2a19..323ab82 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -9,7 +9,6 @@ requires 'AnyEvent'; requires 'Devel::GlobalDestruction'; requires 'namespace::clean'; requires 'File::ShareDir'; -requires 'Readonly' => '1.03'; tests 't/*.t'; author_tests 'xt'; diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm index b00e421..245f7dc 100644 --- a/lib/AnyEvent/RabbitMQ.pm +++ b/lib/AnyEvent/RabbitMQ.pm @@ -9,7 +9,6 @@ use List::MoreUtils qw(none); use Devel::GlobalDestruction; use namespace::clean; use File::ShareDir; -use Readonly; use AnyEvent::Handle; use AnyEvent::Socket; @@ -22,8 +21,7 @@ use AnyEvent::RabbitMQ::LocalQueue; our $VERSION = '1.05'; -Readonly my $DEFAULT_AMQP_SPEC - => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-8.xml'; +my $DEFAULT_AMQP_SPEC; sub new { my $class = shift; @@ -58,7 +56,10 @@ my $_loaded_spec; sub load_xml_spec { my $self = shift; my ($spec) = @_; - $spec ||= $DEFAULT_AMQP_SPEC; + unless ($spec) { + # Use the spec file included into this module package + $spec = $DEFAULT_AMQP_SPEC ||= File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-8.xml'; + } if ($_loaded_spec && $_loaded_spec ne $spec) { croak("Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible"); } @@ -97,6 +98,9 @@ sub connect { sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!) ); + # Disable Nagle's algorithm in order to reduce latency + AnyEvent::Socket::tcp_nodelay( $fh, 1 ); + $self->{_handle} = AnyEvent::Handle->new( fh => $fh, on_error => sub { @@ -188,11 +192,10 @@ sub _check_close_and_clean { my ($frame, $close_cb,) = @_; return 1 if !$frame->isa('Net::AMQP::Frame::Method'); - my $method_frame = $frame->method_frame; - return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close'); - - $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new()); + return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close') && !$method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk'); + $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new()) + if $method_frame->isa('Net::AMQP::Protocol::Connection::Close'); $self->{_channels} = {}; $self->{_is_open} = 0; $self->_disconnect(); @@ -475,6 +478,31 @@ sub _push_write { return; } +sub _push_bulk_write { + my $self = shift; + my ($id, @frames) = @_; + + my $raw_data = ''; + + foreach my $output (@frames) { + if ($output->isa('Net::AMQP::Protocol::Base')) { + $output = $output->frame_wrap; + } + $output->channel($id || 0); + + if ($self->{verbose}) { + warn '[C] --> [S] ', Dumper($output); + } + + $raw_data .= $output->to_raw_frame(); + } + + $self->{_handle}->push_write($raw_data) + if $self->{_handle}; # Careful - could have gone (global destruction) + + return; +} + sub _set_cbs { my $self = shift; my %args = @_; diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm index d800768..4d0e802 100644 --- a/lib/AnyEvent/RabbitMQ/Channel.pm +++ b/lib/AnyEvent/RabbitMQ/Channel.pm @@ -42,7 +42,9 @@ sub open { $self->{_is_active} = 1; $args{on_success}->(); }, - $args{on_failure}, + sub { + $args{on_failure}->(@_); + }, $self->{id}, ); @@ -55,18 +57,30 @@ sub close { or return; my %args = $connection->_set_cbs(@_); - return $self if !$self->{_is_open}; + # Ensure to remove this channel from the connection even if we're not + # fully open to ensure $rf->close works always. + # FIXME - We can end up racing here so the server thinks the channel is + # open, but we've closed it - a more elegant fix would be to mark that + # the channel is opening, and wait for it to open before closing it + if (!$self->{_is_open}) { + $self->{connection}->delete_channel($self->{id}); + $args{on_success}->($self); + return $self; + } - return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}}; + my %cbs = %{ $self->{_consumer_cbs} }; + return $self->_close(%args) unless %cbs; - for my $consumer_tag (keys %{$self->{_consumer_cbs}}) { + for my $consumer_tag ( keys %cbs ) { $self->cancel( consumer_tag => $consumer_tag, on_success => sub { - $self->_close(%args); + delete $cbs{$consumer_tag}; + $self->_close(%args) unless %cbs; }, - on_failure => sub { - $self->_close(%args); + on_failure => sub { + delete $cbs{$consumer_tag}; + $self->_close(%args) unless %cbs; $args{on_failure}->(@_); } ); @@ -274,12 +288,11 @@ sub publish { my $body = delete $args{body} || ''; my $return_cb = delete $args{on_return} || sub {}; - $self->_publish( - %args, - )->_header( - $header_args, $body, - )->_body( - $body, + $self->{connection}->_push_bulk_write( + $self->{id}, + $self->_publish(%args), + $self->_header($header_args, $body), + $self->_body($body), ); return $self if !$args{mandatory} && !$args{immediate}; @@ -295,59 +308,50 @@ sub _publish { my $self = shift; my %args = @_; - $self->{connection}->_push_write( - Net::AMQP::Protocol::Basic::Publish->new( - exchange => '', - mandatory => 0, - immediate => 0, - %args, # routing_key - ticket => 0, - ), - $self->{id}, + my $frame = Net::AMQP::Protocol::Basic::Publish->new( + exchange => '', + mandatory => 0, + immediate => 0, + %args, # routing_key + ticket => 0, ); - return $self; + return $frame; } sub _header { my ($self, $args, $body,) = @_; - $self->{connection}->_push_write( - Net::AMQP::Frame::Header->new( - weight => $args->{weight} || 0, - body_size => length($body), - header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new( - content_type => 'application/octet-stream', - content_encoding => undef, - headers => {}, - delivery_mode => 1, - priority => 1, - correlation_id => undef, - expiration => undef, - message_id => undef, - timestamp => time, - type => undef, - user_id => $self->{connection}->login_user, - app_id => undef, - cluster_id => undef, - %$args, - ), + my $frame = Net::AMQP::Frame::Header->new( + weight => $args->{weight} || 0, + body_size => length($body), + header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new( + content_type => 'application/octet-stream', + content_encoding => undef, + headers => {}, + delivery_mode => 1, + priority => 1, + correlation_id => undef, + expiration => undef, + message_id => undef, + timestamp => time, + type => undef, + user_id => $self->{connection}->login_user, + app_id => undef, + cluster_id => undef, + %$args, ), - $self->{id}, ); - return $self; + return $frame; } sub _body { my ($self, $body,) = @_; - $self->{connection}->_push_write( - Net::AMQP::Frame::Body->new(payload => $body), - $self->{id}, - ); + my $frame = Net::AMQP::Frame::Body->new( payload => $body ); - return $self; + return $frame; } sub consume { @@ -654,6 +658,7 @@ sub _push_read_header_and_body { $self->{_content_queue}->get($next_frame); } else { + undef $next_frame; $frame->payload($body_payload); $response->{body} = $frame; $cb->($response);