Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# ehttpc changes

## 0.7.1

- Reply error to async callbacks when request expired.

## 0.7.0

- Switch to `gun` 2.1.0.
Expand Down
103 changes: 66 additions & 37 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
(is_binary(element(3, REQ)) orelse is_list(element(3, REQ))))
).

-define(EXPIRED(Req), {expired, Req}).

-record(state, {
pool :: term(),
id :: pos_integer(),
Expand Down Expand Up @@ -344,7 +346,8 @@ do_handle_info(
% e.g. after the stream has been closed by gun, if we send a cancel stream
% gun will reply with Reason={badstate,"The stream cannot be found."}
State;
{expired, NRequests} ->
{?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NRequests} ->
reply(ReplyTo, {error, expired}),
State#state{requests = NRequests};
{?SENT_REQ(ReplyTo, _, _), NRequests} ->
reply(ReplyTo, {error, Reason}),
Expand Down Expand Up @@ -418,7 +421,8 @@ handle_gun_down(#state{requests = Requests} = State, KilledStreams, Reason) ->
case take_sent_req(StreamRef, Acc) of
error ->
Acc;
{expired, NAcc} ->
{?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NAcc} ->
reply(ReplyTo, {error, expired}),
NAcc;
{?SENT_REQ(ReplyTo, _, _), NAcc} ->
reply(ReplyTo, {error, Reason}),
Expand Down Expand Up @@ -620,7 +624,7 @@ take_sent_req(StreamRef, #{sent := Sent, max_sent_expire := T} = Requests) ->
end,
case is_sent_req_expired(Req, now_()) of
true ->
{expired, Requests#{sent := NewSent, max_sent_expire := NewT}};
{?EXPIRED(Req), Requests#{sent := NewSent, max_sent_expire := NewT}};
false ->
{Req, Requests#{sent := NewSent, max_sent_expire := NewT}}
end
Expand Down Expand Up @@ -944,47 +948,72 @@ handle_gun_reply(State, Client, StreamRef, IsFin, StatusCode, Headers, Data) ->
%% Received 'gun_data' message from unknown stream
%% this may happen when the async cancel stream is sent too late
State;
{expired, NRequests} ->
%% the call is expired, caller is no longer waiting for a reply
{?EXPIRED(?SENT_REQ(_, _, _) = SentReq), NRequests} ->
ok = cancel_stream(IsFin, Client, StreamRef),
State#state{requests = NRequests};
{?SENT_REQ(ReplyTo, ExpireAt, ?undef), NRequests} ->
%% gun_response, http head

%% assert, no body yet
?undef = Data,
case IsFin of
fin ->
%% only http heads no body
reply(ReplyTo, {ok, StatusCode, Headers}),
State#state{requests = NRequests};
nofin ->
%% start accumulating data
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end;
{?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}), NRequests} ->
%% gun_data, http body
handle_known_gun_reply(
SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data
);
{?SENT_REQ(_, _, _) = SentReq, NRequests} ->
handle_known_gun_reply(
SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data
)
end.

%% assert
?undef = StatusCode,
%% assert
?undef = Headers,
case IsFin of
fin ->
reply(
ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])}
),
State#state{requests = NRequests};
nofin ->
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end
handle_known_gun_reply(
?SENT_REQ(ReplyTo, ExpireAt, ?undef),
NRequests,
State,
StreamRef,
IsFin,
StatusCode,
Headers,
Data
) ->
%% gun_response, http head
%% assert, no body yet
?undef = Data,
case IsFin of
fin ->
%% only http heads no body
reply(ReplyTo, {ok, StatusCode, Headers}),
State#state{requests = NRequests};
nofin ->
%% start accumulating data
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end;
handle_known_gun_reply(
?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}),
NRequests,
State,
StreamRef,
IsFin,
StatusCode,
Headers,
Data
) ->
%% gun_data, http body
%% assert
?undef = StatusCode,
%% assert
?undef = Headers,
case IsFin of
fin ->
reply(
ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])}
),
State#state{requests = NRequests};
nofin ->
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end.

reply({F, A}, Result) when is_function(F) ->
_ = erlang:apply(F, A ++ [Result]),
ok;
reply(_From, {error, expired}) ->
%% the caller should get {error, timeout} return from request/5
ok;
reply(From, Result) ->
gen_server:reply(From, Result).

Expand Down
Loading