Skip to content
30 changes: 30 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,5 +1,35 @@
Revision history for Perl extension AnyEvent::RabbitMQ

- Defer finding the path of the included spec file in order to allow
programs to either provide a path for it or use another mechanism
to load the Net::AMQP protocol (like use Net::AMQP::Protocol::v0_8).
Otherwise, AnyEvent::RabbitMQ dies at compile time if the share dir
cannot be found, even if the protocol was already loaded.

- Disable Nagle's algorithm in order to reduce latency. For RPC style
applications this algorithm artificially introduces ~50ms of latency
for full roundtrip, imposing a limit of only 25 calls/s. This change
improves RPC throughput noticeably (even tenfold for fast servers).

- Do message publish using a single socket write (instead of three).
This improves throughput of small messages by 30%.

- Fix that fact that the AMQP server can ask us to close the
connection - we should respond with CloseOk and then
not say anything further otherwise we get a SIGPIPE.

- Fix a race condition stopping connections from closing properly.
If you ask to open a channel, and then immediately try to close
the connection then the not yet open channel would never remove
itself from the associated connection, resulting in the connection
never being terminated (as there were still channels associated with
it).

- Stop leaking all RabbitMQ messages recieved back inside
a closure.

- Fix a bug when closing a channel that is subscribed to multiple queues.

- Allow multiple clients to have independent connections
to RabbitMQ, as long as they all use the same spec file.

Expand Down
1 change: 0 additions & 1 deletion Makefile.PL
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ requires 'AnyEvent';
requires 'Devel::GlobalDestruction';
requires 'namespace::clean';
requires 'File::ShareDir';
requires 'Readonly' => '1.03';

tests 't/*.t';
author_tests 'xt';
Expand Down
44 changes: 36 additions & 8 deletions lib/AnyEvent/RabbitMQ.pm
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use List::MoreUtils qw(none);
use Devel::GlobalDestruction;
use namespace::clean;
use File::ShareDir;
use Readonly;

use AnyEvent::Handle;
use AnyEvent::Socket;
Expand All @@ -22,8 +21,7 @@ use AnyEvent::RabbitMQ::LocalQueue;

our $VERSION = '1.05';

Readonly my $DEFAULT_AMQP_SPEC
=> File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-8.xml';
my $DEFAULT_AMQP_SPEC;

sub new {
my $class = shift;
Expand Down Expand Up @@ -58,7 +56,10 @@ my $_loaded_spec;
sub load_xml_spec {
my $self = shift;
my ($spec) = @_;
$spec ||= $DEFAULT_AMQP_SPEC;
unless ($spec) {
# Use the spec file included into this module package
$spec = $DEFAULT_AMQP_SPEC ||= File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-8.xml';
}
if ($_loaded_spec && $_loaded_spec ne $spec) {
croak("Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible");
}
Expand Down Expand Up @@ -97,6 +98,9 @@ sub connect {
sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!)
);

# Disable Nagle's algorithm in order to reduce latency
AnyEvent::Socket::tcp_nodelay( $fh, 1 );

$self->{_handle} = AnyEvent::Handle->new(
fh => $fh,
on_error => sub {
Expand Down Expand Up @@ -188,11 +192,10 @@ sub _check_close_and_clean {
my ($frame, $close_cb,) = @_;

return 1 if !$frame->isa('Net::AMQP::Frame::Method');

my $method_frame = $frame->method_frame;
return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');

$self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close') && !$method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk');
$self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new())
if $method_frame->isa('Net::AMQP::Protocol::Connection::Close');
$self->{_channels} = {};
$self->{_is_open} = 0;
$self->_disconnect();
Expand Down Expand Up @@ -475,6 +478,31 @@ sub _push_write {
return;
}

sub _push_bulk_write {
my $self = shift;
my ($id, @frames) = @_;

my $raw_data = '';

foreach my $output (@frames) {
if ($output->isa('Net::AMQP::Protocol::Base')) {
$output = $output->frame_wrap;
}
$output->channel($id || 0);

if ($self->{verbose}) {
warn '[C] --> [S] ', Dumper($output);
}

$raw_data .= $output->to_raw_frame();
}

$self->{_handle}->push_write($raw_data)
if $self->{_handle}; # Careful - could have gone (global destruction)

return;
}

sub _set_cbs {
my $self = shift;
my %args = @_;
Expand Down
105 changes: 55 additions & 50 deletions lib/AnyEvent/RabbitMQ/Channel.pm
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ sub open {
$self->{_is_active} = 1;
$args{on_success}->();
},
$args{on_failure},
sub {
$args{on_failure}->(@_);
},
$self->{id},
);

Expand All @@ -55,18 +57,30 @@ sub close {
or return;
my %args = $connection->_set_cbs(@_);

return $self if !$self->{_is_open};
# Ensure to remove this channel from the connection even if we're not
# fully open to ensure $rf->close works always.
# FIXME - We can end up racing here so the server thinks the channel is
# open, but we've closed it - a more elegant fix would be to mark that
# the channel is opening, and wait for it to open before closing it
if (!$self->{_is_open}) {
$self->{connection}->delete_channel($self->{id});
$args{on_success}->($self);
return $self;
}

return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}};
my %cbs = %{ $self->{_consumer_cbs} };
return $self->_close(%args) unless %cbs;

for my $consumer_tag (keys %{$self->{_consumer_cbs}}) {
for my $consumer_tag ( keys %cbs ) {
$self->cancel(
consumer_tag => $consumer_tag,
on_success => sub {
$self->_close(%args);
delete $cbs{$consumer_tag};
$self->_close(%args) unless %cbs;
},
on_failure => sub {
$self->_close(%args);
on_failure => sub {
delete $cbs{$consumer_tag};
$self->_close(%args) unless %cbs;
$args{on_failure}->(@_);
}
);
Expand Down Expand Up @@ -274,12 +288,11 @@ sub publish {
my $body = delete $args{body} || '';
my $return_cb = delete $args{on_return} || sub {};

$self->_publish(
%args,
)->_header(
$header_args, $body,
)->_body(
$body,
$self->{connection}->_push_bulk_write(
$self->{id},
$self->_publish(%args),
$self->_header($header_args, $body),
$self->_body($body),
);

return $self if !$args{mandatory} && !$args{immediate};
Expand All @@ -295,59 +308,50 @@ sub _publish {
my $self = shift;
my %args = @_;

$self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Publish->new(
exchange => '',
mandatory => 0,
immediate => 0,
%args, # routing_key
ticket => 0,
),
$self->{id},
my $frame = Net::AMQP::Protocol::Basic::Publish->new(
exchange => '',
mandatory => 0,
immediate => 0,
%args, # routing_key
ticket => 0,
);

return $self;
return $frame;
}

sub _header {
my ($self, $args, $body,) = @_;

$self->{connection}->_push_write(
Net::AMQP::Frame::Header->new(
weight => $args->{weight} || 0,
body_size => length($body),
header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
content_type => 'application/octet-stream',
content_encoding => undef,
headers => {},
delivery_mode => 1,
priority => 1,
correlation_id => undef,
expiration => undef,
message_id => undef,
timestamp => time,
type => undef,
user_id => $self->{connection}->login_user,
app_id => undef,
cluster_id => undef,
%$args,
),
my $frame = Net::AMQP::Frame::Header->new(
weight => $args->{weight} || 0,
body_size => length($body),
header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
content_type => 'application/octet-stream',
content_encoding => undef,
headers => {},
delivery_mode => 1,
priority => 1,
correlation_id => undef,
expiration => undef,
message_id => undef,
timestamp => time,
type => undef,
user_id => $self->{connection}->login_user,
app_id => undef,
cluster_id => undef,
%$args,
),
$self->{id},
);

return $self;
return $frame;
}

sub _body {
my ($self, $body,) = @_;

$self->{connection}->_push_write(
Net::AMQP::Frame::Body->new(payload => $body),
$self->{id},
);
my $frame = Net::AMQP::Frame::Body->new( payload => $body );

return $self;
return $frame;
}

sub consume {
Expand Down Expand Up @@ -654,6 +658,7 @@ sub _push_read_header_and_body {
$self->{_content_queue}->get($next_frame);
}
else {
undef $next_frame;
$frame->payload($body_payload);
$response->{body} = $frame;
$cb->($response);
Expand Down