diff --git a/op-conductor/conductor/config.go b/op-conductor/conductor/config.go index e9a90ecc75ef8..051dfe16fd48b 100644 --- a/op-conductor/conductor/config.go +++ b/op-conductor/conductor/config.go @@ -109,6 +109,12 @@ type Config struct { // X Layer: HTTPBodyLimitMB is the HTTP request body size limit in MB for RPC server. HTTPBodyLimitMB int + + // X Layer: RoundRobinLeaderTransfer enables deterministic round-robin leader transfer. + RoundRobinLeaderTransfer bool + + // X Layer: RaftNoShutdownOnRemove prevents Raft from shutting down when removed from cluster. + RaftNoShutdownOnRemove bool } // Check validates the CLIConfig. @@ -225,6 +231,10 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) { // X Layer: HTTPBodyLimitMB is the HTTP request body size limit in MB for RPC server. HTTPBodyLimitMB: ctx.Int(flags.HTTPBodyLimitMB.Name), + // X Layer: RoundRobinLeaderTransfer enables deterministic round-robin leader transfer. + RoundRobinLeaderTransfer: ctx.Bool(flags.RoundRobinLeaderTransfer.Name), + // X Layer: RaftNoShutdownOnRemove prevents Raft from shutting down when removed. + RaftNoShutdownOnRemove: ctx.Bool(flags.RaftNoShutdownOnRemove.Name), }, nil } diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go index 4f0013230e156..0e88c7ca52528 100644 --- a/op-conductor/conductor/service.go +++ b/op-conductor/conductor/service.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "net/http" + "sort" "strings" "sync" "sync/atomic" @@ -187,6 +188,8 @@ func (c *OpConductor) initConsensus(ctx context.Context) error { TrailingLogs: c.cfg.RaftTrailingLogs, HeartbeatTimeout: c.cfg.RaftHeartbeatTimeout, LeaderLeaseTimeout: c.cfg.RaftLeaderLeaseTimeout, + // X Layer: Optionally don't shutdown when removed from cluster. + NoShutdownOnRemove: c.cfg.RaftNoShutdownOnRemove, } cons, err := consensus.NewRaftConsensus(c.log, raftConsensusConfig) if err != nil { @@ -827,7 +830,15 @@ func (oc *OpConductor) action() { func (oc *OpConductor) transferLeader() error { // TransferLeader here will do round robin to try to transfer leadership to the next healthy node. oc.log.Info("transferring leadership", "server", oc.cons.ServerID()) - err := oc.cons.TransferLeader() + + // X Layer: Use round-robin if enabled, otherwise use default Raft leader transfer + var err error + if oc.cfg.RoundRobinLeaderTransfer { + err = oc.transferLeaderRoundRobin() + } else { + err = oc.cons.TransferLeader() + } + oc.metrics.RecordLeaderTransfer(err == nil) if err == nil { oc.leader.Store(false) @@ -845,6 +856,95 @@ func (oc *OpConductor) transferLeader() error { } } +// transferLeaderRoundRobin implements true round-robin leader transfer. +// X Layer: This ensures that each cluster member gets a chance to become leader, +// by always transferring to the next node in sorted order. +// For example, with nodes [seq1, seq2, seq3]: +// - seq1 transfers to seq2 +// - seq2 transfers to seq3 +// - seq3 transfers to seq1 +// +// If a transfer fails (e.g., target node's log is behind), it will try the next node. +func (oc *OpConductor) transferLeaderRoundRobin() error { + // Get cluster membership + membership, err := oc.cons.ClusterMembership() + if err != nil { + return fmt.Errorf("failed to get cluster membership: %w", err) + } + + myServerID := oc.cons.ServerID() + + // Collect all voters (including self) for proper ordering + var allVoters []consensus.ServerInfo + for _, server := range membership.Servers { + if server.Suffrage == consensus.Voter { + allVoters = append(allVoters, server) + } + } + + if len(allVoters) <= 1 { + return errors.New("no other voters available for leader transfer") + } + + // Sort all voters by ServerID to ensure consistent ordering across all nodes. + sort.Slice(allVoters, func(i, j int) bool { + return allVoters[i].ID < allVoters[j].ID + }) + + // Find my position in the sorted list + myIdx := -1 + for i, server := range allVoters { + if server.ID == myServerID { + myIdx = i + break + } + } + + if myIdx == -1 { + return errors.New("current server not found in voter list") + } + + // Try to transfer to each voter in round-robin order, starting from the next one. + // This handles cases where a recently promoted voter may have stale logs. + numOtherVoters := len(allVoters) - 1 + for attempt := 0; attempt < numOtherVoters; attempt++ { + targetIdx := (myIdx + 1 + attempt) % len(allVoters) + // Skip self + if targetIdx == myIdx { + continue + } + target := allVoters[targetIdx] + + oc.log.Info("round-robin transferring leadership", + "from", myServerID, + "to", target.ID, + "targetAddr", target.Addr, + "attempt", attempt+1, + "totalVoters", len(allVoters), + ) + + err := oc.cons.TransferLeaderTo(target.ID, target.Addr) + if err == nil { + return nil // Success + } + + // ErrLeadershipTransferInProgress means a previous transfer is ongoing, just wait + if errors.Is(err, raft.ErrLeadershipTransferInProgress) { + oc.log.Debug("leadership transfer already in progress, waiting for completion") + return nil + } + + // Log the failure and try next voter + oc.log.Warn("failed to transfer leadership to voter, trying next", + "target", target.ID, + "err", err, + "attempt", attempt+1, + ) + } + + return errors.New("failed to transfer leadership to any voter") +} + func (oc *OpConductor) stopSequencer() error { oc.log.Info( "stopping sequencer", diff --git a/op-conductor/consensus/raft.go b/op-conductor/consensus/raft.go index 601330e2fcf26..354ce4462c6b4 100644 --- a/op-conductor/consensus/raft.go +++ b/op-conductor/consensus/raft.go @@ -61,6 +61,9 @@ type RaftConsensusConfig struct { TrailingLogs uint64 HeartbeatTimeout time.Duration LeaderLeaseTimeout time.Duration + + // X Layer: NoShutdownOnRemove prevents Raft from shutting down when removed from cluster. + NoShutdownOnRemove bool } // checkTCPPortOpen attempts to connect to the specified address and returns an error if the connection fails. @@ -82,6 +85,11 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, rc.HeartbeatTimeout = cfg.HeartbeatTimeout rc.LeaderLeaseTimeout = cfg.LeaderLeaseTimeout rc.LocalID = raft.ServerID(cfg.ServerID) + // X Layer: Optionally don't shutdown when removed from cluster, just become a follower. + // This allows the node to be re-added to the cluster without restarting the process. + if cfg.NoShutdownOnRemove { + rc.ShutdownOnRemove = false + } baseDir := filepath.Join(cfg.StorageDir, cfg.ServerID) if _, err := os.Stat(baseDir); os.IsNotExist(err) { diff --git a/op-conductor/flags/flags.go b/op-conductor/flags/flags.go index ba3b89dcef475..b01f1c451615a 100644 --- a/op-conductor/flags/flags.go +++ b/op-conductor/flags/flags.go @@ -209,6 +209,27 @@ var ( EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RPC_HTTP_BODY_LIMIT_MB"), Value: 5, } + + // X Layer: RoundRobinLeaderTransfer enables deterministic round-robin leader transfer. + // When enabled, leader transfer will cycle through all voters in sorted order (by ServerID), + // ensuring that even if only one node in the cluster is healthy, it will eventually become leader. + // This is useful when Raft's default log-based leader selection keeps choosing unhealthy nodes. + RoundRobinLeaderTransfer = &cli.BoolFlag{ + Name: "raft.round-robin-leader-transfer", + Usage: "Enable deterministic round-robin leader transfer instead of Raft's default log-based selection", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_ROUND_ROBIN_LEADER_TRANSFER"), + Value: false, + } + + // X Layer: RaftNoShutdownOnRemove prevents Raft from shutting down when removed from cluster. + // When enabled, the node will transition to follower state instead of shutting down, + // allowing it to be re-added to the cluster without restarting the process. + RaftNoShutdownOnRemove = &cli.BoolFlag{ + Name: "raft.no-shutdown-on-remove", + Usage: "Don't shutdown Raft when removed from cluster, just become a follower (allows re-adding without restart)", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_NO_SHUTDOWN_ON_REMOVE"), + Value: false, + } ) var requiredFlags = []cli.Flag{ @@ -246,7 +267,9 @@ var optionalFlags = []cli.Flag{ HealthcheckExecutionP2pCheckApi, HealthCheckRollupBoostPartialHealthinessToleranceLimit, HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds, - HTTPBodyLimitMB, // X Layer: HTTPBodyLimitMB is the HTTP request body size limit in MB for RPC server. + HTTPBodyLimitMB, // X Layer: HTTPBodyLimitMB is the HTTP request body size limit in MB for RPC server. + RoundRobinLeaderTransfer, // X Layer: Enable round-robin leader transfer + RaftNoShutdownOnRemove, // X Layer: Don't shutdown on remove } func init() {