Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/nsq/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def initialize(opts = {})


def connected?
return @write_loop_thread.alive? && @connected if @write_loop_thread # for producers
@connected
end

Expand Down
46 changes: 41 additions & 5 deletions lib/nsq/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,24 @@ def initialize(opts = {})
@ssl_context = opts[:ssl_context]
@tls_options = opts[:tls_options]
@tls_v1 = opts[:tls_v1]
@mx = Mutex.new

nsqlookupds = []
@nsqlookupds = []
if opts[:nsqlookupd]
nsqlookupds = [opts[:nsqlookupd]].flatten
@nsqlookupds = [opts[:nsqlookupd]].flatten
discover_repeatedly(
nsqlookupds: nsqlookupds,
nsqlookupds: @nsqlookupds,
interval: @discovery_interval
)

elsif opts[:nsqd]
nsqds = [opts[:nsqd]].flatten
nsqds.each{|d| add_connection(d)}
@nsqds = [opts[:nsqd]].flatten
@nsqds.each{|d| add_connection(d)}

else
add_connection('127.0.0.1:4150')
end
update_fork_pid
end

def write(*raw_messages)
Expand Down Expand Up @@ -75,8 +77,35 @@ def deferred_write_to_topic(topic, delay, *raw_messages)
end
end

def reconnect_on_fork
if @nsqlookupds.any?
if @discovery_thread && !@discovery_thread.alive?
@mx.synchronize do
break if @discovery_thread.alive?
debug "Discovery thread is dead; terminating connections and restarting discovery loop"
terminate
discover_repeatedly(
nsqlookupds: @nsqlookupds,
interval: @discovery_interval
)
update_fork_pid
end
end
elsif forked?
@mx.synchronize do
break unless forked?
debug "Fork detected - recreating connections"
terminate
@nsqds.each{|d| add_connection(d)}
update_fork_pid
end
end
end

private
def connection_for_write
reconnect_on_fork

# Choose a random Connection that's currently connected
# Or, if there's nothing connected, just take any random one
connections_currently_connected = connections.select{|_,c| c.connected?}
Expand All @@ -90,5 +119,12 @@ def connection_for_write
connection
end

def forked?
Process.pid != @fork_pid
end

def update_fork_pid
@fork_pid = Process.pid
end
end
end