10 changes: 10 additions & 0 deletions changelog.md
@@ -1,5 +1,15 @@
# ehttpc changes

## 0.7.3

- Previously, we had a fixed restart intensity for the worker supervisor, meaning that if a
  large pool died once, it would bring down the supervisor. The intensity is now
  proportional to the pool size, so the whole pool may restart at once without bringing the
  supervisor down.
- Added an `ehttpc:check_pool_integrity/1` function to check whether the supervision tree
  is intact (i.e., no children of the root supervisor are permanently dead and will not be
  restarted). See the usage sketch below.
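A minimal usage sketch of the new function (the pool name `my_pool` is a placeholder; the return shapes follow the `-spec` added in `src/ehttpc.erl` below):

```erlang
%% Hedged sketch: poll the pool's supervision tree, e.g. from a health check.
%% `my_pool` stands for whatever name was passed to ehttpc_sup:start_pool/2.
case ehttpc:check_pool_integrity(my_pool) of
    ok ->
        healthy;
    {error, {processes_down, Children}} ->
        %% e.g. [worker_sup]: a child is permanently dead and will not
        %% be restarted.
        {degraded, Children};
    {error, not_found} ->
        pool_not_found
end.
```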

## 0.7.2

- Fix rebar.config.
2 changes: 1 addition & 1 deletion src/ehttpc.app.src
@@ -1,6 +1,6 @@
{application, ehttpc, [
{description, "HTTP Client for Erlang/OTP"},
{vsn, "0.7.1"},
{vsn, "0.7.3"},
{registered, []},
{applications, [
kernel,
6 changes: 6 additions & 0 deletions src/ehttpc.erl
@@ -32,6 +32,7 @@
request_async/5,
workers/1,
health_check/2,
check_pool_integrity/1,
name/1
]).

@@ -235,6 +236,11 @@ mk_async_request(delete = Method, Req, ExpireAt, RC) when ?IS_HEADERS_REQ(Req) ->
workers(Pool) ->
gproc_pool:active_workers(name(Pool)).

-spec check_pool_integrity(pool_name()) ->
ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}.
check_pool_integrity(Pool) ->
ehttpc_sup:check_pool_integrity(Pool).

name(Pool) -> {?MODULE, Pool}.

%%--------------------------------------------------------------------
49 changes: 48 additions & 1 deletion src/ehttpc_sup.erl
@@ -23,7 +23,8 @@
%% API
-export([
start_pool/2,
stop_pool/1
stop_pool/1,
check_pool_integrity/1
]).

%% Supervisor callbacks
@@ -34,6 +35,16 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

-spec check_pool_integrity(ehttpc:pool_name()) ->
ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}.
check_pool_integrity(Pool) ->
case get_child(child_id(Pool)) of
{ok, SupPid} ->
do_check_pool_integrity_root(SupPid);
{error, Reason} ->
{error, Reason}
end.

%%--------------------------------------------------------------------
%% Start/Stop a pool
%%--------------------------------------------------------------------
@@ -72,3 +83,39 @@ pool_spec(Pool, Opts) ->
}.

child_id(Pool) -> {ehttpc_pool_sup, Pool}.

%%--------------------------------------------------------------------
%% Internal fns
%%--------------------------------------------------------------------

get_child(Id) ->
Res = [Child || {Id0, Child, supervisor, _} <- supervisor:which_children(?MODULE), Id == Id0],
case Res of
[] ->
{error, not_found};
[undefined] ->
{error, dead};
[restarting] ->
{error, restarting};
[Pid] when is_pid(Pid) ->
{ok, Pid}
end.

do_check_pool_integrity_root(SupPid) ->
try supervisor:which_children(SupPid) of
Children ->
%% We ignore `restarting` here because those processes are still being
%% managed.
DeadChildren = [Id || {Id, undefined, _, _} <- Children],
%% Currently, at root, we only have one supervisor: `ehttpc_worker_sup`, and
%% it does not contain other supervisors under it, so no need to dig deeper.
case DeadChildren of
[_ | _] ->
{error, {processes_down, DeadChildren}};
[] ->
ok
end
catch
exit:{noproc, _} ->
{error, {processes_down, [root]}}
end.
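For reference (an OTP fact, not part of this diff): `supervisor:which_children/1` returns `{Id, Child, Type, Modules}` tuples, where `Child` is a pid for a running child, `undefined` for one that died and will not be restarted, and `restarting` while a restart is in flight. Illustrative shapes for the root supervisor's children:

```erlang
%% Shapes of supervisor:which_children(?MODULE) results that get_child/1
%% matches on; {ehttpc_pool_sup, Pool} is the id built by child_id/1, and
%% the pid / pool name below are illustrative.
%%
%%   [{{ehttpc_pool_sup, my_pool}, <0.200.0>,  supervisor, _Mods}]  %=> {ok, <0.200.0>}
%%   [{{ehttpc_pool_sup, my_pool}, undefined,  supervisor, _Mods}]  %=> {error, dead}
%%   [{{ehttpc_pool_sup, my_pool}, restarting, supervisor, _Mods}]  %=> {error, restarting}
%%   []                                                             %=> {error, not_found}
```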
11 changes: 9 additions & 2 deletions src/ehttpc_worker_sup.erl
@@ -26,6 +26,13 @@ start_link(Pool, Opts) ->
supervisor:start_link(?MODULE, [Pool, Opts]).

init([Pool, Opts]) ->
PoolSize = pool_size(Opts),
SupFlags = #{
strategy => one_for_one,
%% Allow the whole pool to die simultaneously at least once.
intensity => 10 + PoolSize,
period => 60
},
WorkerSpec = fun(Id) ->
#{
id => {Pool, Id},
@@ -36,8 +43,8 @@ init([Pool, Opts]) ->
modules => [ehttpc]
}
end,
Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))],
{ok, {{one_for_one, 10, 60}, Workers}}.
Workers = [WorkerSpec(I) || I <- lists:seq(1, PoolSize)],
{ok, {SupFlags, Workers}}.

pool_size(Opts) ->
Schedulers = erlang:system_info(schedulers),
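A worked instance of the new flags (the pool size 50 mirrors the big-pool test below): with `{pool_size, 50}`, the supervisor tolerates `10 + 50 = 60` restarts per 60-second period, so all 50 workers dying at once consumes only 50 of the 60 allowed restarts and the supervisor survives, whereas the old fixed `{one_for_one, 10, 60}` flags would have exceeded the limit.

```erlang
%% Illustration only: the flags init/1 above produces for a 50-worker pool.
#{
    strategy => one_for_one,
    %% 10 + PoolSize; one simultaneous death of all 50 workers stays
    %% under the 60-restarts-per-60-seconds budget.
    intensity => 60,
    period => 60
}
```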
70 changes: 70 additions & 0 deletions test/ehttpc_sup_tests.erl
@@ -17,6 +17,7 @@
-module(ehttpc_sup_tests).

-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

shutdown_test_() ->
[{timeout, 10, fun t_shutdown/0}].
@@ -59,3 +60,72 @@ wait_for_down(Pids) ->
after 10_000 ->
error(timeout)
end.

%% Smoke tests for `ehttpc:check_pool_integrity/1`, which should report an error when the
%% worker supervisor is down.
check_pool_integrity_test() ->
Pool = atom_to_binary(?MODULE),
Opts = [
{host, "google.com"},
{port, "80"},
{enable_pipelining, 1},
{pool_size, 15},
{pool_type, random},
{connect_timeout, 5000},
{prioritise_latest, true}
],
application:ensure_all_started(ehttpc),
{ok, _} = ehttpc_sup:start_pool(Pool, Opts),
?assertEqual(ok, ehttpc:check_pool_integrity(Pool)),
ok = ehttpc_sup:stop_pool(Pool),
{ok, _} = ehttpc_sup:start_pool(Pool, Opts),
Killer = spawn_link(fun Recur() ->
Workers = ehttpc:workers(Pool),
MRefs = lists:map(fun({_, P}) -> monitor(process, P) end, Workers),
lists:foreach(fun({_, P}) -> exit(P, kill) end, Workers),
lists:foreach(
fun(MRef) ->
receive
{'DOWN', MRef, process, _, _} ->
ok
end
end,
MRefs
),
timer:sleep(1),
Recur()
end),
%% Give it some time to reach maximum restart intensity
timer:sleep(100),
?assertEqual({error, {processes_down, [worker_sup]}}, ehttpc:check_pool_integrity(Pool)),
unlink(Killer),
exit(Killer, kill),
ok = ehttpc_sup:stop_pool(Pool),
?assertEqual({error, not_found}, ehttpc:check_pool_integrity(Pool)),
ok.

%% Previously, we had a fixed restart intensity for the worker supervisor, meaning that if
%% a large pool died once, it would bring down the supervisor. This checks that the
%% intensity is proportional to the pool size, so the whole pool may restart at once
%% without bringing the supervisor down.
big_pool_dies_and_recovers_test() ->
Pool = atom_to_binary(?FUNCTION_NAME),
Opts = [
{host, "google.com"},
{port, "80"},
{enable_pipelining, 1},
%% N.B.: big pool
{pool_size, 50},
{pool_type, random},
{connect_timeout, 5000},
{prioritise_latest, true}
],
application:ensure_all_started(ehttpc),
{ok, _} = ehttpc_sup:start_pool(Pool, Opts),
%% Kill all workers at once.
Workers = ehttpc:workers(Pool),
lists:foreach(fun({_Id, Pid}) -> exit(Pid, kill) end, Workers),
timer:sleep(100),
?assertEqual(ok, ehttpc:check_pool_integrity(Pool)),
ok = ehttpc_sup:stop_pool(Pool),
ok.