From b68607ad9356bb14f419e5989ef4aa1ce636af82 Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Tue, 18 Nov 2025 23:42:44 -0500 Subject: [PATCH] Fix race condition causing stale PIDs in remote lookups sync_register/sync_join messages from multicast_loop can arrive before ack_sync from gen_server since they're different senders (no ordering guarantee). When this happens, the message was dropped because the remote node wasn't in nodes_map yet, leaving stale data from ack_sync. Fix: Include RemoteScopePid in broadcasts to allow inline discovery when sync arrives before ack_sync. Old message format still supported for rolling upgrades. --- src/syn_pg.erl | 25 ++++++++++++++++++++++--- src/syn_registry.erl | 27 +++++++++++++++++++++++---- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/src/syn_pg.erl b/src/syn_pg.erl index 66b5a169..57375441 100644 --- a/src/syn_pg.erl +++ b/src/syn_pg.erl @@ -419,13 +419,32 @@ handle_call(Request, From, #state{scope = Scope} = State) -> {noreply, #state{}} | {noreply, #state{}, timeout() | hibernate | {continue, term()}} | {stop, Reason :: term(), #state{}}. +%% New format with RemoteScopePid - allows inline discovery +handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason, RemoteScopePid}, #state{nodes_map = NodesMap} = State) -> + RemoteNode = node(Pid), + case maps:is_key(RemoteNode, NodesMap) of + true -> + handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State), + {noreply, State}; + + false -> + %% Node not in nodes_map yet - sync arrived before ack_sync due to + %% different sender processes (multicast_loop vs gen_server). + %% Inline the discovery: set up monitor and add to nodes_map. + _MRef = monitor(process, RemoteScopePid), + NodesMap1 = NodesMap#{RemoteNode => RemoteScopePid}, + handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State#state{nodes_map = NodesMap1}), + {noreply, State#state{nodes_map = NodesMap1}} + end; + +%% Old format for backwards compatibility (rolling upgrades) handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) -> case maps:is_key(node(Pid), NodesMap) of true -> handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State); false -> - %% ignore, race condition + %% ignore, cannot inline discover without RemoteScopePid ok end, {noreply, State}; @@ -573,8 +592,8 @@ do_join_on_node(GroupName, Pid, PreviousMeta, Meta, MRef, Reason, RequesterNode, %% callback syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, Reason]), syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, PreviousMeta, Meta, normal]), - %% broadcast - syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, [RequesterNode], State), + %% broadcast (include self() so receiver can set up monitor if needed) + syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason, self()}, [RequesterNode], State), %% return {reply, {ok, {CallbackMethod, PreviousMeta, Meta, Time, TableByName, TableByPid}}, State}. diff --git a/src/syn_registry.erl b/src/syn_registry.erl index 774e750a..b090d25f 100755 --- a/src/syn_registry.erl +++ b/src/syn_registry.erl @@ -314,13 +314,32 @@ handle_call(Request, From, #state{scope = Scope} = State) -> {noreply, #state{}} | {noreply, #state{}, timeout() | hibernate | {continue, term()}} | {stop, Reason :: term(), #state{}}. +%% New format with RemoteScopePid - allows inline discovery +handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason, RemoteScopePid}, #state{nodes_map = NodesMap} = State) -> + RemoteNode = node(Pid), + case maps:is_key(RemoteNode, NodesMap) of + true -> + handle_registry_sync(Name, Pid, Meta, Time, Reason, State), + {noreply, State}; + + false -> + %% Node not in nodes_map yet - sync arrived before ack_sync due to + %% different sender processes (multicast_loop vs gen_server). + %% Inline the discovery: set up monitor and add to nodes_map. + _MRef = monitor(process, RemoteScopePid), + NodesMap1 = NodesMap#{RemoteNode => RemoteScopePid}, + handle_registry_sync(Name, Pid, Meta, Time, Reason, State#state{nodes_map = NodesMap1}), + {noreply, State#state{nodes_map = NodesMap1}} + end; + +%% Old format for backwards compatibility (rolling upgrades) handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) -> case maps:is_key(node(Pid), NodesMap) of true -> handle_registry_sync(Name, Pid, Meta, Time, Reason, State); false -> - %% ignore, race condition + %% ignore, cannot inline discover without RemoteScopePid ok end, {noreply, State}; @@ -467,8 +486,8 @@ do_register_on_node(Name, Pid, PreviousMeta, Meta, MRef, Reason, RequesterNode, %% callback syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta, Reason]), syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, PreviousMeta, Meta, normal]), - %% broadcast - syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, [RequesterNode], State), + %% broadcast (include self() so receiver can set up monitor if needed) + syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time, Reason, self()}, [RequesterNode], State), %% return {reply, {ok, {CallbackMethod, PreviousMeta, Meta, Time, TableByName, TableByPid}}, State}. @@ -690,7 +709,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime ResolveTime = erlang:system_time(), add_to_local_table(Name, TablePid, TableMeta, ResolveTime, TableMRef, TableByName, TableByPid), %% broadcast to all (including remote node to update the time) - syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime, syn_conflict_resolution}, State); + syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime, syn_conflict_resolution, self()}, State); Invalid -> error_logger:info_msg("SYN[~s|~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> none chosen (got: ~p)",