Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions src/syn_pg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,32 @@ handle_call(Request, From, #state{scope = Scope} = State) ->
{noreply, #state{}} |
{noreply, #state{}, timeout() | hibernate | {continue, term()}} |
{stop, Reason :: term(), #state{}}.
%% New format with RemoteScopePid - allows inline discovery
handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason, RemoteScopePid}, #state{nodes_map = NodesMap} = State) ->
RemoteNode = node(Pid),
case maps:is_key(RemoteNode, NodesMap) of
true ->
handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State),
{noreply, State};

false ->
%% Node not in nodes_map yet - sync arrived before ack_sync due to
%% different sender processes (multicast_loop vs gen_server).
%% Inline the discovery: set up monitor and add to nodes_map.
_MRef = monitor(process, RemoteScopePid),
NodesMap1 = NodesMap#{RemoteNode => RemoteScopePid},
handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State#state{nodes_map = NodesMap1}),
{noreply, State#state{nodes_map = NodesMap1}}
end;

%% Old format for backwards compatibility (rolling upgrades)
handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) ->
case maps:is_key(node(Pid), NodesMap) of
true ->
handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State);

false ->
%% ignore, race condition
%% ignore, cannot inline discover without RemoteScopePid
ok
end,
{noreply, State};
Expand Down Expand Up @@ -573,8 +592,8 @@ do_join_on_node(GroupName, Pid, PreviousMeta, Meta, MRef, Reason, RequesterNode,
%% callback
syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, Reason]),
syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, PreviousMeta, Meta, normal]),
%% broadcast
syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, [RequesterNode], State),
%% broadcast (include self() so receiver can set up monitor if needed)
syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason, self()}, [RequesterNode], State),
%% return
{reply, {ok, {CallbackMethod, PreviousMeta, Meta, Time, TableByName, TableByPid}}, State}.

Expand Down
27 changes: 23 additions & 4 deletions src/syn_registry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,32 @@ handle_call(Request, From, #state{scope = Scope} = State) ->
{noreply, #state{}} |
{noreply, #state{}, timeout() | hibernate | {continue, term()}} |
{stop, Reason :: term(), #state{}}.
%% New format with RemoteScopePid - allows inline discovery
handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason, RemoteScopePid}, #state{nodes_map = NodesMap} = State) ->
RemoteNode = node(Pid),
case maps:is_key(RemoteNode, NodesMap) of
true ->
handle_registry_sync(Name, Pid, Meta, Time, Reason, State),
{noreply, State};

false ->
%% Node not in nodes_map yet - sync arrived before ack_sync due to
%% different sender processes (multicast_loop vs gen_server).
%% Inline the discovery: set up monitor and add to nodes_map.
_MRef = monitor(process, RemoteScopePid),
NodesMap1 = NodesMap#{RemoteNode => RemoteScopePid},
handle_registry_sync(Name, Pid, Meta, Time, Reason, State#state{nodes_map = NodesMap1}),
{noreply, State#state{nodes_map = NodesMap1}}
end;

%% Old format for backwards compatibility (rolling upgrades)
handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) ->
case maps:is_key(node(Pid), NodesMap) of
true ->
handle_registry_sync(Name, Pid, Meta, Time, Reason, State);

false ->
%% ignore, race condition
%% ignore, cannot inline discover without RemoteScopePid
ok
end,
{noreply, State};
Expand Down Expand Up @@ -467,8 +486,8 @@ do_register_on_node(Name, Pid, PreviousMeta, Meta, MRef, Reason, RequesterNode,
%% callback
syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta, Reason]),
syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, PreviousMeta, Meta, normal]),
%% broadcast
syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, [RequesterNode], State),
%% broadcast (include self() so receiver can set up monitor if needed)
syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time, Reason, self()}, [RequesterNode], State),
%% return
{reply, {ok, {CallbackMethod, PreviousMeta, Meta, Time, TableByName, TableByPid}}, State}.

Expand Down Expand Up @@ -690,7 +709,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime
ResolveTime = erlang:system_time(),
add_to_local_table(Name, TablePid, TableMeta, ResolveTime, TableMRef, TableByName, TableByPid),
%% broadcast to all (including remote node to update the time)
syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime, syn_conflict_resolution}, State);
syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime, syn_conflict_resolution, self()}, State);

Invalid ->
error_logger:info_msg("SYN[~s|~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> none chosen (got: ~p)",
Expand Down