Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,3 @@ jobs:
run: make cover
- name: Run dialyzer
run: make dialyzer

49 changes: 43 additions & 6 deletions src/ecpool.appup.src
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
%% -*-: erlang -*-
{"0.5.10",
{"0.5.12",
[
{"0.5.11", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}
]},
{"0.5.10", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_monitor, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}
]},
{"0.5.9", [
{load_module, ecpool_worker, brutal_purge, soft_purge, []}
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_monitor, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}
]},
%% e4.4.26 uses ecpool 0.5.5, while e4.4.27 uses 0.5.11
{<<"0\\.5\\.[3-8]">>, [
%% NOTE: MUST start ecpool_monitor before any load_module instructions
{add_module, ecpool_monitor},
Expand All @@ -12,6 +27,8 @@
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, upgrade_clients_state, []}},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []}
]},
{<<"0\\.5\\.[0-2]">>, [
Expand All @@ -21,6 +38,7 @@
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, upgrade_clients_state, []}},
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}
]},
Expand All @@ -29,38 +47,57 @@
{apply, {ecpool_monitor, ensure_monitor_started, []}},
{apply, {ecpool_monitor, update_clients_global, []}},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, upgrade_clients_state, []}},
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []}
]}
],
[
{"0.5.11", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}
]},
{"0.5.10", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_monitor, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}
]},
{"0.5.9", [
{load_module, ecpool_worker, brutal_purge, soft_purge, []}
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_monitor, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}
]},
{<<"0\\.5\\.[3-8]">>, [
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, ensure_monitor_stopped, []}}
{apply, {ecpool_monitor, ensure_monitor_stopped, []}},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []},
{delete_module,ecpool_monitor}
]},
{<<"0\\.5\\.[0-2]">>, [
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, ensure_monitor_stopped, []}}
{apply, {ecpool_monitor, ensure_monitor_stopped, []}},
{delete_module,ecpool_monitor}
]},
{"0.4.2", [
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, ensure_monitor_stopped, []}}
{apply, {ecpool_monitor, ensure_monitor_stopped, []}},
{delete_module,ecpool_monitor}
]}
]
}.
109 changes: 44 additions & 65 deletions src/ecpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
| {on_disconnect, conn_callback()}
| tuple().
-type get_client_ret() :: pid() | false | no_such_pool.
-type start_error() :: no_worker_sup
| worker_not_started
| {worker_start_failed, term()}
| {worker_exit, term()}
| supervisor:startlink_err().

-define(IS_ACTION(ACTION), ((is_tuple(ACTION) andalso tuple_size(ACTION) == 3) orelse is_function(ACTION, 1))).

Expand All @@ -79,51 +84,27 @@ pool_spec(ChildId, Pool, Mod, Opts) ->
modules => [ecpool_pool_sup]}.

%% @doc Start the pool sup.
-spec(start_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, term()}).
-spec(start_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, start_error()}).
start_pool(Pool, Mod, Opts) ->
%% See start_sup_pool/3 for an explanation of StartProcessAliasID
StartProcessAliasID = erlang:alias(),
try
{ok, Pid} = OkResponse = ecpool_pool_sup:start_link(Pool,
Mod,
Opts,
StartProcessAliasID),
case aggregate_initial_connect_responses(StartProcessAliasID, Opts) of
ok ->
OkResponse;
Error ->
unlink(Pid),
exit(Pid, shutdown),
Error
end
after
erlang:unalias(StartProcessAliasID),
flush_initial_connect_response_messages(StartProcessAliasID)
case ecpool_pool_sup:start_link(Pool, Mod, Opts) of
{ok, Pid} ->
wait_for_workers_started(Pid, fun() ->
gen_server:stop(Pid)
end);
{error, Reason} ->
{error, Reason}
end.

%% @doc Start the pool supervised by ecpool_sup
-spec(start_sup_pool(pool_name(), atom(), [option()]) -> {ok, pid()} | {error, start_error()}).
start_sup_pool(Pool, Mod, Opts) ->
%% To avoid confusing OTP crash report log entries, supervisors and the
%% gen_server ecpool_worker should never return a non-ok response from the
%% init function. Instead, to handle errors from the initial connect
%% attempt, we pass our process alias ID to the ecpool_worker so the workers
%% can send back the errors.
StartProcessAliasID = erlang:alias(),
try
OkResponse = ecpool_sup:start_pool(Pool,
Mod,
Opts,
StartProcessAliasID),
case aggregate_initial_connect_responses(StartProcessAliasID, Opts) of
ok ->
OkResponse;
Error ->
_ = stop_sup_pool(Pool),
Error
end
after
erlang:unalias(StartProcessAliasID),
flush_initial_connect_response_messages(StartProcessAliasID)
case ecpool_sup:start_pool(Pool, Mod, Opts) of
{ok, Pid} ->
wait_for_workers_started(Pid, fun() ->
stop_sup_pool(Pool)
end);
{error, Reason} ->
{error, Reason}
end.

%% @doc Stop the pool supervised by ecpool_sup
Expand Down Expand Up @@ -225,31 +206,29 @@ exec(Action, Client) when is_function(Action) ->

%% Internal functions

aggregate_initial_connect_responses(StartProcessAliasID,
Opts) ->
PoolSize = ecpool_worker_sup:pool_size(Opts),
aggregate_initial_connect_responses_helper(PoolSize,
StartProcessAliasID).

aggregate_initial_connect_responses_helper(0 = _RespLeft, _RespRef) ->
%% We got an ok response from all workers
ok;
aggregate_initial_connect_responses_helper(RespLeft, RespRef) ->
receive
{Ref, ok} when Ref =:= RespRef ->
aggregate_initial_connect_responses_helper(RespLeft - 1, RespRef);
{Ref, NonOkResp} when Ref =:= RespRef ->
NonOkResp
wait_for_workers_started(Pid, Clearer) ->
case lists:keyfind(worker_sup, 1, supervisor:which_children(Pid)) of
{worker_sup, WorkerSupPid, supervisor, _} ->
case check_worker_start_results(supervisor:which_children(WorkerSupPid)) of
ok -> {ok, Pid};
Error ->
_ = Clearer(),
Error
end;
false ->
_ = Clearer(),
{error, no_worker_sup}
end.

%% Called after the receiver process has been unaliased itself to make sure we
%% remove all messages sent to the alias from the process inbox
flush_initial_connect_response_messages(RespRef) ->
receive
{Ref, _Resp} when Ref =:= RespRef ->
flush_initial_connect_response_messages(RespRef)
after 0 ->
ok
check_worker_start_results([]) ->
ok;
check_worker_start_results([{_, WorkerPid, worker, _} | Workers]) ->
%% NOTE: `ecpool_worker:take_start_result/1` is an infinity call which will block the caller
%% if the worker is busy on starting.
case ecpool_worker:take_start_result(WorkerPid) of
not_started -> {error, worker_not_started};
{start_failed, Error} -> {error, {worker_start_failed, Error}};
{exit, Reason} -> {error, {worker_exit, Reason}};
started -> check_worker_start_results(Workers);
undefined -> check_worker_start_results(Workers)
end.


34 changes: 22 additions & 12 deletions src/ecpool_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

-export([ update_clients_global/0
, reg_worker/0
, reg_worker/1
, get_all_global_clients/0
, put_client_global/1
, put_client_global/2
Expand Down Expand Up @@ -72,23 +73,16 @@ monitor_spec() ->
}.

update_clients_global() ->
lists:foreach(fun({PoolName, _}) ->
lists:foreach(fun({_, WrokerPid}) ->
case ecpool_worker:client(WrokerPid) of
{ok, Client} -> put_client_global(WrokerPid, Client);
_ -> ok
end
end, ecpool:workers(PoolName))
end, ecpool_sup:pools()).
with_all_workers(fun put_client_global_and_start_monitor/2).

get_all_global_clients() ->
ets:tab2list(?DISCOVERY_TAB).

put_client_global(Client) ->
put_client_global(self(), Client).

put_client_global(WrokerPid, Client) ->
ets:insert(?DISCOVERY_TAB, {WrokerPid, Client}),
put_client_global(WorkerPid, Client) ->
ets:insert(?DISCOVERY_TAB, {WorkerPid, Client}),
ok.

get_client_global(WorkerPid) ->
Expand All @@ -102,7 +96,10 @@ get_client_global(WorkerPid) ->
end.

reg_worker() ->
gen_server:call(?MODULE, {reg_worker, self()}, infinity).
reg_worker(self()).

reg_worker(WorkerPid) ->
gen_server:call(?MODULE, {reg_worker, WorkerPid}, infinity).

%%==============================================================================

Expand Down Expand Up @@ -143,4 +140,17 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%==============================================================================

with_all_workers(Fun) ->
lists:foreach(fun({PoolName, _}) ->
lists:foreach(fun({_, WorkerPid}) ->
Fun(PoolName, WorkerPid)
end, ecpool:workers(PoolName))
end, ecpool_sup:pools()).

put_client_global_and_start_monitor(_, WorkerPid) ->
case ecpool_worker:client(WorkerPid) of
{ok, Client} ->
ok = reg_worker(WorkerPid),
put_client_global(WorkerPid, Client);
_ -> ok
end.
11 changes: 5 additions & 6 deletions src/ecpool_pool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,22 @@
-behaviour(supervisor).

%% API
-export([start_link/4]).
-export([start_link/3]).

%% Supervisor callbacks
-export([init/1]).

start_link(Pool, Mod, Opts, InitialConnectResultReceiverAlias) ->
supervisor:start_link(?MODULE, [Pool, Mod, Opts, InitialConnectResultReceiverAlias]).
start_link(Pool, Mod, Opts) ->
supervisor:start_link(?MODULE, [Pool, Mod, Opts]).

init([Pool, Mod, Opts, InitialConnectResultReceiverAlias]) ->
init([Pool, Mod, Opts]) ->
{ok, { {one_for_all, 10, 100}, [
{pool, {ecpool_pool, start_link, [Pool, Opts]},
transient, 16#ffff, worker, [ecpool_pool]},
{worker_sup,
{ecpool_worker_sup,start_link,
[Pool, Mod, Opts, InitialConnectResultReceiverAlias]},
[Pool, Mod, Opts]},
transient,
infinity,
supervisor,
[ecpool_worker_sup]}] }}.

14 changes: 6 additions & 8 deletions src/ecpool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
-export([start_link/0]).

%% API
-export([ start_pool/4
-export([ start_pool/3
, stop_pool/1
, get_pool/1
]).
Expand All @@ -43,10 +43,9 @@ start_link() ->
%%--------------------------------------------------------------------

%% @doc Start a pool.
-spec(start_pool(pool_name(), atom(), list(tuple()), reference()) -> {ok, pid()} | {error, term()}).
start_pool(Pool, Mod, Opts, InitialConnectResultReceiverAlias) ->
supervisor:start_child(?MODULE,
pool_spec(Pool, Mod, Opts, InitialConnectResultReceiverAlias)).
-spec(start_pool(pool_name(), atom(), [term()]) -> {ok, pid()} | {error, term()}).
start_pool(Pool, Mod, Opts) ->
supervisor:start_child(?MODULE, pool_spec(Pool, Mod, Opts)).

%% @doc Stop a pool.
-spec(stop_pool(Pool :: pool_name()) -> ok | {error, term()}).
Expand Down Expand Up @@ -81,15 +80,14 @@ pools() ->
init([]) ->
{ok, {{one_for_one, 10, 100}, [ecpool_monitor:monitor_spec()]}}.

pool_spec(Pool, Mod, Opts, InitialConnectResultReceiverAlias) ->
pool_spec(Pool, Mod, Opts) ->
#{id => child_id(Pool),
start => {ecpool_pool_sup,
start_link,
[Pool, Mod, Opts, InitialConnectResultReceiverAlias]},
[Pool, Mod, Opts]},
restart => transient,
shutdown => infinity,
type => supervisor,
modules => [ecpool_pool_sup]}.

child_id(Pool) -> {pool_sup, Pool}.

Loading
Loading