From 1be02a1e06fbc47085064be2b7fdce2c4280f7e4 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 8 Nov 2024 17:27:51 +0800 Subject: [PATCH 01/11] feat: make sure relup from 0.5.5 works --- src/ecpool.appup.src | 16 +++++++++++++++- src/ecpool_monitor.erl | 40 ++++++++++++++++++++++++++++------------ src/ecpool_worker.erl | 11 +++++++++++ 3 files changed, 54 insertions(+), 13 deletions(-) diff --git a/src/ecpool.appup.src b/src/ecpool.appup.src index c871f73d..b48fc9aa 100644 --- a/src/ecpool.appup.src +++ b/src/ecpool.appup.src @@ -1,9 +1,15 @@ %% -*-: erlang -*- -{"0.5.10", +{"0.5.11", [ + {"0.5.10", [ + {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []} + ]}, {"0.5.9", [ + {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, + %% e4.4.26 uses ecpool 0.5.5, while e4.4.27 uses 0.5.11 {<<"0\\.5\\.[3-8]">>, [ %% NOTE: MUST start ecpool_monitor before any load_module instructions {add_module, ecpool_monitor}, @@ -12,6 +18,7 @@ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {apply, {ecpool_monitor, upgrade_clients_state, []}}, {load_module, ecpool, brutal_purge, soft_purge, []} ]}, {<<"0\\.5\\.[0-2]">>, [ @@ -21,6 +28,7 @@ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {apply, {ecpool_monitor, upgrade_clients_state, []}}, {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, @@ -29,6 +37,7 @@ {apply, {ecpool_monitor, ensure_monitor_started, []}}, {apply, {ecpool_monitor, update_clients_global, []}}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {apply, {ecpool_monitor, upgrade_clients_state, []}}, {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, @@ -36,7 +45,12 @@ ]} ], [ + {"0.5.10", [ + {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []} + ]}, {"0.5.9", [ + {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, {<<"0\\.5\\.[3-8]">>, [ diff --git a/src/ecpool_monitor.erl b/src/ecpool_monitor.erl index 0fbf36af..ab07106c 100644 --- a/src/ecpool_monitor.erl +++ b/src/ecpool_monitor.erl @@ -26,7 +26,9 @@ ]). -export([ update_clients_global/0 + , upgrade_clients_state/0 , reg_worker/0 + , reg_worker/1 , get_all_global_clients/0 , put_client_global/1 , put_client_global/2 @@ -72,14 +74,12 @@ monitor_spec() -> }. update_clients_global() -> - lists:foreach(fun({PoolName, _}) -> - lists:foreach(fun({_, WrokerPid}) -> - case ecpool_worker:client(WrokerPid) of - {ok, Client} -> put_client_global(WrokerPid, Client); - _ -> ok - end - end, ecpool:workers(PoolName)) - end, ecpool_sup:pools()). + with_all_workers(fun put_client_global_and_start_monitor/2). + +upgrade_clients_state() -> + with_all_workers(fun(_, WorkerPid) -> + ecpool_worker:upgrade_state(WorkerPid) + end). get_all_global_clients() -> ets:tab2list(?DISCOVERY_TAB). @@ -87,8 +87,8 @@ get_all_global_clients() -> put_client_global(Client) -> put_client_global(self(), Client). -put_client_global(WrokerPid, Client) -> - ets:insert(?DISCOVERY_TAB, {WrokerPid, Client}), +put_client_global(WorkerPid, Client) -> + ets:insert(?DISCOVERY_TAB, {WorkerPid, Client}), ok. get_client_global(WorkerPid) -> @@ -102,7 +102,10 @@ get_client_global(WorkerPid) -> end. reg_worker() -> - gen_server:call(?MODULE, {reg_worker, self()}, infinity). + reg_worker(self()). + +reg_worker(WorkerPid) -> + gen_server:call(?MODULE, {reg_worker, WorkerPid}, infinity). %%============================================================================== @@ -143,4 +146,17 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%============================================================================== - +with_all_workers(Fun) -> + lists:foreach(fun({PoolName, _}) -> + lists:foreach(fun({_, WorkerPid}) -> + Fun(PoolName, WorkerPid) + end, ecpool:workers(PoolName)) + end, ecpool_sup:pools()). + +put_client_global_and_start_monitor(_, WorkerPid) -> + case ecpool_worker:client(WorkerPid) of + {ok, Client} -> + ok = reg_worker(WorkerPid), + put_client_global(WorkerPid, Client); + _ -> ok + end. diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index d05fbbee..b1dc1b09 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -20,6 +20,8 @@ -export([start_link/4]). +-export([upgrade_state/1]). + %% API Function Exports -export([ client/1 , exec/3 @@ -124,6 +126,9 @@ get_reconnect_callbacks(Pid) -> add_disconnect_callback(Pid, OnDisconnect) -> gen_server:cast(Pid, {add_disconn_callbk, OnDisconnect}). +upgrade_state(Pid) -> + gen_server:cast(Pid, upgrade_state). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -202,6 +207,12 @@ handle_cast({remove_reconn_callbk, OnReconnect}, State = #state{on_reconnect = O handle_cast({add_disconn_callbk, OnDisconnect}, State = #state{on_disconnect = OldOnDisconnect}) -> {noreply, State#state{on_disconnect = add_conn_callback(OnDisconnect, OldOnDisconnect)}}; +handle_cast(upgrade_state, #state{on_reconnect = OnReconnect, on_disconnect = OnDisconnect} = State) -> + {noreply, State#state{ + on_reconnect = ensure_callback(OnReconnect), + on_disconnect = ensure_callback(OnDisconnect) + }}; + handle_cast(_Msg, State) -> {noreply, State}. From d16e504830c1b9e4b65dcd8e6d9ce7da3b962cb8 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 12 Nov 2024 17:48:11 +0800 Subject: [PATCH 02/11] fix: update appup --- src/ecpool.appup.src | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/ecpool.appup.src b/src/ecpool.appup.src index b48fc9aa..c993d477 100644 --- a/src/ecpool.appup.src +++ b/src/ecpool.appup.src @@ -58,7 +58,8 @@ {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, - {apply, {ecpool_monitor, ensure_monitor_stopped, []}} + {apply, {ecpool_monitor, ensure_monitor_stopped, []}}, + {delete_module,ecpool_monitor} ]}, {<<"0\\.5\\.[0-2]">>, [ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, @@ -66,7 +67,8 @@ {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, - {apply, {ecpool_monitor, ensure_monitor_stopped, []}} + {apply, {ecpool_monitor, ensure_monitor_stopped, []}}, + {delete_module,ecpool_monitor} ]}, {"0.4.2", [ {load_module, ecpool_worker, brutal_purge, soft_purge, []}, @@ -74,7 +76,8 @@ {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, - {apply, {ecpool_monitor, ensure_monitor_stopped, []}} + {apply, {ecpool_monitor, ensure_monitor_stopped, []}}, + {delete_module,ecpool_monitor} ]} ] }. From 1e52b054798210a22bc95b8f8c48cefdba4bcc24 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 27 Nov 2024 19:13:15 +0800 Subject: [PATCH 03/11] fix: avoid slow connections block the ecpool_sup --- src/ecpool.erl | 45 +++++++++++++++++++++++++++-- src/ecpool_worker.erl | 56 +++++++++++++++++++++++++++--------- src/ecpool_worker_sup.erl | 2 +- test/ecpool_SUITE.erl | 39 +++++++++++++++++++++++-- test/test_timeout_client.erl | 26 +++++++++++++++++ 5 files changed, 148 insertions(+), 20 deletions(-) create mode 100644 test/test_timeout_client.erl diff --git a/src/ecpool.erl b/src/ecpool.erl index fb0409a6..67e345a0 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -81,11 +81,25 @@ pool_spec(ChildId, Pool, Mod, Opts) -> %% @doc Start the pool sup. -spec(start_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, term()}). start_pool(Pool, Mod, Opts) -> - ecpool_pool_sup:start_link(Pool, Mod, Opts). + case ecpool_pool_sup:start_link(Pool, Mod, Opts) of + {ok, Pid} -> + wait_for_workers_started(Pid, fun() -> + gen_server:stop(Pid) + end); + {error, Reason} -> + {error, Reason} + end. %% @doc Start the pool supervised by ecpool_sup start_sup_pool(Pool, Mod, Opts) -> - ecpool_sup:start_pool(Pool, Mod, Opts). + case ecpool_sup:start_pool(Pool, Mod, Opts) of + {ok, Pid} -> + wait_for_workers_started(Pid, fun() -> + stop_sup_pool(Pool) + end); + {error, Reason} -> + {error, Reason} + end. %% @doc Start the pool supervised by ecpool_sup stop_sup_pool(Pool) -> @@ -183,3 +197,30 @@ exec({M, F, A}, Client) -> erlang:apply(M, F, [Client]++A); exec(Action, Client) when is_function(Action) -> Action(Client). + +wait_for_workers_started(Pid, Clearer) -> + case lists:keyfind(worker_sup, 1, supervisor:which_children(Pid)) of + {worker_sup, WorkerSupPid, supervisor, _} -> + case check_worker_start_results(supervisor:which_children(WorkerSupPid)) of + ok -> {ok, Pid}; + Error -> + _ = Clearer(), + Error + end; + false -> + _ = Clearer(), + {error, no_worker_sup} + end. + +check_worker_start_results([]) -> + ok; +check_worker_start_results([{_, WorkerPid, worker, _} | Workers]) -> + %% NOTE: `ecpool_worker:take_start_result/1` is an infinity call which will block the caller + %% if the worker is busy on starting. + case ecpool_worker:take_start_result(WorkerPid) of + not_started -> {error, worker_not_started}; + {start_failed, Error} -> {error, {worker_start_failed, Error}}; + {exit, Reason} -> {error, {worker_exit, Reason}}; + started -> check_worker_start_results(Workers); + undefined -> check_worker_start_results(Workers) + end. diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index b1dc1b09..21e235a0 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -28,6 +28,7 @@ , exec_async/2 , exec_async/3 , is_connected/1 + , take_start_result/1 , set_reconnect_callback/2 , set_disconnect_callback/2 , add_reconnect_callback/2 @@ -39,6 +40,7 @@ %% gen_server Function Exports -export([ init/1 + , handle_continue/2 , handle_call/3 , handle_cast/2 , handle_info/2 @@ -66,6 +68,11 @@ -callback(connect(ConnOpts :: list()) -> {ok, pid()} | {ok, {pid(), pid()}, map()} | {error, Reason :: term()}). +-type start_result() :: not_started | started | {start_failed, term()} | {exit, term()} | undefined. + +-define(set_start_result(RESULT), erlang:put(start_result, RESULT)). +-define(take_start_result(), erlang:erase(start_result)). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -98,6 +105,14 @@ exec_async(Pid, Action, Callback) -> is_connected(Pid) -> gen_server:call(Pid, is_connected, infinity). +-spec take_start_result(pid()) -> start_result(). +take_start_result(Pid) -> + try gen_server:call(Pid, take_start_result, infinity) + catch + exit:{{shutdown, Reason}, _} -> {exit, Reason}; + exit:{Reason, _} -> {exit, Reason} + end. + -spec(set_reconnect_callback(pid(), ecpool:conn_callback()) -> ok). set_reconnect_callback(Pid, OnReconnect) -> gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}). @@ -136,20 +151,28 @@ upgrade_state(Pid) -> init([Pool, Id, Mod, Opts]) -> ecpool_monitor:reg_worker(), process_flag(trap_exit, true), - State = #state{pool = Pool, - id = Id, - mod = Mod, - opts = Opts, - on_reconnect = ensure_callback(proplists:get_value(on_reconnect, Opts)), - on_disconnect = ensure_callback(proplists:get_value(on_disconnect, Opts)) - }, + State = #state{ + pool = Pool, id = Id, mod = Mod, opts = Opts, + on_reconnect = ensure_callback(proplists:get_value(on_reconnect, Opts)), + on_disconnect = ensure_callback(proplists:get_value(on_disconnect, Opts)) + }, + ?set_start_result(not_started), + {ok, State, {continue, start}}. + +handle_continue(start, #state{pool = Pool, id = Id} = State) -> case connect_internal(State) of {ok, NewState} -> gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), - {ok, NewState}; - {error, Error} -> {stop, Error} + ?set_start_result(started), + {noreply, NewState}; + {error, Error} -> + ?set_start_result({start_failed, Error}), + {noreply, State} end. +handle_call(take_start_result, _From, State) -> + {reply, ?take_start_result(), State}; + handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) -> IsAlive = Client =/= undefined andalso is_process_alive(Client), {reply, IsAlive, State}; @@ -164,7 +187,7 @@ handle_call(client, _From, State = #state{client = Client}) -> {reply, {ok, Client}, State}; handle_call({exec, Action}, _From, State = #state{client = Client}) -> - {reply, safe_exec(Action, Client), State}; + {reply, with_client(Action, Client), State}; handle_call(get_reconnect_callbacks, _From, #state{on_reconnect = OnReconnect} = State) -> {reply, OnReconnect, State}; @@ -174,11 +197,11 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. handle_cast({exec_async, Action}, State = #state{client = Client}) -> - _ = safe_exec(Action, Client), + _ = with_client(Action, Client), {noreply, State}; handle_cast({exec_async, Action, Callback}, State = #state{client = Client}) -> - _ = safe_exec(Callback, safe_exec(Action, Client)), + _ = safe_exec(Callback, with_client(Action, Client)), {noreply, State}; handle_cast({set_reconn_callbk, OnReconnect}, State) -> @@ -290,13 +313,13 @@ handle_reconnect(undefined, _) -> ok; handle_reconnect(Client, OnReconnectList) when is_list(OnReconnectList) -> %% reverse apply the callbacks because the newer ones are cons-ed to the head of the list - lists:foreach(fun(OnReconnectCallback) -> safe_exec(OnReconnectCallback, Client) end, + lists:foreach(fun(OnReconnectCallback) -> with_client(OnReconnectCallback, Client) end, lists:reverse(OnReconnectList)). handle_disconnect(undefined, _) -> ok; handle_disconnect(Client, OnDisconnectList) -> - lists:foreach(fun(OnDisconnectCallback) -> safe_exec(OnDisconnectCallback, Client) end, + lists:foreach(fun(OnDisconnectCallback) -> with_client(OnDisconnectCallback, Client) end, OnDisconnectList). connect_internal(State) -> @@ -336,6 +359,11 @@ drop_reconnect_callbacks_by_signature(Callbacks, Signature) -> Pred = fun(CB) -> not is_reconnect_calback_signature_match(CB, Signature) end, lists:filter(Pred, Callbacks). +with_client(_, undefined) -> + {error, ecpool_disconnected}; +with_client(Action, Client) -> + safe_exec(Action, Client). + safe_exec({_M, _F, _A} = Action, MainArg) -> try exec(Action, MainArg) catch E:R:ST -> diff --git a/src/ecpool_worker_sup.erl b/src/ecpool_worker_sup.erl index 39cbaaf3..941ff085 100644 --- a/src/ecpool_worker_sup.erl +++ b/src/ecpool_worker_sup.erl @@ -30,7 +30,7 @@ init([Pool, Mod, Opts]) -> #{id => {worker, Id}, start => {ecpool_worker, start_link, [Pool, Id, Mod, Opts]}, restart => transient, - shutdown => 5000, + shutdown => 2000, type => worker, modules => [ecpool_worker, Mod]} end, diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index 816d6b1b..4314aa6c 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -40,6 +40,23 @@ {encoding, utf8} ]). +-define(assertMatchOneOf(Guard1, Guard2, Expr), + begin + ((fun () -> + case (Expr) of + Guard1 -> ok; + Guard2 -> ok; + X__V -> erlang:error( + {assertMatch, + [{module, ?MODULE}, + {line, ?LINE}, + {expression, (??Expr)}, + {pattern, ([??Guard1, ??Guard2])}, + {value, X__V}]}) + end + end)()) + end). + all() -> [{group, all}]. @@ -48,6 +65,7 @@ groups() -> [t_start_pool, t_start_pool_any_name, t_start_sup_pool, + t_start_sup_pool_timeout, t_empty_pool, t_empty_hash_pool, t_restart_client, @@ -92,18 +110,19 @@ t_start_pool_any_name(_Config) -> end, lists:seq(1, 10)). t_empty_pool(_Config) -> - ecpool:start_pool(?POOL, test_failing_client, [{pool_size, 4}, {pool_type, random}]), + ?assertMatchOneOf({error, {worker_exit, normal}}, {error, {worker_exit, noproc}}, + ecpool:start_pool(?POOL, test_failing_client, [{pool_size, 4}, {pool_type, random}])), % NOTE: give some time to clients to exit ok = timer:sleep(100), ?assertEqual([], ecpool:workers(?POOL)), - ?assertEqual({error, ecpool_empty}, ecpool:with_client(?POOL, fun(_) -> ok end)). + ?assertEqual({error, no_such_pool}, ecpool:with_client(?POOL, fun(_) -> ok end)). t_empty_hash_pool(_Config) -> ecpool:start_pool(?POOL, test_failing_client, [{pool_size, 4}, {pool_type, hash}]), % NOTE: give some time to clients to exit ok = timer:sleep(100), ?assertEqual([], ecpool:workers(?POOL)), - ?assertEqual({error, ecpool_empty}, ecpool:with_client(?POOL, 42, fun(_) -> ok end)). + ?assertEqual({error, no_such_pool}, ecpool:with_client(?POOL, 42, fun(_) -> ok end)). t_start_sup_pool(_Config) -> {ok, Pid1} = ecpool:start_sup_pool(xpool, test_client, ?POOL_OPTS), @@ -113,6 +132,20 @@ t_start_sup_pool(_Config) -> ecpool:stop_sup_pool(xpool), ?assertEqual([], ecpool_sup:pools()). +t_start_sup_pool_timeout(_Config) -> + ShutdownTimeout = 2000, %% the `shutdown` option in ecpool_worker_sup:init/1 + Size = 2, + Opts = [{pool_size, Size} | proplists:delete(pool_size, ?POOL_OPTS)], + spawn_link(fun() -> + ?assertMatch({error, {worker_exit, killed}}, + ecpool:start_sup_pool(timeout_pool, test_timeout_client, Opts)) + end), + timer:sleep(200), + {Time, Val} = timer:tc(ecpool, stop_sup_pool, [timeout_pool]), + ?assert(Time/1000 < (ShutdownTimeout * Size + 1000)), + ?assertMatchOneOf(ok, {error,not_found}, Val), + ?assertEqual([], ecpool_sup:pools()). + t_restart_client(_Config) -> ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]), Workers1 = ecpool:workers(?POOL), diff --git a/test/test_timeout_client.erl b/test/test_timeout_client.erl new file mode 100644 index 00000000..dccbbe8e --- /dev/null +++ b/test/test_timeout_client.erl @@ -0,0 +1,26 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(test_timeout_client). + +-behaviour(ecpool_worker). + +-export([connect/1]). + +connect(Options) -> + Delay = proplists:get_value(delay, Options, 120000), + timer:sleep(Delay), + {ok, erlang:spawn_link(fun() -> ok end)}. From 9e8abecc469e5e958a590818da337e5103de760a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 27 Nov 2024 19:15:29 +0800 Subject: [PATCH 04/11] chore: update appup --- src/ecpool.appup.src | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/ecpool.appup.src b/src/ecpool.appup.src index c993d477..d3f962a5 100644 --- a/src/ecpool.appup.src +++ b/src/ecpool.appup.src @@ -1,13 +1,20 @@ %% -*-: erlang -*- {"0.5.11", [ + {"0.5.10", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} + ]}, {"0.5.10", [ {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, - {load_module, ecpool_worker, brutal_purge, soft_purge, []} + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, {"0.5.9", [ {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, - {load_module, ecpool_worker, brutal_purge, soft_purge, []} + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, %% e4.4.26 uses ecpool 0.5.5, while e4.4.27 uses 0.5.11 {<<"0\\.5\\.[3-8]">>, [ @@ -19,6 +26,7 @@ {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {apply, {ecpool_monitor, upgrade_clients_state, []}}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []} ]}, {<<"0\\.5\\.[0-2]">>, [ @@ -47,11 +55,13 @@ [ {"0.5.10", [ {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, - {load_module, ecpool_worker, brutal_purge, soft_purge, []} + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, {"0.5.9", [ {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, - {load_module, ecpool_worker, brutal_purge, soft_purge, []} + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, {<<"0\\.5\\.[3-8]">>, [ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, @@ -59,6 +69,7 @@ {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, {apply, {ecpool_monitor, ensure_monitor_stopped, []}}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, {delete_module,ecpool_monitor} ]}, {<<"0\\.5\\.[0-2]">>, [ From 377b161dec05c5e61e6c4a3d924674fa49a38d72 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 28 Nov 2024 11:30:28 +0800 Subject: [PATCH 05/11] ci: add testcase for dup pool starting --- test/ecpool_SUITE.erl | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index 4314aa6c..f57260dd 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -66,6 +66,7 @@ groups() -> t_start_pool_any_name, t_start_sup_pool, t_start_sup_pool_timeout, + t_start_sup_duplicated, t_empty_pool, t_empty_hash_pool, t_restart_client, @@ -146,6 +147,20 @@ t_start_sup_pool_timeout(_Config) -> ?assertMatchOneOf(ok, {error,not_found}, Val), ?assertEqual([], ecpool_sup:pools()). +t_start_sup_duplicated(_Config) -> + Opts = [{pool_size, 1} | proplists:delete(pool_size, ?POOL_OPTS)], + spawn_link(fun() -> + ?assertMatch({error, {worker_exit, killed}}, + ecpool:start_sup_pool(dup_pool, test_timeout_client, Opts)) + end), + timer:sleep(200), + ?assertMatch({error, {already_started, _}}, + ecpool:start_sup_pool(dup_pool, test_timeout_client, Opts)), + ?assertMatch({ok, _}, + ecpool:start_sup_pool(another_pool, test_client, Opts)), + ecpool:stop_sup_pool(dup_pool), + ecpool:stop_sup_pool(another_pool). + t_restart_client(_Config) -> ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]), Workers1 = ecpool:workers(?POOL), From a836775941786a5d2d30748f880d48bc33e4b640 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 28 Nov 2024 16:12:55 +0800 Subject: [PATCH 06/11] chore: add function specs for ecpool:start_sup_pool/3 --- src/ecpool.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/ecpool.erl b/src/ecpool.erl index 67e345a0..2fbd4191 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -67,6 +67,11 @@ | {on_disconnect, conn_callback()} | tuple(). -type get_client_ret() :: pid() | false | no_such_pool. +-type start_error() :: no_worker_sup + | worker_not_started + | {worker_start_failed, term()} + | {worker_exit, term()} + | supervisor:startlink_err(). -define(IS_ACTION(ACTION), ((is_tuple(ACTION) andalso tuple_size(ACTION) == 3) orelse is_function(ACTION, 1))). @@ -79,7 +84,7 @@ pool_spec(ChildId, Pool, Mod, Opts) -> modules => [ecpool_pool_sup]}. %% @doc Start the pool sup. --spec(start_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, term()}). +-spec(start_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, start_error()}). start_pool(Pool, Mod, Opts) -> case ecpool_pool_sup:start_link(Pool, Mod, Opts) of {ok, Pid} -> @@ -91,6 +96,7 @@ start_pool(Pool, Mod, Opts) -> end. %% @doc Start the pool supervised by ecpool_sup +-spec(start_sup_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, start_error()}). start_sup_pool(Pool, Mod, Opts) -> case ecpool_sup:start_pool(Pool, Mod, Opts) of {ok, Pid} -> From 766871f84400ed8dede26d41caf8aaac1abcbb62 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 28 Nov 2024 19:01:10 +0800 Subject: [PATCH 07/11] fix: dont crash if pool worker already disconnected --- src/ecpool_worker.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 21e235a0..84203077 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -278,7 +278,14 @@ terminate(_Reason, #state{pool = Pool, id = Id, %% workers to be killed, the total time spend by the ecpool_worker_sup will be %% (0.3 * NumOfSupervisees * NumOfWorkers) seconds. stop_supervisees(SupPids, 300), - gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}). + try + gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}) + catch + error:badarg -> + %% Ignore the `{badarg,[{gproc,unreg,[...]}]` error as it just means the pool worker + %% has not connected. + ok + end. code_change(_OldVsn, State, _Extra) -> {ok, State}. From 07dd1ab13a7a6a3129c45c0d3bd51a2393b6c326 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 28 Nov 2024 19:08:45 +0800 Subject: [PATCH 08/11] fix: remove the support for OTP-22 --- .github/workflows/erlang.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index f83e4983..0f1ff128 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -15,7 +15,6 @@ jobs: strategy: matrix: include: - - otp_release: 22 - otp_release: 23 - otp_release: 24 - otp_release: 25 From 0492477720d49fbc419c6034b0008161e28a7f03 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 28 Nov 2024 19:18:21 +0800 Subject: [PATCH 09/11] fix: correct the appup --- src/ecpool.appup.src | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/ecpool.appup.src b/src/ecpool.appup.src index d3f962a5..f3c6c290 100644 --- a/src/ecpool.appup.src +++ b/src/ecpool.appup.src @@ -1,17 +1,19 @@ %% -*-: erlang -*- -{"0.5.11", +{"0.5.12", [ - {"0.5.10", [ + {"0.5.11", [ {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, {"0.5.10", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, {"0.5.9", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} @@ -53,12 +55,19 @@ ]} ], [ + {"0.5.11", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} + ]}, {"0.5.10", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, {"0.5.9", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} From 0bbc92c25e9fffa2ed5d91098fd60dbda2743044 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Fri, 6 Dec 2024 11:22:37 -0300 Subject: [PATCH 10/11] fix: restart workers that fail to connect during init This ports several fixes that were somehow missed from 0.5.12 https://github.com/emqx/ecpool/compare/0.5.10...0.5.12 https://github.com/emqx/ecpool/compare/0.5.12...0.6.0 --- src/ecpool.erl | 109 ++++++++++++++--------------------- src/ecpool_pool_sup.erl | 11 ++-- src/ecpool_sup.erl | 14 ++--- src/ecpool_worker.erl | 86 +++++++++++++++------------ src/ecpool_worker_sup.erl | 13 ++--- test/ecpool_SUITE.erl | 57 ++++++++++++++++-- test/test_timeout_client.erl | 26 +++++++++ 7 files changed, 188 insertions(+), 128 deletions(-) create mode 100644 test/test_timeout_client.erl diff --git a/src/ecpool.erl b/src/ecpool.erl index 8abad079..a0298b79 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -67,6 +67,11 @@ | {on_disconnect, conn_callback()} | tuple(). -type get_client_ret() :: pid() | false | no_such_pool. +-type start_error() :: no_worker_sup + | worker_not_started + | {worker_start_failed, term()} + | {worker_exit, term()} + | supervisor:startlink_err(). -define(IS_ACTION(ACTION), ((is_tuple(ACTION) andalso tuple_size(ACTION) == 3) orelse is_function(ACTION, 1))). @@ -79,51 +84,27 @@ pool_spec(ChildId, Pool, Mod, Opts) -> modules => [ecpool_pool_sup]}. %% @doc Start the pool sup. --spec(start_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, term()}). +-spec(start_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, start_error()}). start_pool(Pool, Mod, Opts) -> - %% See start_sup_pool/3 for an explanation of StartProcessAliasID - StartProcessAliasID = erlang:alias(), - try - {ok, Pid} = OkResponse = ecpool_pool_sup:start_link(Pool, - Mod, - Opts, - StartProcessAliasID), - case aggregate_initial_connect_responses(StartProcessAliasID, Opts) of - ok -> - OkResponse; - Error -> - unlink(Pid), - exit(Pid, shutdown), - Error - end - after - erlang:unalias(StartProcessAliasID), - flush_initial_connect_response_messages(StartProcessAliasID) + case ecpool_pool_sup:start_link(Pool, Mod, Opts) of + {ok, Pid} -> + wait_for_workers_started(Pid, fun() -> + gen_server:stop(Pid) + end); + {error, Reason} -> + {error, Reason} end. %% @doc Start the pool supervised by ecpool_sup +-spec(start_sup_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, start_error()}). start_sup_pool(Pool, Mod, Opts) -> - %% To avoid confusing OTP crash report log entries, supervisors and the - %% gen_server ecpool_worker should never return a non-ok response from the - %% init function. Instead, to handle errors from the initial connect - %% attempt, we pass our process alias ID to the ecpool_worker so the workers - %% can send back the errors. - StartProcessAliasID = erlang:alias(), - try - OkResponse = ecpool_sup:start_pool(Pool, - Mod, - Opts, - StartProcessAliasID), - case aggregate_initial_connect_responses(StartProcessAliasID, Opts) of - ok -> - OkResponse; - Error -> - _ = stop_sup_pool(Pool), - Error - end - after - erlang:unalias(StartProcessAliasID), - flush_initial_connect_response_messages(StartProcessAliasID) + case ecpool_sup:start_pool(Pool, Mod, Opts) of + {ok, Pid} -> + wait_for_workers_started(Pid, fun() -> + stop_sup_pool(Pool) + end); + {error, Reason} -> + {error, Reason} end. %% @doc Stop the pool supervised by ecpool_sup @@ -225,31 +206,29 @@ exec(Action, Client) when is_function(Action) -> %% Internal functions -aggregate_initial_connect_responses(StartProcessAliasID, - Opts) -> - PoolSize = ecpool_worker_sup:pool_size(Opts), - aggregate_initial_connect_responses_helper(PoolSize, - StartProcessAliasID). - -aggregate_initial_connect_responses_helper(0 = _RespLeft, _RespRef) -> - %% We got an ok response from all workers - ok; -aggregate_initial_connect_responses_helper(RespLeft, RespRef) -> - receive - {Ref, ok} when Ref =:= RespRef -> - aggregate_initial_connect_responses_helper(RespLeft - 1, RespRef); - {Ref, NonOkResp} when Ref =:= RespRef -> - NonOkResp +wait_for_workers_started(Pid, Clearer) -> + case lists:keyfind(worker_sup, 1, supervisor:which_children(Pid)) of + {worker_sup, WorkerSupPid, supervisor, _} -> + case check_worker_start_results(supervisor:which_children(WorkerSupPid)) of + ok -> {ok, Pid}; + Error -> + _ = Clearer(), + Error + end; + false -> + _ = Clearer(), + {error, no_worker_sup} end. -%% Called after the receiver process has been unaliased itself to make sure we -%% remove all messages sent to the alias from the process inbox -flush_initial_connect_response_messages(RespRef) -> - receive - {Ref, _Resp} when Ref =:= RespRef -> - flush_initial_connect_response_messages(RespRef) - after 0 -> - ok +check_worker_start_results([]) -> + ok; +check_worker_start_results([{_, WorkerPid, worker, _} | Workers]) -> + %% NOTE: `ecpool_worker:take_start_result/1` is an infinity call which will block the caller + %% if the worker is busy on starting. + case ecpool_worker:take_start_result(WorkerPid) of + not_started -> {error, worker_not_started}; + {start_failed, Error} -> {error, {worker_start_failed, Error}}; + {exit, Reason} -> {error, {worker_exit, Reason}}; + started -> check_worker_start_results(Workers); + undefined -> check_worker_start_results(Workers) end. - - diff --git a/src/ecpool_pool_sup.erl b/src/ecpool_pool_sup.erl index 0314c96c..ab83c0c4 100644 --- a/src/ecpool_pool_sup.erl +++ b/src/ecpool_pool_sup.erl @@ -19,23 +19,22 @@ -behaviour(supervisor). %% API --export([start_link/4]). +-export([start_link/3]). %% Supervisor callbacks -export([init/1]). -start_link(Pool, Mod, Opts, InitialConnectResultReceiverAlias) -> - supervisor:start_link(?MODULE, [Pool, Mod, Opts, InitialConnectResultReceiverAlias]). +start_link(Pool, Mod, Opts) -> + supervisor:start_link(?MODULE, [Pool, Mod, Opts]). -init([Pool, Mod, Opts, InitialConnectResultReceiverAlias]) -> +init([Pool, Mod, Opts]) -> {ok, { {one_for_all, 10, 100}, [ {pool, {ecpool_pool, start_link, [Pool, Opts]}, transient, 16#ffff, worker, [ecpool_pool]}, {worker_sup, {ecpool_worker_sup,start_link, - [Pool, Mod, Opts, InitialConnectResultReceiverAlias]}, + [Pool, Mod, Opts]}, transient, infinity, supervisor, [ecpool_worker_sup]}] }}. - diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl index b16f64ba..80bf71f9 100644 --- a/src/ecpool_sup.erl +++ b/src/ecpool_sup.erl @@ -21,7 +21,7 @@ -export([start_link/0]). %% API --export([ start_pool/4 +-export([ start_pool/3 , stop_pool/1 , get_pool/1 ]). @@ -43,10 +43,9 @@ start_link() -> %%-------------------------------------------------------------------- %% @doc Start a pool. --spec(start_pool(pool_name(), atom(), list(tuple()), reference()) -> {ok, pid()} | {error, term()}). -start_pool(Pool, Mod, Opts, InitialConnectResultReceiverAlias) -> - supervisor:start_child(?MODULE, - pool_spec(Pool, Mod, Opts, InitialConnectResultReceiverAlias)). +-spec(start_pool(pool_name(), atom(), [term()]) -> {ok, pid()} | {error, term()}). +start_pool(Pool, Mod, Opts) -> + supervisor:start_child(?MODULE, pool_spec(Pool, Mod, Opts)). %% @doc Stop a pool. -spec(stop_pool(Pool :: pool_name()) -> ok | {error, term()}). @@ -81,15 +80,14 @@ pools() -> init([]) -> {ok, {{one_for_one, 10, 100}, [ecpool_monitor:monitor_spec()]}}. -pool_spec(Pool, Mod, Opts, InitialConnectResultReceiverAlias) -> +pool_spec(Pool, Mod, Opts) -> #{id => child_id(Pool), start => {ecpool_pool_sup, start_link, - [Pool, Mod, Opts, InitialConnectResultReceiverAlias]}, + [Pool, Mod, Opts]}, restart => transient, shutdown => infinity, type => supervisor, modules => [ecpool_pool_sup]}. child_id(Pool) -> {pool_sup, Pool}. - diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index a507e10e..903e4084 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --export([start_link/5]). +-export([start_link/4]). %% API Function Exports -export([ client/1 @@ -26,6 +26,7 @@ , exec_async/2 , exec_async/3 , is_connected/1 + , take_start_result/1 , set_reconnect_callback/2 , set_disconnect_callback/2 , add_reconnect_callback/2 @@ -37,6 +38,7 @@ %% gen_server Function Exports -export([ init/1 + , handle_continue/2 , handle_call/3 , handle_cast/2 , handle_info/2 @@ -57,6 +59,11 @@ opts :: proplists:proplist() }). +-type start_result() :: not_started | started | {start_failed, term()} | {exit, term()} | undefined. + +-define(set_start_result(RESULT), erlang:put(start_result, RESULT)). +-define(take_start_result(), erlang:erase(start_result)). + %%-------------------------------------------------------------------- %% Callback %%-------------------------------------------------------------------- @@ -69,12 +76,11 @@ %%-------------------------------------------------------------------- %% @doc Start a pool worker. --spec(start_link(pool_name(), pos_integer(), module(), list(), {pid(), reference()}) -> - {ok, pid()} | ignore | {error, any()}). -start_link(Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias) -> - gen_server:start_link(?MODULE, - [Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias], - []). +-spec(start_link(pool_name(), pos_integer(), module(), list()) -> + {ok, pid()}). +start_link(Pool, Id, Mod, Opts) -> + gen_server:start_link(?MODULE, [Pool, Id, Mod, Opts], []). + %% @doc Get client/connection. -spec(client(pid()) -> {ok, Client :: pid()} | {ok, {Client :: pid(), pid()}} | {error, Reason :: term()}). @@ -98,6 +104,14 @@ exec_async(Pid, Action, Callback) -> is_connected(Pid) -> gen_server:call(Pid, is_connected, infinity). +-spec take_start_result(pid()) -> start_result(). +take_start_result(Pid) -> + try gen_server:call(Pid, take_start_result, infinity) + catch + exit:{{shutdown, Reason}, _} -> {exit, Reason}; + exit:{Reason, _} -> {exit, Reason} + end. + -spec(set_reconnect_callback(pid(), ecpool:conn_callback()) -> ok). set_reconnect_callback(Pid, OnReconnect) -> gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}). @@ -130,7 +144,7 @@ add_disconnect_callback(Pid, OnDisconnect) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias]) -> +init([Pool, Id, Mod, Opts]) -> ecpool_monitor:reg_worker(), process_flag(trap_exit, true), State = #state{pool = Pool, @@ -140,53 +154,52 @@ init([Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias]) -> on_reconnect = ensure_callback(proplists:get_value(on_reconnect, Opts)), on_disconnect = ensure_callback(proplists:get_value(on_disconnect, Opts)) }, + ?set_start_result(not_started), + {ok, State, {continue, connect}}. + +handle_continue(connect, State) -> + #state{pool = Pool, + id = Id + } = State, case connect_internal(State) of {ok, NewState} -> gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), - send_initial_connect_response(InitialConnectResultReceiverAlias, ok), - {ok, NewState}; - Error -> - send_initial_connect_response(InitialConnectResultReceiverAlias, Error), - ignore + ?set_start_result(started), + {noreply, NewState}; + Error -> + ?set_start_result({start_failed, Error}), + {noreply, State} end. +handle_call(take_start_result, _From, State) -> + {reply, ?take_start_result(), State}; handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) -> IsAlive = Client =/= undefined andalso is_process_alive(Client), {reply, IsAlive, State}; - handle_call(is_connected, _From, State = #state{client = Client}) -> {reply, Client =/= undefined, State}; - handle_call(client, _From, State = #state{client = undefined}) -> {reply, {error, disconnected}, State}; - handle_call(client, _From, State = #state{client = Client}) -> {reply, {ok, Client}, State}; - handle_call({exec, Action}, _From, State = #state{client = Client}) -> - {reply, safe_exec(Action, Client), State}; - + {reply, with_client(Action, Client), State}; handle_call(get_reconnect_callbacks, _From, #state{on_reconnect = OnReconnect} = State) -> {reply, OnReconnect, State}; - handle_call(Req, _From, State) -> logger:error("[PoolWorker] unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({exec_async, Action}, State = #state{client = Client}) -> - _ = safe_exec(Action, Client), + _ = with_client(Action, Client), {noreply, State}; - handle_cast({exec_async, Action, Callback}, State = #state{client = Client}) -> - _ = safe_exec(Callback, safe_exec(Action, Client)), + _ = with_client(Callback, with_client(Action, Client)), {noreply, State}; - handle_cast({set_reconn_callbk, OnReconnect}, State) -> {noreply, State#state{on_reconnect = ensure_callback(OnReconnect)}}; - handle_cast({set_disconn_callbk, OnDisconnect}, State) -> {noreply, State#state{on_disconnect = ensure_callback(OnDisconnect)}}; - handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect0}) -> OldOnReconnect = case reconnect_callback_signature(OnReconnect) of @@ -196,17 +209,13 @@ handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldO OldOnReconnect0 end, {noreply, State#state{on_reconnect = add_conn_callback(OnReconnect, OldOnReconnect)}}; - handle_cast({remove_reconnect_callback_by_signature, CallbackSignature}, State = #state{on_reconnect = OnReconnect0}) -> OldOnReconnect = drop_reconnect_callbacks_by_signature(OnReconnect0, CallbackSignature), {noreply, State#state{on_reconnect = OldOnReconnect}}; - handle_cast({remove_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect}) -> {noreply, State#state{on_reconnect = remove_conn_callback(OnReconnect, OldOnReconnect)}}; - handle_cast({add_disconn_callbk, OnDisconnect}, State = #state{on_disconnect = OldOnDisconnect}) -> {noreply, State#state{on_disconnect = add_conn_callback(OnDisconnect, OldOnDisconnect)}}; - handle_cast(_Msg, State) -> {noreply, State}. @@ -222,7 +231,6 @@ handle_info({'EXIT', Pid, Reason}, State = #state{opts = Opts, supervisees = Sup [?MODULE, Reason, Pid, SupPids]), {noreply, State} end; - handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) -> case connect_internal(State) of {ok, NewState = #state{client = Client}} -> @@ -231,7 +239,6 @@ handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) {Err, _Reason} when Err =:= error orelse Err =:= 'EXIT' -> reconnect(proplists:get_value(auto_reconnect, Opts), State) end; - handle_info(Info, State) -> logger:error("[PoolWorker] unexpected info: ~p", [Info]), {noreply, State}. @@ -286,13 +293,13 @@ handle_reconnect(undefined, _) -> ok; handle_reconnect(Client, OnReconnectList) when is_list(OnReconnectList) -> %% reverse apply the callbacks because the newer ones are cons-ed to the head of the list - lists:foreach(fun(OnReconnectCallback) -> safe_exec(OnReconnectCallback, Client) end, + lists:foreach(fun(OnReconnectCallback) -> with_client(OnReconnectCallback, Client) end, lists:reverse(OnReconnectList)). handle_disconnect(undefined, _) -> ok; handle_disconnect(Client, OnDisconnectList) -> - lists:foreach(fun(OnDisconnectCallback) -> safe_exec(OnDisconnectCallback, Client) end, + lists:foreach(fun(OnDisconnectCallback) -> with_client(OnDisconnectCallback, Client) end, OnDisconnectList). connect_internal(State) -> @@ -332,6 +339,13 @@ drop_reconnect_callbacks_by_signature(Callbacks, Signature) -> Pred = fun(CB) -> not is_reconnect_callback_signature_match(CB, Signature) end, lists:filter(Pred, Callbacks). +with_client(_, undefined) -> + {error, ecpool_disconnected}; +with_client(_, {error, _} = Error) -> + Error; +with_client(Action, Client) -> + safe_exec(Action, Client). + safe_exec({_M, _F, _A} = Action, MainArg) -> try exec(Action, MainArg) catch E:R:ST -> @@ -425,7 +439,3 @@ monitor_child(Pid) -> %% that will be handled in shutdown/2. ok end. - -send_initial_connect_response(InitialConnectResultReceiverAlias, Response) -> - InitialConnectResultReceiverAlias ! {InitialConnectResultReceiverAlias, Response}, - ok. diff --git a/src/ecpool_worker_sup.erl b/src/ecpool_worker_sup.erl index 6225523e..e8f6c618 100644 --- a/src/ecpool_worker_sup.erl +++ b/src/ecpool_worker_sup.erl @@ -18,23 +18,23 @@ -behaviour(supervisor). --export([start_link/4]). +-export([start_link/3]). -export([init/1]). -export([pool_size/1]). -start_link(Pool, Mod, Opts, InitialConnectResultReceiverAlias) -> - supervisor:start_link(?MODULE, [Pool, Mod, Opts, InitialConnectResultReceiverAlias]). +start_link(Pool, Mod, Opts) -> + supervisor:start_link(?MODULE, [Pool, Mod, Opts]). -init([Pool, Mod, Opts, InitialConnectResultReceiverAlias]) -> +init([Pool, Mod, Opts]) -> WorkerSpec = fun(Id) -> #{id => {worker, Id}, start => {ecpool_worker, start_link, - [Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias]}, + [Pool, Id, Mod, Opts]}, restart => transient, - shutdown => 5000, + shutdown => 2_000, type => worker, modules => [ecpool_worker, Mod]} end, @@ -44,4 +44,3 @@ init([Pool, Mod, Opts, InitialConnectResultReceiverAlias]) -> pool_size(Opts) -> Schedulers = erlang:system_info(schedulers), proplists:get_value(pool_size, Opts, Schedulers). - diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index 0727534b..bd310699 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -16,8 +16,8 @@ -module(ecpool_SUITE). --compile(export_all). -compile(nowarn_export_all). +-compile(export_all). -include_lib("eunit/include/eunit.hrl"). @@ -40,6 +40,23 @@ {encoding, utf8} ]). +-define(assertMatchOneOf(Guard1, Guard2, Expr), + begin + ((fun () -> + case (Expr) of + Guard1 -> ok; + Guard2 -> ok; + X__V -> erlang:error( + {assertMatch, + [{module, ?MODULE}, + {line, ?LINE}, + {expression, (??Expr)}, + {pattern, ([??Guard1, ??Guard2])}, + {value, X__V}]}) + end + end)()) + end). + all() -> [{group, all}]. @@ -52,6 +69,8 @@ groups() -> t_start_pool_one_initial_connect_fail, t_start_pool_any_name, t_start_sup_pool, + t_start_sup_pool_timeout, + t_start_sup_duplicated, t_empty_pool, t_empty_hash_pool, t_restart_client, @@ -131,18 +150,19 @@ t_start_pool_any_name(_Config) -> end, lists:seq(1, 10)). t_empty_pool(_Config) -> - ecpool:start_pool(?POOL, test_failing_client, [{pool_size, 4}, {pool_type, random}]), + ?assertMatchOneOf({error, {worker_exit, normal}}, {error, {worker_exit, noproc}}, + ecpool:start_pool(?POOL, test_failing_client, [{pool_size, 4}, {pool_type, random}])), % NOTE: give some time to clients to exit ok = timer:sleep(100), ?assertEqual([], ecpool:workers(?POOL)), - ?assertEqual({error, ecpool_empty}, ecpool:with_client(?POOL, fun(_) -> ok end)). + ?assertEqual({error, no_such_pool}, ecpool:with_client(?POOL, fun(_) -> ok end)). t_empty_hash_pool(_Config) -> ecpool:start_pool(?POOL, test_failing_client, [{pool_size, 4}, {pool_type, hash}]), % NOTE: give some time to clients to exit ok = timer:sleep(100), ?assertEqual([], ecpool:workers(?POOL)), - ?assertEqual({error, ecpool_empty}, ecpool:with_client(?POOL, 42, fun(_) -> ok end)). + ?assertEqual({error, no_such_pool}, ecpool:with_client(?POOL, 42, fun(_) -> ok end)). t_start_sup_pool(_Config) -> {ok, Pid1} = ecpool:start_sup_pool(xpool, test_client, ?POOL_OPTS), @@ -152,6 +172,35 @@ t_start_sup_pool(_Config) -> ecpool:stop_sup_pool(xpool), ?assertEqual([], ecpool_sup:pools()). +t_start_sup_pool_timeout(_Config) -> + ShutdownTimeout = 2_000, %% the `shutdown` option in ecpool_worker_sup:init/1 + Size = 2, + Opts = [{pool_size, Size} | proplists:delete(pool_size, ?POOL_OPTS)], + spawn_link(fun() -> + ?assertMatch({error, {worker_exit, killed}}, + ecpool:start_sup_pool(timeout_pool, test_timeout_client, Opts)) + end), + timer:sleep(200), + {Time, Val} = timer:tc(ecpool, stop_sup_pool, [timeout_pool]), + ?assert(Time / 1_000 < (ShutdownTimeout * Size + 1_000), + #{time => Time / 1_000, shutdown_timeout => ShutdownTimeout, size => Size}), + ?assertMatchOneOf(ok, {error,not_found}, Val), + ?assertEqual([], ecpool_sup:pools()). + +t_start_sup_duplicated(_Config) -> + Opts = [{pool_size, 1} | proplists:delete(pool_size, ?POOL_OPTS)], + spawn_link(fun() -> + ?assertMatch({error, {worker_exit, killed}}, + ecpool:start_sup_pool(dup_pool, test_timeout_client, Opts)) + end), + timer:sleep(200), + ?assertMatch({error, {already_started, _}}, + ecpool:start_sup_pool(dup_pool, test_timeout_client, Opts)), + ?assertMatch({ok, _}, + ecpool:start_sup_pool(another_pool, test_client, Opts)), + ecpool:stop_sup_pool(dup_pool), + ecpool:stop_sup_pool(another_pool). + t_restart_client(_Config) -> ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]), Workers1 = ecpool:workers(?POOL), diff --git a/test/test_timeout_client.erl b/test/test_timeout_client.erl new file mode 100644 index 00000000..05448956 --- /dev/null +++ b/test/test_timeout_client.erl @@ -0,0 +1,26 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(test_timeout_client). + +-behaviour(ecpool_worker). + +-export([connect/1]). + +connect(Options) -> + Delay = proplists:get_value(delay, Options, 120000), + timer:sleep(Delay), + {ok, erlang:spawn_link(fun() -> ok end)}. From 785aa384cfdceeb11be3ccecb7e9cf4096dae978 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:10:15 -0300 Subject: [PATCH 11/11] fix: fix bad merge resolution --- src/ecpool_monitor.erl | 6 ------ src/ecpool_worker.erl | 11 ----------- 2 files changed, 17 deletions(-) diff --git a/src/ecpool_monitor.erl b/src/ecpool_monitor.erl index ab07106c..8807ca46 100644 --- a/src/ecpool_monitor.erl +++ b/src/ecpool_monitor.erl @@ -26,7 +26,6 @@ ]). -export([ update_clients_global/0 - , upgrade_clients_state/0 , reg_worker/0 , reg_worker/1 , get_all_global_clients/0 @@ -76,11 +75,6 @@ monitor_spec() -> update_clients_global() -> with_all_workers(fun put_client_global_and_start_monitor/2). -upgrade_clients_state() -> - with_all_workers(fun(_, WorkerPid) -> - ecpool_worker:upgrade_state(WorkerPid) - end). - get_all_global_clients() -> ets:tab2list(?DISCOVERY_TAB). diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 30aaeb8b..59a9d3c1 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -20,8 +20,6 @@ -export([start_link/4]). --export([upgrade_state/1]). - %% API Function Exports -export([ client/1 , exec/3 @@ -62,7 +60,6 @@ }). -type start_result() :: not_started | started | {start_failed, term()} | {exit, term()} | undefined. - -define(set_start_result(RESULT), erlang:put(start_result, RESULT)). -define(take_start_result(), erlang:erase(start_result)). @@ -73,11 +70,6 @@ -callback(connect(ConnOpts :: list()) -> {ok, pid()} | {ok, {pid(), pid()}, map()} | {error, Reason :: term()}). --type start_result() :: not_started | started | {start_failed, term()} | {exit, term()} | undefined. - --define(set_start_result(RESULT), erlang:put(start_result, RESULT)). --define(take_start_result(), erlang:erase(start_result)). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -147,9 +139,6 @@ get_reconnect_callbacks(Pid) -> add_disconnect_callback(Pid, OnDisconnect) -> gen_server:cast(Pid, {add_disconn_callbk, OnDisconnect}). -upgrade_state(Pid) -> - gen_server:cast(Pid, upgrade_state). - %%-------------------------------------------------------------------- %% gen_server callbacks %%--------------------------------------------------------------------