diff --git a/lib/net-http2/client.rb b/lib/net-http2/client.rb index 35a8896..57d4069 100644 --- a/lib/net-http2/client.rb +++ b/lib/net-http2/client.rb @@ -9,6 +9,7 @@ module NetHttp2 PROXY_SETTINGS_KEYS = [:proxy_addr, :proxy_port, :proxy_user, :proxy_pass] AsyncRequestTimeout = Class.new(StandardError) + SocketClosedError = Class.new(StandardError) class Client @@ -39,8 +40,7 @@ def call(method, path, options={}) def call_async(request) ensure_open - stream = new_monitored_stream_for request - stream.async_call_with request + new_stream.async_call_with request end def prepare_request(method, path, options={}) @@ -53,6 +53,7 @@ def ssl? def close exit_thread(@socket_thread) + @socket_error = SocketClosedError.new init_vars end @@ -74,10 +75,14 @@ def stream_count private - def init_vars + def init_vars(error: nil) @mutex.synchronize do @socket.close if @socket && !@socket.closed? + (@streams || {}).each do |k, v| + v.force_close(@socket_error) + end + @h2 = nil @socket = nil @socket_thread = nil @@ -86,20 +91,16 @@ def init_vars end end - def new_stream - @mutex.synchronize { NetHttp2::Stream.new(h2_stream: h2.new_stream) } - rescue StandardError => e - close - raise e - end - - def new_monitored_stream_for(request) - stream = new_stream + def new_stream() + stream = @mutex.synchronize { NetHttp2::Stream.new(h2_stream: h2.new_stream) } - @streams[stream.id] = true - request.on(:close) { @streams.delete(stream.id) } + @streams[stream.id] = stream + stream.on(:close) { @streams.delete(stream.id) } stream + rescue StandardError => e + close + raise e end def ensure_open @@ -115,13 +116,15 @@ def ensure_open rescue EOFError # socket closed + @socket_error = SocketError.new('Socket was remotely closed') init_vars - callback_or_raise SocketError.new('Socket was remotely closed') + callback_or_raise @socket_error rescue Exception => e # error on socket + @socket_error = e init_vars - callback_or_raise e + callback_or_raise @socket_error end end.tap { |t| t.abort_on_exception = true } end diff --git a/lib/net-http2/stream.rb b/lib/net-http2/stream.rb index c9264a9..37480d5 100644 --- a/lib/net-http2/stream.rb +++ b/lib/net-http2/stream.rb @@ -1,6 +1,7 @@ module NetHttp2 class Stream + include Callbacks def initialize(options={}) @h2_stream = options[:h2_stream] @@ -10,6 +11,7 @@ def initialize(options={}) @async = false @completed = false @mutex = Mutex.new + @error = nil @cv = ConditionVariable.new listen_for_headers @@ -42,12 +44,23 @@ def async? @async end + def wait + wait_for_completed + end + + def force_close(error = nil) + @error = error + @mutex.synchronize { @cv.signal } + end + private def listen_for_headers @h2_stream.on(:headers) do |hs_array| hs = Hash[*hs_array.flatten] + emit(:headers, hs) + if async? @request.emit(:headers, hs) else @@ -58,6 +71,9 @@ def listen_for_headers def listen_for_data @h2_stream.on(:data) do |data| + + emit(:data, data) + if async? @request.emit(:body_chunk, data) else @@ -70,6 +86,8 @@ def listen_for_close @h2_stream.on(:close) do |data| @completed = true + emit(:close, data) + if async? @request.emit(:close, data) else @@ -98,6 +116,7 @@ def sync_respond def wait_for_completed @mutex.synchronize { @cv.wait(@mutex, @request.timeout) } + raise @error if @error end end end diff --git a/spec/api/timeouts_with_sync_requests_spec.rb b/spec/api/timeouts_with_sync_requests_spec.rb index 2435fec..f384668 100644 --- a/spec/api/timeouts_with_sync_requests_spec.rb +++ b/spec/api/timeouts_with_sync_requests_spec.rb @@ -44,14 +44,15 @@ expect(responses.compact).to be_empty end - it "returns nil even if the client's main thread gets killed" do + it "returns raises an error if the client's main thread gets killed" do Thread.new do sleep 1 client.close end - response = client.call(:get, '/path', headers: { 'x-custom-header' => 'custom' }, timeout: 2) - expect(response).to be_nil + expect { + client.call(:get, '/path', headers: { 'x-custom-header' => 'custom' }, timeout: 2) + }.to raise_error(NetHttp2::SocketClosedError) end end