Description
We were losing messages in our infrastructure; this is the result of my investigation.
I wrote a PoC with one producer writing one message every second and one consumer consuming these messages, and restarted the nsqd instance while they were running.
The result was pretty straightforward, here are the consumer logs:
Hello 1
[nsqworker] test-test - 0ae480263344d010 - done job 1 - 9.731e-06s
[nsqworker] test-test - 0ae480263344d011 - starting job 2
Hello 2
[nsqworker] test-test - 0ae480263344d011 - done job 2 - 2.4468e-05s
[nsqworker] test-test - 0ae480263344d012 - starting job 3
Hello 3
[nsqworker] test-test - 0ae480263344d012 - done job 3 - 9.773e-06s
W, [2019-01-16T19:52:35.477176 #17425] WARN -- : [host: 127.0.0.1 port: 4150] Died from: No data from socket
W, [2019-01-16T19:52:35.480081 #17425] WARN -- : [host: 127.0.0.1 port: 4150] Failed to connect: Connection reset by peer. Retrying in 0.5 seconds.
W, [2019-01-16T19:52:35.980996 #17425] WARN -- : [host: 127.0.0.1 port: 4150] Failed to connect: Connection reset by peer. Retrying in 0.6 seconds.
W, [2019-01-16T19:52:36.592932 #17425] WARN -- : [host: 127.0.0.1 port: 4150] Failed to connect: Connection refused - connect(2) for "127.0.0.1" port 4150. Retrying in 1.3 seconds.
[nsqworker] test-test - 0ae48055c884d000 - starting job 5
Hello 5
[nsqworker] test-test - 0ae48055c884d000 - done job 5 - 3.4755e-05s
Here are the logs of the producer:
===== PUB test-test
�{"type":"job","payload":1}
===== PUB test-test
�{"type":"job","payload":2}
===== PUB test-test
�{"type":"job","payload":3}
===== PUB test-test
�{"type":"job","payload":4}
E, [2019-01-16T19:52:36.134865 #18336] ERROR -- : [host: 127.0.0.1 port: 4150] Error received: E_PUB_FAILED PUB failed exiting
W, [2019-01-16T19:52:36.164201 #18336] WARN -- : [host: 127.0.0.1 port: 4150] Died from: No data from socket
W, [2019-01-16T19:52:36.170969 #18336] WARN -- : [host: 127.0.0.1 port: 4150] Failed to connect: Connection reset by peer. Retrying in 0.5 seconds.
W, [2019-01-16T19:52:36.671352 #18336] WARN -- : [host: 127.0.0.1 port: 4150] Failed to connect: Connection refused - connect(2) for "127.0.0.1" port 4150. Retrying in 0.7 seconds.
===== PUB test-test
�{"type":"job","payload":5}
We can see that job 4 was never consumed, and after a short investigation we found that it was never actually emitted. The reason is probably in connection.rb:
def write_loop
  data = nil
  loop do
    data = @write_queue.pop
    break if data == :stop_write_loop
    write_to_socket(data)
  end
rescue Exception => ex
  # requeue PUB and MPUB commands
  if data =~ /^M?PUB/
    debug "Requeueing to write_queue: #{data.inspect}"
    @write_queue.push(data)
  end
  die(ex)
end
So the rescue branch that requeues the message is never reached: the write of message 4 apparently does not raise, and the loop simply exits on the next item it pops, via break if data == :stop_write_loop.
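To make this concrete, here is a minimal standalone sketch of the failure mode. It is not nsq-ruby itself: FakeConnection, sent and requeued are hypothetical names, and the sketch assumes that the socket write does not raise when nsqd goes away and that :stop_write_loop is pushed once the connection is found dead.

```ruby
# Hypothetical stand-in for the connection, for illustration only.
# Assumption: the socket write never raises; the dead peer is noticed elsewhere.
class FakeConnection
  attr_reader :write_queue, :sent, :requeued

  def initialize
    @write_queue = Queue.new
    @sent = []      # commands handed to the (possibly dead) socket
    @requeued = []  # commands the rescue branch put back
  end

  def write_to_socket(data)
    @sent << data   # succeeds locally even if nsqd never accepts the message
  end

  # Same shape as write_loop in connection.rb
  def write_loop
    data = nil
    loop do
      data = @write_queue.pop
      break if data == :stop_write_loop
      write_to_socket(data)
    end
  rescue Exception => ex
    # Only reached if write_to_socket raises, which it does not here
    if data =~ /^M?PUB/
      @requeued << data
      @write_queue.push(data)
    end
  end
end

conn = FakeConnection.new
conn.write_queue.push("PUB test-test\n{\"type\":\"job\",\"payload\":4}")
conn.write_queue.push(:stop_write_loop) # pushed when the connection dies
conn.write_loop

puts "sent: #{conn.sent.size}, requeued: #{conn.requeued.size}, " \
     "left in queue: #{conn.write_queue.size}"
```

Under these assumptions the message is counted as sent, nothing is requeued, and the queue ends up empty, which matches job 4 disappearing in the logs above.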
I've written a small patch that keeps the previous message in 'memory', and it works correctly, but I'm sure it's not the best solution. Depending on when the failure happens, the message has sometimes already been handled by nsqd and sometimes not; how can we check that?
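def write_loop
  previous = nil
  data = nil
  loop do
    data = @write_queue.pop
    if data == :stop_write_loop
      # requeue the last PUB/MPUB written before the connection stopped
      if previous && previous =~ /^M?PUB/
        debug "Requeueing to write_queue: #{previous.inspect}"
        @write_queue.push(previous)
      end
      break
    end
    write_to_socket(data)
    previous = data
  end
rescue Exception => ex
  # unchanged from the original: requeue PUB and MPUB commands
  if data =~ /^M?PUB/
    debug "Requeueing to write_queue: #{data.inspect}"
    @write_queue.push(data)
  end
  die(ex)
end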
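A standalone sketch of the patched loop, under the same assumption that the socket write does not raise, shows the last PUB being put back on the queue. PatchedConnection and sent are hypothetical names used only for this illustration; this is not the library's code.

```ruby
# Hypothetical harness, not nsq-ruby itself.
# Assumption: write_to_socket never raises when nsqd goes away.
class PatchedConnection
  attr_reader :write_queue, :sent

  def initialize
    @write_queue = Queue.new
    @sent = []
  end

  def write_to_socket(data)
    @sent << data
  end

  def write_loop
    previous = nil
    loop do
      data = @write_queue.pop
      if data == :stop_write_loop
        # Put the last PUB/MPUB back so it can be retried after reconnecting
        @write_queue.push(previous) if previous && previous =~ /^M?PUB/
        break
      end
      write_to_socket(data)
      previous = data
    end
  end
end

job4 = "PUB test-test\n{\"type\":\"job\",\"payload\":4}"
conn = PatchedConnection.new
conn.write_queue.push(job4)
conn.write_queue.push(:stop_write_loop)
conn.write_loop

left = conn.write_queue.size
requeued = conn.write_queue.pop
puts "left in queue: #{left}, requeued job 4: #{requeued == job4}"
```

The sketch also shows the downside: the last PUB is requeued whenever the loop is stopped, even if nsqd did receive it, so the same message can be published twice.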
Could I have your input on this? It really needs to be fixed; losing messages like this is unacceptable.
Thanks a lot (ping @bschwartz )