From 0fbc18deef59a14357e3cbd5f357bc29ee1c9651 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Fri, 12 Sep 2025 16:23:34 -0300 Subject: [PATCH 1/3] feat: add function to check supervision tree integrity See also https://github.com/emqx/ecpool/pull/59 --- src/ehttpc.erl | 6 +++++ src/ehttpc_sup.erl | 49 ++++++++++++++++++++++++++++++++++++++- test/ehttpc_sup_tests.erl | 44 +++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 1 deletion(-) diff --git a/src/ehttpc.erl b/src/ehttpc.erl index 42024d9..bc2b110 100644 --- a/src/ehttpc.erl +++ b/src/ehttpc.erl @@ -32,6 +32,7 @@ request_async/5, workers/1, health_check/2, + check_pool_integrity/1, name/1 ]). @@ -235,6 +236,11 @@ mk_async_request(delete = Method, Req, ExpireAt, RC) when ?IS_HEADERS_REQ(Req) - workers(Pool) -> gproc_pool:active_workers(name(Pool)). +-spec check_pool_integrity(pool_name()) -> + ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}. +check_pool_integrity(Pool) -> + ehttpc_sup:check_pool_integrity(Pool). + name(Pool) -> {?MODULE, Pool}. %%-------------------------------------------------------------------- diff --git a/src/ehttpc_sup.erl b/src/ehttpc_sup.erl index 126927c..d766950 100644 --- a/src/ehttpc_sup.erl +++ b/src/ehttpc_sup.erl @@ -23,7 +23,8 @@ %% API -export([ start_pool/2, - stop_pool/1 + stop_pool/1, + check_pool_integrity/1 ]). %% Supervisor callbacks @@ -34,6 +35,16 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). +-spec check_pool_integrity(ehttpc:pool_name()) -> + ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}. +check_pool_integrity(Pool) -> + case get_child(child_id(Pool)) of + {ok, SupPid} -> + do_check_pool_integrity_root(SupPid); + {error, Reason} -> + {error, Reason} + end. + %%-------------------------------------------------------------------- %% Start/Stop a pool %%-------------------------------------------------------------------- @@ -72,3 +83,39 @@ pool_spec(Pool, Opts) -> }. child_id(Pool) -> {ehttpc_pool_sup, Pool}. + +%%-------------------------------------------------------------------- +%% Internal fns +%%-------------------------------------------------------------------- + +get_child(Id) -> + Res = [Child || {Id0, Child, supervisor, _} <- supervisor:which_children(?MODULE), Id == Id0], + case Res of + [] -> + {error, not_found}; + [undefined] -> + {error, dead}; + [restarting] -> + {error, restarting}; + [Pid] when is_pid(Pid) -> + {ok, Pid} + end. + +do_check_pool_integrity_root(SupPid) -> + try supervisor:which_children(SupPid) of + Children -> + %% We ignore `restarting` here because those processes are still being + %% managed. + DeadChildren = [Id || {Id, undefined, _, _} <- Children], + %% Currently, at root, we only have one supervisor: `ehttpc_worker_sup`, and + %% it does not contain other supervisors under it, so no need to dig deeper. + case DeadChildren of + [_ | _] -> + {error, {processes_down, DeadChildren}}; + [] -> + ok + end + catch + exit:{noproc, _} -> + {error, {processes_down, [root]}} + end. diff --git a/test/ehttpc_sup_tests.erl b/test/ehttpc_sup_tests.erl index ccefec7..f4cc1a6 100644 --- a/test/ehttpc_sup_tests.erl +++ b/test/ehttpc_sup_tests.erl @@ -17,6 +17,7 @@ -module(ehttpc_sup_tests). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). shutdown_test_() -> [{timeout, 10, fun t_shutdown/0}]. @@ -59,3 +60,46 @@ wait_for_down(Pids) -> after 10_000 -> error(timeout) end. + +%% Smoke tests for `ecpool:check_pool_integrity`, which should report an error when worker +%% supervisor is down. +check_pool_integrity_test() -> + Pool = atom_to_binary(?MODULE), + Opts = [ + {host, "google.com"}, + {port, "80"}, + {enable_pipelining, 1}, + {pool_size, 15}, + {pool_type, random}, + {connect_timeout, 5000}, + {prioritise_latest, true} + ], + application:ensure_all_started(ehttpc), + {ok, _} = ehttpc_sup:start_pool(Pool, Opts), + ?assertEqual(ok, ehttpc:check_pool_integrity(Pool)), + ok = ehttpc_sup:stop_pool(Pool), + {ok, _} = ehttpc_sup:start_pool(Pool, Opts), + Killer = spawn_link(fun Recur() -> + Workers = ehttpc:workers(Pool), + MRefs = lists:map(fun({_, P}) -> monitor(process, P) end, Workers), + lists:foreach(fun({_, P}) -> exit(P, kill) end, Workers), + lists:foreach( + fun(MRef) -> + receive + {'DOWN', MRef, process, _, _} -> + ok + end + end, + MRefs + ), + timer:sleep(1), + Recur() + end), + %% Give it some time to reach maximum restart intensity + timer:sleep(100), + ?assertEqual({error, {processes_down, [worker_sup]}}, ehttpc:check_pool_integrity(Pool)), + unlink(Killer), + exit(Killer, kill), + ok = ehttpc_sup:stop_pool(Pool), + ?assertEqual({error, not_found}, ehttpc:check_pool_integrity(Pool)), + ok. From 9f27bde9ba6b661279054edd85e253de9bb1d5c8 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Fri, 12 Sep 2025 16:26:48 -0300 Subject: [PATCH 2/3] fix: make max restart intensity proportional to pool size See also https://github.com/emqx/ecpool/pull/59 See also https://emqx.atlassian.net/browse/EMQX-14705 Previously, we had a fixed restart intensity for the worker supervisor, meaning that if a large pool dies once, it brings down the supervisor. Here, we have an intensity proportional to the pool size, so the whole pool may restart at once without bringing the supervisor down. --- src/ehttpc_worker_sup.erl | 11 +++++++++-- test/ehttpc_sup_tests.erl | 26 ++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/ehttpc_worker_sup.erl b/src/ehttpc_worker_sup.erl index 1674831..9646bae 100644 --- a/src/ehttpc_worker_sup.erl +++ b/src/ehttpc_worker_sup.erl @@ -26,6 +26,13 @@ start_link(Pool, Opts) -> supervisor:start_link(?MODULE, [Pool, Opts]). init([Pool, Opts]) -> + PoolSize = pool_size(Opts), + SupFlags = #{ + strategy => one_for_one, + %% Allow whole pool dying simultaneously at least once. + intensity => 10 + PoolSize, + period => 60 + }, WorkerSpec = fun(Id) -> #{ id => {Pool, Id}, @@ -36,8 +43,8 @@ init([Pool, Opts]) -> modules => [ehttpc] } end, - Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))], - {ok, {{one_for_one, 10, 60}, Workers}}. + Workers = [WorkerSpec(I) || I <- lists:seq(1, PoolSize)], + {ok, {SupFlags, Workers}}. pool_size(Opts) -> Schedulers = erlang:system_info(schedulers), diff --git a/test/ehttpc_sup_tests.erl b/test/ehttpc_sup_tests.erl index f4cc1a6..94c64fb 100644 --- a/test/ehttpc_sup_tests.erl +++ b/test/ehttpc_sup_tests.erl @@ -103,3 +103,29 @@ check_pool_integrity_test() -> ok = ehttpc_sup:stop_pool(Pool), ?assertEqual({error, not_found}, ehttpc:check_pool_integrity(Pool)), ok. + +%% Previously, we had a fixed restart intensity for the worker supervisor, meaning that if +%% a large pool dies once, it brings down the supervisor. This checks that we have an +%% intensity proportional to the pool size, so the whole pool may restart at once without +%% bringing the supervisor down. +big_pool_dies_and_recovers_test() -> + Pool = atom_to_binary(?FUNCTION_NAME), + Opts = [ + {host, "google.com"}, + {port, "80"}, + {enable_pipelining, 1}, + %% N.B.: big pool + {pool_size, 50}, + {pool_type, random}, + {connect_timeout, 5000}, + {prioritise_latest, true} + ], + application:ensure_all_started(ehttpc), + {ok, _} = ehttpc_sup:start_pool(Pool, Opts), + %% Kill all workers at once. + Workers = ehttpc:workers(Pool), + lists:foreach(fun({_Id, Pid}) -> exit(Pid, kill) end, Workers), + timer:sleep(100), + ?assertEqual(ok, ehttpc:check_pool_integrity(Pool)), + ok = ehttpc_sup:stop_pool(Pool), + ok. From 18c41ad54ab0fcf36eae2024a9ee62c41efbda6b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Fri, 12 Sep 2025 16:34:44 -0300 Subject: [PATCH 3/3] chore: add changelog, bump app vsn --- changelog.md | 10 ++++++++++ src/ehttpc.app.src | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index f5d7d3c..74f9ed5 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # ehttpc changes +## 0.7.3 + +- Previously, we had a fixed restart intensity for the worker supervisor, meaning that if a + large pool dies once, it brings down the supervisor. Here, we have an intensity + proportional to the pool size, so the whole pool may restart at once without bringing the + supervisor down. +- Added an `ehttpc:check_pool_integrity/1` function to check whether the supervision tree + is whole (i.e., no children of the root supervisor are permanently dead and won't be + restarted). + ## 0.7.2 - Fix rebar.config. diff --git a/src/ehttpc.app.src b/src/ehttpc.app.src index 73d9706..b7e5237 100644 --- a/src/ehttpc.app.src +++ b/src/ehttpc.app.src @@ -1,6 +1,6 @@ {application, ehttpc, [ {description, "HTTP Client for Erlang/OTP"}, - {vsn, "0.7.1"}, + {vsn, "0.7.3"}, {registered, []}, {applications, [ kernel,