diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index 276309f9..ea113d3e 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -31,4 +31,3 @@ jobs: run: make cover - name: Run dialyzer run: make dialyzer - diff --git a/src/ecpool.appup.src b/src/ecpool.appup.src index c871f73d..f3c6c290 100644 --- a/src/ecpool.appup.src +++ b/src/ecpool.appup.src @@ -1,9 +1,24 @@ %% -*-: erlang -*- -{"0.5.10", +{"0.5.12", [ + {"0.5.11", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} + ]}, + {"0.5.10", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} + ]}, {"0.5.9", [ - {load_module, ecpool_worker, brutal_purge, soft_purge, []} + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, + %% e4.4.26 uses ecpool 0.5.5, while e4.4.27 uses 0.5.11 {<<"0\\.5\\.[3-8]">>, [ %% NOTE: MUST start ecpool_monitor before any load_module instructions {add_module, ecpool_monitor}, @@ -12,6 +27,8 @@ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {apply, {ecpool_monitor, upgrade_clients_state, []}}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []} ]}, {<<"0\\.5\\.[0-2]">>, [ @@ -21,6 +38,7 @@ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {apply, {ecpool_monitor, upgrade_clients_state, []}}, {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, @@ -29,6 +47,7 @@ {apply, {ecpool_monitor, ensure_monitor_started, []}}, {apply, {ecpool_monitor, update_clients_global, []}}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {apply, {ecpool_monitor, upgrade_clients_state, []}}, {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, @@ -36,15 +55,31 @@ ]} ], [ + {"0.5.11", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} + ]}, + {"0.5.10", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} + ]}, {"0.5.9", [ - {load_module, ecpool_worker, brutal_purge, soft_purge, []} + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker, brutal_purge, soft_purge, []}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []} ]}, {<<"0\\.5\\.[3-8]">>, [ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, - {apply, {ecpool_monitor, ensure_monitor_stopped, []}} + {apply, {ecpool_monitor, ensure_monitor_stopped, []}}, + {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, + {delete_module,ecpool_monitor} ]}, {<<"0\\.5\\.[0-2]">>, [ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, @@ -52,7 +87,8 @@ {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, - {apply, {ecpool_monitor, ensure_monitor_stopped, []}} + {apply, {ecpool_monitor, ensure_monitor_stopped, []}}, + {delete_module,ecpool_monitor} ]}, {"0.4.2", [ {load_module, ecpool_worker, brutal_purge, soft_purge, []}, @@ -60,7 +96,8 @@ {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, - {apply, {ecpool_monitor, ensure_monitor_stopped, []}} + {apply, {ecpool_monitor, ensure_monitor_stopped, []}}, + {delete_module,ecpool_monitor} ]} ] }. diff --git a/src/ecpool.erl b/src/ecpool.erl index 8abad079..a0298b79 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -67,6 +67,11 @@ | {on_disconnect, conn_callback()} | tuple(). -type get_client_ret() :: pid() | false | no_such_pool. +-type start_error() :: no_worker_sup + | worker_not_started + | {worker_start_failed, term()} + | {worker_exit, term()} + | supervisor:startlink_err(). -define(IS_ACTION(ACTION), ((is_tuple(ACTION) andalso tuple_size(ACTION) == 3) orelse is_function(ACTION, 1))). @@ -79,51 +84,27 @@ pool_spec(ChildId, Pool, Mod, Opts) -> modules => [ecpool_pool_sup]}. %% @doc Start the pool sup. --spec(start_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, term()}). +-spec(start_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, start_error()}). start_pool(Pool, Mod, Opts) -> - %% See start_sup_pool/3 for an explanation of StartProcessAliasID - StartProcessAliasID = erlang:alias(), - try - {ok, Pid} = OkResponse = ecpool_pool_sup:start_link(Pool, - Mod, - Opts, - StartProcessAliasID), - case aggregate_initial_connect_responses(StartProcessAliasID, Opts) of - ok -> - OkResponse; - Error -> - unlink(Pid), - exit(Pid, shutdown), - Error - end - after - erlang:unalias(StartProcessAliasID), - flush_initial_connect_response_messages(StartProcessAliasID) + case ecpool_pool_sup:start_link(Pool, Mod, Opts) of + {ok, Pid} -> + wait_for_workers_started(Pid, fun() -> + gen_server:stop(Pid) + end); + {error, Reason} -> + {error, Reason} end. %% @doc Start the pool supervised by ecpool_sup +-spec(start_sup_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, start_error()}). start_sup_pool(Pool, Mod, Opts) -> - %% To avoid confusing OTP crash report log entries, supervisors and the - %% gen_server ecpool_worker should never return a non-ok response from the - %% init function. Instead, to handle errors from the initial connect - %% attempt, we pass our process alias ID to the ecpool_worker so the workers - %% can send back the errors. - StartProcessAliasID = erlang:alias(), - try - OkResponse = ecpool_sup:start_pool(Pool, - Mod, - Opts, - StartProcessAliasID), - case aggregate_initial_connect_responses(StartProcessAliasID, Opts) of - ok -> - OkResponse; - Error -> - _ = stop_sup_pool(Pool), - Error - end - after - erlang:unalias(StartProcessAliasID), - flush_initial_connect_response_messages(StartProcessAliasID) + case ecpool_sup:start_pool(Pool, Mod, Opts) of + {ok, Pid} -> + wait_for_workers_started(Pid, fun() -> + stop_sup_pool(Pool) + end); + {error, Reason} -> + {error, Reason} end. %% @doc Stop the pool supervised by ecpool_sup @@ -225,31 +206,29 @@ exec(Action, Client) when is_function(Action) -> %% Internal functions -aggregate_initial_connect_responses(StartProcessAliasID, - Opts) -> - PoolSize = ecpool_worker_sup:pool_size(Opts), - aggregate_initial_connect_responses_helper(PoolSize, - StartProcessAliasID). - -aggregate_initial_connect_responses_helper(0 = _RespLeft, _RespRef) -> - %% We got an ok response from all workers - ok; -aggregate_initial_connect_responses_helper(RespLeft, RespRef) -> - receive - {Ref, ok} when Ref =:= RespRef -> - aggregate_initial_connect_responses_helper(RespLeft - 1, RespRef); - {Ref, NonOkResp} when Ref =:= RespRef -> - NonOkResp +wait_for_workers_started(Pid, Clearer) -> + case lists:keyfind(worker_sup, 1, supervisor:which_children(Pid)) of + {worker_sup, WorkerSupPid, supervisor, _} -> + case check_worker_start_results(supervisor:which_children(WorkerSupPid)) of + ok -> {ok, Pid}; + Error -> + _ = Clearer(), + Error + end; + false -> + _ = Clearer(), + {error, no_worker_sup} end. -%% Called after the receiver process has been unaliased itself to make sure we -%% remove all messages sent to the alias from the process inbox -flush_initial_connect_response_messages(RespRef) -> - receive - {Ref, _Resp} when Ref =:= RespRef -> - flush_initial_connect_response_messages(RespRef) - after 0 -> - ok +check_worker_start_results([]) -> + ok; +check_worker_start_results([{_, WorkerPid, worker, _} | Workers]) -> + %% NOTE: `ecpool_worker:take_start_result/1` is an infinity call which will block the caller + %% if the worker is busy on starting. + case ecpool_worker:take_start_result(WorkerPid) of + not_started -> {error, worker_not_started}; + {start_failed, Error} -> {error, {worker_start_failed, Error}}; + {exit, Reason} -> {error, {worker_exit, Reason}}; + started -> check_worker_start_results(Workers); + undefined -> check_worker_start_results(Workers) end. - - diff --git a/src/ecpool_monitor.erl b/src/ecpool_monitor.erl index 0fbf36af..8807ca46 100644 --- a/src/ecpool_monitor.erl +++ b/src/ecpool_monitor.erl @@ -27,6 +27,7 @@ -export([ update_clients_global/0 , reg_worker/0 + , reg_worker/1 , get_all_global_clients/0 , put_client_global/1 , put_client_global/2 @@ -72,14 +73,7 @@ monitor_spec() -> }. update_clients_global() -> - lists:foreach(fun({PoolName, _}) -> - lists:foreach(fun({_, WrokerPid}) -> - case ecpool_worker:client(WrokerPid) of - {ok, Client} -> put_client_global(WrokerPid, Client); - _ -> ok - end - end, ecpool:workers(PoolName)) - end, ecpool_sup:pools()). + with_all_workers(fun put_client_global_and_start_monitor/2). get_all_global_clients() -> ets:tab2list(?DISCOVERY_TAB). @@ -87,8 +81,8 @@ get_all_global_clients() -> put_client_global(Client) -> put_client_global(self(), Client). -put_client_global(WrokerPid, Client) -> - ets:insert(?DISCOVERY_TAB, {WrokerPid, Client}), +put_client_global(WorkerPid, Client) -> + ets:insert(?DISCOVERY_TAB, {WorkerPid, Client}), ok. get_client_global(WorkerPid) -> @@ -102,7 +96,10 @@ get_client_global(WorkerPid) -> end. reg_worker() -> - gen_server:call(?MODULE, {reg_worker, self()}, infinity). + reg_worker(self()). + +reg_worker(WorkerPid) -> + gen_server:call(?MODULE, {reg_worker, WorkerPid}, infinity). %%============================================================================== @@ -143,4 +140,17 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%============================================================================== - +with_all_workers(Fun) -> + lists:foreach(fun({PoolName, _}) -> + lists:foreach(fun({_, WorkerPid}) -> + Fun(PoolName, WorkerPid) + end, ecpool:workers(PoolName)) + end, ecpool_sup:pools()). + +put_client_global_and_start_monitor(_, WorkerPid) -> + case ecpool_worker:client(WorkerPid) of + {ok, Client} -> + ok = reg_worker(WorkerPid), + put_client_global(WorkerPid, Client); + _ -> ok + end. diff --git a/src/ecpool_pool_sup.erl b/src/ecpool_pool_sup.erl index 0314c96c..ab83c0c4 100644 --- a/src/ecpool_pool_sup.erl +++ b/src/ecpool_pool_sup.erl @@ -19,23 +19,22 @@ -behaviour(supervisor). %% API --export([start_link/4]). +-export([start_link/3]). %% Supervisor callbacks -export([init/1]). -start_link(Pool, Mod, Opts, InitialConnectResultReceiverAlias) -> - supervisor:start_link(?MODULE, [Pool, Mod, Opts, InitialConnectResultReceiverAlias]). +start_link(Pool, Mod, Opts) -> + supervisor:start_link(?MODULE, [Pool, Mod, Opts]). -init([Pool, Mod, Opts, InitialConnectResultReceiverAlias]) -> +init([Pool, Mod, Opts]) -> {ok, { {one_for_all, 10, 100}, [ {pool, {ecpool_pool, start_link, [Pool, Opts]}, transient, 16#ffff, worker, [ecpool_pool]}, {worker_sup, {ecpool_worker_sup,start_link, - [Pool, Mod, Opts, InitialConnectResultReceiverAlias]}, + [Pool, Mod, Opts]}, transient, infinity, supervisor, [ecpool_worker_sup]}] }}. - diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl index b16f64ba..80bf71f9 100644 --- a/src/ecpool_sup.erl +++ b/src/ecpool_sup.erl @@ -21,7 +21,7 @@ -export([start_link/0]). %% API --export([ start_pool/4 +-export([ start_pool/3 , stop_pool/1 , get_pool/1 ]). @@ -43,10 +43,9 @@ start_link() -> %%-------------------------------------------------------------------- %% @doc Start a pool. --spec(start_pool(pool_name(), atom(), list(tuple()), reference()) -> {ok, pid()} | {error, term()}). -start_pool(Pool, Mod, Opts, InitialConnectResultReceiverAlias) -> - supervisor:start_child(?MODULE, - pool_spec(Pool, Mod, Opts, InitialConnectResultReceiverAlias)). +-spec(start_pool(pool_name(), atom(), [term()]) -> {ok, pid()} | {error, term()}). +start_pool(Pool, Mod, Opts) -> + supervisor:start_child(?MODULE, pool_spec(Pool, Mod, Opts)). %% @doc Stop a pool. -spec(stop_pool(Pool :: pool_name()) -> ok | {error, term()}). @@ -81,15 +80,14 @@ pools() -> init([]) -> {ok, {{one_for_one, 10, 100}, [ecpool_monitor:monitor_spec()]}}. -pool_spec(Pool, Mod, Opts, InitialConnectResultReceiverAlias) -> +pool_spec(Pool, Mod, Opts) -> #{id => child_id(Pool), start => {ecpool_pool_sup, start_link, - [Pool, Mod, Opts, InitialConnectResultReceiverAlias]}, + [Pool, Mod, Opts]}, restart => transient, shutdown => infinity, type => supervisor, modules => [ecpool_pool_sup]}. child_id(Pool) -> {pool_sup, Pool}. - diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index a507e10e..59a9d3c1 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --export([start_link/5]). +-export([start_link/4]). %% API Function Exports -export([ client/1 @@ -26,6 +26,7 @@ , exec_async/2 , exec_async/3 , is_connected/1 + , take_start_result/1 , set_reconnect_callback/2 , set_disconnect_callback/2 , add_reconnect_callback/2 @@ -37,6 +38,7 @@ %% gen_server Function Exports -export([ init/1 + , handle_continue/2 , handle_call/3 , handle_cast/2 , handle_info/2 @@ -57,6 +59,10 @@ opts :: proplists:proplist() }). +-type start_result() :: not_started | started | {start_failed, term()} | {exit, term()} | undefined. +-define(set_start_result(RESULT), erlang:put(start_result, RESULT)). +-define(take_start_result(), erlang:erase(start_result)). + %%-------------------------------------------------------------------- %% Callback %%-------------------------------------------------------------------- @@ -69,12 +75,11 @@ %%-------------------------------------------------------------------- %% @doc Start a pool worker. --spec(start_link(pool_name(), pos_integer(), module(), list(), {pid(), reference()}) -> - {ok, pid()} | ignore | {error, any()}). -start_link(Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias) -> - gen_server:start_link(?MODULE, - [Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias], - []). +-spec(start_link(pool_name(), pos_integer(), module(), list()) -> + {ok, pid()}). +start_link(Pool, Id, Mod, Opts) -> + gen_server:start_link(?MODULE, [Pool, Id, Mod, Opts], []). + %% @doc Get client/connection. -spec(client(pid()) -> {ok, Client :: pid()} | {ok, {Client :: pid(), pid()}} | {error, Reason :: term()}). @@ -98,6 +103,14 @@ exec_async(Pid, Action, Callback) -> is_connected(Pid) -> gen_server:call(Pid, is_connected, infinity). +-spec take_start_result(pid()) -> start_result(). +take_start_result(Pid) -> + try gen_server:call(Pid, take_start_result, infinity) + catch + exit:{{shutdown, Reason}, _} -> {exit, Reason}; + exit:{Reason, _} -> {exit, Reason} + end. + -spec(set_reconnect_callback(pid(), ecpool:conn_callback()) -> ok). set_reconnect_callback(Pid, OnReconnect) -> gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}). @@ -130,7 +143,7 @@ add_disconnect_callback(Pid, OnDisconnect) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias]) -> +init([Pool, Id, Mod, Opts]) -> ecpool_monitor:reg_worker(), process_flag(trap_exit, true), State = #state{pool = Pool, @@ -140,53 +153,52 @@ init([Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias]) -> on_reconnect = ensure_callback(proplists:get_value(on_reconnect, Opts)), on_disconnect = ensure_callback(proplists:get_value(on_disconnect, Opts)) }, + ?set_start_result(not_started), + {ok, State, {continue, connect}}. + +handle_continue(connect, State) -> + #state{pool = Pool, + id = Id + } = State, case connect_internal(State) of {ok, NewState} -> gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), - send_initial_connect_response(InitialConnectResultReceiverAlias, ok), - {ok, NewState}; - Error -> - send_initial_connect_response(InitialConnectResultReceiverAlias, Error), - ignore + ?set_start_result(started), + {noreply, NewState}; + {error, Error} -> + ?set_start_result({start_failed, Error}), + {noreply, State} end. +handle_call(take_start_result, _From, State) -> + {reply, ?take_start_result(), State}; handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) -> IsAlive = Client =/= undefined andalso is_process_alive(Client), {reply, IsAlive, State}; - handle_call(is_connected, _From, State = #state{client = Client}) -> {reply, Client =/= undefined, State}; - handle_call(client, _From, State = #state{client = undefined}) -> {reply, {error, disconnected}, State}; - handle_call(client, _From, State = #state{client = Client}) -> {reply, {ok, Client}, State}; - handle_call({exec, Action}, _From, State = #state{client = Client}) -> - {reply, safe_exec(Action, Client), State}; - + {reply, with_client(Action, Client), State}; handle_call(get_reconnect_callbacks, _From, #state{on_reconnect = OnReconnect} = State) -> {reply, OnReconnect, State}; - handle_call(Req, _From, State) -> logger:error("[PoolWorker] unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({exec_async, Action}, State = #state{client = Client}) -> - _ = safe_exec(Action, Client), + _ = with_client(Action, Client), {noreply, State}; - handle_cast({exec_async, Action, Callback}, State = #state{client = Client}) -> - _ = safe_exec(Callback, safe_exec(Action, Client)), + _ = with_client(Callback, with_client(Action, Client)), {noreply, State}; - handle_cast({set_reconn_callbk, OnReconnect}, State) -> {noreply, State#state{on_reconnect = ensure_callback(OnReconnect)}}; - handle_cast({set_disconn_callbk, OnDisconnect}, State) -> {noreply, State#state{on_disconnect = ensure_callback(OnDisconnect)}}; - handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect0}) -> OldOnReconnect = case reconnect_callback_signature(OnReconnect) of @@ -196,17 +208,13 @@ handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldO OldOnReconnect0 end, {noreply, State#state{on_reconnect = add_conn_callback(OnReconnect, OldOnReconnect)}}; - handle_cast({remove_reconnect_callback_by_signature, CallbackSignature}, State = #state{on_reconnect = OnReconnect0}) -> OldOnReconnect = drop_reconnect_callbacks_by_signature(OnReconnect0, CallbackSignature), {noreply, State#state{on_reconnect = OldOnReconnect}}; - handle_cast({remove_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect}) -> {noreply, State#state{on_reconnect = remove_conn_callback(OnReconnect, OldOnReconnect)}}; - handle_cast({add_disconn_callbk, OnDisconnect}, State = #state{on_disconnect = OldOnDisconnect}) -> {noreply, State#state{on_disconnect = add_conn_callback(OnDisconnect, OldOnDisconnect)}}; - handle_cast(_Msg, State) -> {noreply, State}. @@ -222,7 +230,6 @@ handle_info({'EXIT', Pid, Reason}, State = #state{opts = Opts, supervisees = Sup [?MODULE, Reason, Pid, SupPids]), {noreply, State} end; - handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) -> case connect_internal(State) of {ok, NewState = #state{client = Client}} -> @@ -231,7 +238,6 @@ handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) {Err, _Reason} when Err =:= error orelse Err =:= 'EXIT' -> reconnect(proplists:get_value(auto_reconnect, Opts), State) end; - handle_info(Info, State) -> logger:error("[PoolWorker] unexpected info: ~p", [Info]), {noreply, State}. @@ -249,9 +255,14 @@ terminate(_Reason, #state{pool = Pool, id = Id, %% workers to be killed, the total time spend by the ecpool_worker_sup will be %% (0.3 * NumOfSupervisees * NumOfWorkers) seconds. stop_supervisees(SupPids, 300), - %% Ignore the exeception thrown by gproc_pool:disconnect_worker if the name - %% is not registered - catch gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}). + try + gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}) + catch + error:badarg -> + %% Ignore the `{badarg,[{gproc,unreg,[...]}]` error as it just means the pool worker + %% has not connected. + ok + end. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -286,13 +297,13 @@ handle_reconnect(undefined, _) -> ok; handle_reconnect(Client, OnReconnectList) when is_list(OnReconnectList) -> %% reverse apply the callbacks because the newer ones are cons-ed to the head of the list - lists:foreach(fun(OnReconnectCallback) -> safe_exec(OnReconnectCallback, Client) end, + lists:foreach(fun(OnReconnectCallback) -> with_client(OnReconnectCallback, Client) end, lists:reverse(OnReconnectList)). handle_disconnect(undefined, _) -> ok; handle_disconnect(Client, OnDisconnectList) -> - lists:foreach(fun(OnDisconnectCallback) -> safe_exec(OnDisconnectCallback, Client) end, + lists:foreach(fun(OnDisconnectCallback) -> with_client(OnDisconnectCallback, Client) end, OnDisconnectList). connect_internal(State) -> @@ -332,6 +343,13 @@ drop_reconnect_callbacks_by_signature(Callbacks, Signature) -> Pred = fun(CB) -> not is_reconnect_callback_signature_match(CB, Signature) end, lists:filter(Pred, Callbacks). +with_client(_, undefined) -> + {error, ecpool_disconnected}; +with_client(_, {error, _} = Error) -> + Error; +with_client(Action, Client) -> + safe_exec(Action, Client). + safe_exec({_M, _F, _A} = Action, MainArg) -> try exec(Action, MainArg) catch E:R:ST -> @@ -425,7 +443,3 @@ monitor_child(Pid) -> %% that will be handled in shutdown/2. ok end. - -send_initial_connect_response(InitialConnectResultReceiverAlias, Response) -> - InitialConnectResultReceiverAlias ! {InitialConnectResultReceiverAlias, Response}, - ok. diff --git a/src/ecpool_worker_sup.erl b/src/ecpool_worker_sup.erl index 6225523e..e8f6c618 100644 --- a/src/ecpool_worker_sup.erl +++ b/src/ecpool_worker_sup.erl @@ -18,23 +18,23 @@ -behaviour(supervisor). --export([start_link/4]). +-export([start_link/3]). -export([init/1]). -export([pool_size/1]). -start_link(Pool, Mod, Opts, InitialConnectResultReceiverAlias) -> - supervisor:start_link(?MODULE, [Pool, Mod, Opts, InitialConnectResultReceiverAlias]). +start_link(Pool, Mod, Opts) -> + supervisor:start_link(?MODULE, [Pool, Mod, Opts]). -init([Pool, Mod, Opts, InitialConnectResultReceiverAlias]) -> +init([Pool, Mod, Opts]) -> WorkerSpec = fun(Id) -> #{id => {worker, Id}, start => {ecpool_worker, start_link, - [Pool, Id, Mod, Opts, InitialConnectResultReceiverAlias]}, + [Pool, Id, Mod, Opts]}, restart => transient, - shutdown => 5000, + shutdown => 2_000, type => worker, modules => [ecpool_worker, Mod]} end, @@ -44,4 +44,3 @@ init([Pool, Mod, Opts, InitialConnectResultReceiverAlias]) -> pool_size(Opts) -> Schedulers = erlang:system_info(schedulers), proplists:get_value(pool_size, Opts, Schedulers). - diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index 0727534b..bd310699 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -16,8 +16,8 @@ -module(ecpool_SUITE). --compile(export_all). -compile(nowarn_export_all). +-compile(export_all). -include_lib("eunit/include/eunit.hrl"). @@ -40,6 +40,23 @@ {encoding, utf8} ]). +-define(assertMatchOneOf(Guard1, Guard2, Expr), + begin + ((fun () -> + case (Expr) of + Guard1 -> ok; + Guard2 -> ok; + X__V -> erlang:error( + {assertMatch, + [{module, ?MODULE}, + {line, ?LINE}, + {expression, (??Expr)}, + {pattern, ([??Guard1, ??Guard2])}, + {value, X__V}]}) + end + end)()) + end). + all() -> [{group, all}]. @@ -52,6 +69,8 @@ groups() -> t_start_pool_one_initial_connect_fail, t_start_pool_any_name, t_start_sup_pool, + t_start_sup_pool_timeout, + t_start_sup_duplicated, t_empty_pool, t_empty_hash_pool, t_restart_client, @@ -131,18 +150,19 @@ t_start_pool_any_name(_Config) -> end, lists:seq(1, 10)). t_empty_pool(_Config) -> - ecpool:start_pool(?POOL, test_failing_client, [{pool_size, 4}, {pool_type, random}]), + ?assertMatchOneOf({error, {worker_exit, normal}}, {error, {worker_exit, noproc}}, + ecpool:start_pool(?POOL, test_failing_client, [{pool_size, 4}, {pool_type, random}])), % NOTE: give some time to clients to exit ok = timer:sleep(100), ?assertEqual([], ecpool:workers(?POOL)), - ?assertEqual({error, ecpool_empty}, ecpool:with_client(?POOL, fun(_) -> ok end)). + ?assertEqual({error, no_such_pool}, ecpool:with_client(?POOL, fun(_) -> ok end)). t_empty_hash_pool(_Config) -> ecpool:start_pool(?POOL, test_failing_client, [{pool_size, 4}, {pool_type, hash}]), % NOTE: give some time to clients to exit ok = timer:sleep(100), ?assertEqual([], ecpool:workers(?POOL)), - ?assertEqual({error, ecpool_empty}, ecpool:with_client(?POOL, 42, fun(_) -> ok end)). + ?assertEqual({error, no_such_pool}, ecpool:with_client(?POOL, 42, fun(_) -> ok end)). t_start_sup_pool(_Config) -> {ok, Pid1} = ecpool:start_sup_pool(xpool, test_client, ?POOL_OPTS), @@ -152,6 +172,35 @@ t_start_sup_pool(_Config) -> ecpool:stop_sup_pool(xpool), ?assertEqual([], ecpool_sup:pools()). +t_start_sup_pool_timeout(_Config) -> + ShutdownTimeout = 2_000, %% the `shutdown` option in ecpool_worker_sup:init/1 + Size = 2, + Opts = [{pool_size, Size} | proplists:delete(pool_size, ?POOL_OPTS)], + spawn_link(fun() -> + ?assertMatch({error, {worker_exit, killed}}, + ecpool:start_sup_pool(timeout_pool, test_timeout_client, Opts)) + end), + timer:sleep(200), + {Time, Val} = timer:tc(ecpool, stop_sup_pool, [timeout_pool]), + ?assert(Time / 1_000 < (ShutdownTimeout * Size + 1_000), + #{time => Time / 1_000, shutdown_timeout => ShutdownTimeout, size => Size}), + ?assertMatchOneOf(ok, {error,not_found}, Val), + ?assertEqual([], ecpool_sup:pools()). + +t_start_sup_duplicated(_Config) -> + Opts = [{pool_size, 1} | proplists:delete(pool_size, ?POOL_OPTS)], + spawn_link(fun() -> + ?assertMatch({error, {worker_exit, killed}}, + ecpool:start_sup_pool(dup_pool, test_timeout_client, Opts)) + end), + timer:sleep(200), + ?assertMatch({error, {already_started, _}}, + ecpool:start_sup_pool(dup_pool, test_timeout_client, Opts)), + ?assertMatch({ok, _}, + ecpool:start_sup_pool(another_pool, test_client, Opts)), + ecpool:stop_sup_pool(dup_pool), + ecpool:stop_sup_pool(another_pool). + t_restart_client(_Config) -> ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]), Workers1 = ecpool:workers(?POOL), diff --git a/test/test_timeout_client.erl b/test/test_timeout_client.erl new file mode 100644 index 00000000..05448956 --- /dev/null +++ b/test/test_timeout_client.erl @@ -0,0 +1,26 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(test_timeout_client). + +-behaviour(ecpool_worker). + +-export([connect/1]). + +connect(Options) -> + Delay = proplists:get_value(delay, Options, 120000), + timer:sleep(Delay), + {ok, erlang:spawn_link(fun() -> ok end)}.