diff --git a/changelog.md b/changelog.md
index f5d7d3c..74f9ed5 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,5 +1,15 @@
 # ehttpc changes
 
+## 0.7.3
+
+- Previously, the worker supervisor had a fixed restart intensity, meaning that a single
+  crash of a large pool could exceed it and bring down the supervisor. The intensity is
+  now proportional to the pool size, so the whole pool may restart at once without
+  bringing the supervisor down.
+- Added an `ehttpc:check_pool_integrity/1` function to check whether the supervision tree
+  is whole (i.e., no children of the root supervisor are permanently dead and will not be
+  restarted).
+
 ## 0.7.2
 
 - Fix rebar.config.
diff --git a/src/ehttpc.app.src b/src/ehttpc.app.src
index 73d9706..b7e5237 100644
--- a/src/ehttpc.app.src
+++ b/src/ehttpc.app.src
@@ -1,6 +1,6 @@
 {application, ehttpc, [
     {description, "HTTP Client for Erlang/OTP"},
-    {vsn, "0.7.1"},
+    {vsn, "0.7.3"},
     {registered, []},
     {applications, [
         kernel,
diff --git a/src/ehttpc.erl b/src/ehttpc.erl
index 42024d9..bc2b110 100644
--- a/src/ehttpc.erl
+++ b/src/ehttpc.erl
@@ -32,6 +32,7 @@
     request_async/5,
     workers/1,
     health_check/2,
+    check_pool_integrity/1,
     name/1
 ]).
 
@@ -235,6 +236,11 @@ mk_async_request(delete = Method, Req, ExpireAt, RC) when ?IS_HEADERS_REQ(Req) -
 workers(Pool) ->
     gproc_pool:active_workers(name(Pool)).
 
+-spec check_pool_integrity(pool_name()) ->
+    ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}.
+check_pool_integrity(Pool) ->
+    ehttpc_sup:check_pool_integrity(Pool).
+
 name(Pool) -> {?MODULE, Pool}.
 
 %%--------------------------------------------------------------------
diff --git a/src/ehttpc_sup.erl b/src/ehttpc_sup.erl
index 126927c..d766950 100644
--- a/src/ehttpc_sup.erl
+++ b/src/ehttpc_sup.erl
@@ -23,7 +23,8 @@
 %% API
 -export([
     start_pool/2,
-    stop_pool/1
+    stop_pool/1,
+    check_pool_integrity/1
 ]).
 
 %% Supervisor callbacks
@@ -34,6 +35,16 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
+-spec check_pool_integrity(ehttpc:pool_name()) ->
+    ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}.
+check_pool_integrity(Pool) ->
+    case get_child(child_id(Pool)) of
+        {ok, SupPid} ->
+            do_check_pool_integrity_root(SupPid);
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
 %%--------------------------------------------------------------------
 %% Start/Stop a pool
 %%--------------------------------------------------------------------
@@ -72,3 +83,39 @@ pool_spec(Pool, Opts) ->
     }.
 
 child_id(Pool) -> {ehttpc_pool_sup, Pool}.
+
+%%--------------------------------------------------------------------
+%% Internal fns
+%%--------------------------------------------------------------------
+
+get_child(Id) ->
+    Res = [Child || {Id0, Child, supervisor, _} <- supervisor:which_children(?MODULE), Id == Id0],
+    case Res of
+        [] ->
+            {error, not_found};
+        [undefined] ->
+            {error, dead};
+        [restarting] ->
+            {error, restarting};
+        [Pid] when is_pid(Pid) ->
+            {ok, Pid}
+    end.
+
+do_check_pool_integrity_root(SupPid) ->
+    try supervisor:which_children(SupPid) of
+        Children ->
+            %% We ignore `restarting` children here because those processes are still
+            %% being managed by the supervisor.
+            DeadChildren = [Id || {Id, undefined, _, _} <- Children],
+            %% Currently the root has only one supervisor child, `ehttpc_worker_sup`,
+            %% which contains no nested supervisors, so there is no need to dig deeper.
+            case DeadChildren of
+                [_ | _] ->
+                    {error, {processes_down, DeadChildren}};
+                [] ->
+                    ok
+            end
+    catch
+        exit:{noproc, _} ->
+            {error, {processes_down, [root]}}
+    end.
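Usage sketch (illustrative, not part of the patch): one way a caller could consume the
`ehttpc:check_pool_integrity/1` results added above. The module name `pool_health_example`
and the logging/return conventions are assumptions for illustration; only the `ehttpc`
call and its result shapes come from the code above.

    -module(pool_health_example).
    -export([report/1]).

    %% `Pool` is whatever name was passed to ehttpc_sup:start_pool/2.
    report(Pool) ->
        case ehttpc:check_pool_integrity(Pool) of
            ok ->
                ok;
            {error, {processes_down, Dead}} ->
                %% `Dead` lists which parts of the pool's supervision tree are dead
                %% and will not be restarted, e.g. [worker_sup].
                logger:error("ehttpc pool ~p degraded: ~p down", [Pool, Dead]),
                {error, degraded};
            {error, _} = Error ->
                %% e.g. {error, not_found} when no such pool is running.
                Error
        end.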
diff --git a/src/ehttpc_worker_sup.erl b/src/ehttpc_worker_sup.erl
index 1674831..9646bae 100644
--- a/src/ehttpc_worker_sup.erl
+++ b/src/ehttpc_worker_sup.erl
@@ -26,6 +26,13 @@ start_link(Pool, Opts) ->
     supervisor:start_link(?MODULE, [Pool, Opts]).
 
 init([Pool, Opts]) ->
+    PoolSize = pool_size(Opts),
+    SupFlags = #{
+        strategy => one_for_one,
+        %% Allow the whole pool to die simultaneously at least once.
+        intensity => 10 + PoolSize,
+        period => 60
+    },
     WorkerSpec = fun(Id) ->
         #{
             id => {Pool, Id},
@@ -36,8 +43,8 @@ init([Pool, Opts]) ->
             modules => [ehttpc]
         }
     end,
-    Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))],
-    {ok, {{one_for_one, 10, 60}, Workers}}.
+    Workers = [WorkerSpec(I) || I <- lists:seq(1, PoolSize)],
+    {ok, {SupFlags, Workers}}.
 
 pool_size(Opts) ->
     Schedulers = erlang:system_info(schedulers),
diff --git a/test/ehttpc_sup_tests.erl b/test/ehttpc_sup_tests.erl
index ccefec7..94c64fb 100644
--- a/test/ehttpc_sup_tests.erl
+++ b/test/ehttpc_sup_tests.erl
@@ -17,6 +17,7 @@
 -module(ehttpc_sup_tests).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 shutdown_test_() ->
     [{timeout, 10, fun t_shutdown/0}].
@@ -59,3 +60,72 @@ wait_for_down(Pids) ->
     after 10_000 -> error(timeout)
     end.
+
+%% Smoke tests for `ehttpc:check_pool_integrity/1`, which should report an error when the
+%% worker supervisor is down.
+check_pool_integrity_test() ->
+    Pool = atom_to_binary(?MODULE),
+    Opts = [
+        {host, "google.com"},
+        {port, "80"},
+        {enable_pipelining, 1},
+        {pool_size, 15},
+        {pool_type, random},
+        {connect_timeout, 5000},
+        {prioritise_latest, true}
+    ],
+    application:ensure_all_started(ehttpc),
+    {ok, _} = ehttpc_sup:start_pool(Pool, Opts),
+    ?assertEqual(ok, ehttpc:check_pool_integrity(Pool)),
+    ok = ehttpc_sup:stop_pool(Pool),
+    {ok, _} = ehttpc_sup:start_pool(Pool, Opts),
+    Killer = spawn_link(fun Recur() ->
+        Workers = ehttpc:workers(Pool),
+        MRefs = lists:map(fun({_, P}) -> monitor(process, P) end, Workers),
+        lists:foreach(fun({_, P}) -> exit(P, kill) end, Workers),
+        lists:foreach(
+            fun(MRef) ->
+                receive
+                    {'DOWN', MRef, process, _, _} ->
+                        ok
+                end
+            end,
+            MRefs
+        ),
+        timer:sleep(1),
+        Recur()
+    end),
+    %% Give it some time to exceed the maximum restart intensity.
+    timer:sleep(100),
+    ?assertEqual({error, {processes_down, [worker_sup]}}, ehttpc:check_pool_integrity(Pool)),
+    unlink(Killer),
+    exit(Killer, kill),
+    ok = ehttpc_sup:stop_pool(Pool),
+    ?assertEqual({error, not_found}, ehttpc:check_pool_integrity(Pool)),
+    ok.
+
+%% Previously, we had a fixed restart intensity for the worker supervisor, meaning that if
+%% a large pool died once, it would bring down the supervisor. This checks that the
+%% intensity is proportional to the pool size, so the whole pool may restart at once
+%% without bringing the supervisor down.
+big_pool_dies_and_recovers_test() ->
+    Pool = atom_to_binary(?FUNCTION_NAME),
+    Opts = [
+        {host, "google.com"},
+        {port, "80"},
+        {enable_pipelining, 1},
+        %% N.B.: big pool
+        {pool_size, 50},
+        {pool_type, random},
+        {connect_timeout, 5000},
+        {prioritise_latest, true}
+    ],
+    application:ensure_all_started(ehttpc),
+    {ok, _} = ehttpc_sup:start_pool(Pool, Opts),
+    %% Kill all workers at once.
+    Workers = ehttpc:workers(Pool),
+    lists:foreach(fun({_Id, Pid}) -> exit(Pid, kill) end, Workers),
+    timer:sleep(100),
+    ?assertEqual(ok, ehttpc:check_pool_integrity(Pool)),
+    ok = ehttpc_sup:stop_pool(Pool),
+    ok.
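For reference (illustrative, not part of the patch): the restart-intensity arithmetic
behind the `ehttpc_worker_sup` change, mirroring the `big_pool_dies_and_recovers_test`
case. With `pool_size = 50`, the supervisor allows `10 + 50 = 60` restarts per 60-second
period, so a simultaneous crash of all 50 workers stays within the limit and the worker
supervisor survives. The shell transcript below is only a sketch; the new tests should be
runnable with `rebar3 eunit --module=ehttpc_sup_tests`.

    1> PoolSize = 50.
    50
    2> Intensity = 10 + PoolSize.   % restarts allowed per 60-second period
    60
    3> PoolSize =< Intensity.       % the whole pool can die at once without exceeding it
    true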