Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions op-conductor/conductor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ type Config struct {

// X Layer: HTTPBodyLimitMB is the HTTP request body size limit in MB for RPC server.
HTTPBodyLimitMB int

// X Layer: RoundRobinLeaderTransfer enables deterministic round-robin leader transfer.
RoundRobinLeaderTransfer bool

// X Layer: RaftNoShutdownOnRemove prevents Raft from shutting down when removed from cluster.
RaftNoShutdownOnRemove bool
}

// Check validates the CLIConfig.
Expand Down Expand Up @@ -225,6 +231,10 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {

// X Layer: HTTPBodyLimitMB is the HTTP request body size limit in MB for RPC server.
HTTPBodyLimitMB: ctx.Int(flags.HTTPBodyLimitMB.Name),
// X Layer: RoundRobinLeaderTransfer enables deterministic round-robin leader transfer.
RoundRobinLeaderTransfer: ctx.Bool(flags.RoundRobinLeaderTransfer.Name),
// X Layer: RaftNoShutdownOnRemove prevents Raft from shutting down when removed.
RaftNoShutdownOnRemove: ctx.Bool(flags.RaftNoShutdownOnRemove.Name),
}, nil
}

Expand Down
102 changes: 101 additions & 1 deletion op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"net/http"
"sort"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -187,6 +188,8 @@ func (c *OpConductor) initConsensus(ctx context.Context) error {
TrailingLogs: c.cfg.RaftTrailingLogs,
HeartbeatTimeout: c.cfg.RaftHeartbeatTimeout,
LeaderLeaseTimeout: c.cfg.RaftLeaderLeaseTimeout,
// X Layer: Optionally don't shutdown when removed from cluster.
NoShutdownOnRemove: c.cfg.RaftNoShutdownOnRemove,
}
cons, err := consensus.NewRaftConsensus(c.log, raftConsensusConfig)
if err != nil {
Expand Down Expand Up @@ -827,7 +830,15 @@ func (oc *OpConductor) action() {
func (oc *OpConductor) transferLeader() error {
// TransferLeader here will do round robin to try to transfer leadership to the next healthy node.
oc.log.Info("transferring leadership", "server", oc.cons.ServerID())
err := oc.cons.TransferLeader()

// X Layer: Use round-robin if enabled, otherwise use default Raft leader transfer
var err error
if oc.cfg.RoundRobinLeaderTransfer {
err = oc.transferLeaderRoundRobin()
} else {
err = oc.cons.TransferLeader()
}

oc.metrics.RecordLeaderTransfer(err == nil)
if err == nil {
oc.leader.Store(false)
Expand All @@ -845,6 +856,95 @@ func (oc *OpConductor) transferLeader() error {
}
}

// transferLeaderRoundRobin implements true round-robin leader transfer.
// X Layer: This ensures that each cluster member gets a chance to become leader,
// by always transferring to the next node in sorted order.
// For example, with nodes [seq1, seq2, seq3]:
// - seq1 transfers to seq2
// - seq2 transfers to seq3
// - seq3 transfers to seq1
//
// If a transfer fails (e.g., target node's log is behind), it will try the next node.
func (oc *OpConductor) transferLeaderRoundRobin() error {
// Get cluster membership
membership, err := oc.cons.ClusterMembership()
if err != nil {
return fmt.Errorf("failed to get cluster membership: %w", err)
}

myServerID := oc.cons.ServerID()

// Collect all voters (including self) for proper ordering
var allVoters []consensus.ServerInfo
for _, server := range membership.Servers {
if server.Suffrage == consensus.Voter {
allVoters = append(allVoters, server)
}
}

if len(allVoters) <= 1 {
return errors.New("no other voters available for leader transfer")
}

// Sort all voters by ServerID to ensure consistent ordering across all nodes.
sort.Slice(allVoters, func(i, j int) bool {
return allVoters[i].ID < allVoters[j].ID
})

// Find my position in the sorted list
myIdx := -1
for i, server := range allVoters {
if server.ID == myServerID {
myIdx = i
break
}
}

if myIdx == -1 {
return errors.New("current server not found in voter list")
}

// Try to transfer to each voter in round-robin order, starting from the next one.
// This handles cases where a recently promoted voter may have stale logs.
numOtherVoters := len(allVoters) - 1
for attempt := 0; attempt < numOtherVoters; attempt++ {
targetIdx := (myIdx + 1 + attempt) % len(allVoters)
// Skip self
if targetIdx == myIdx {
continue
}
target := allVoters[targetIdx]

oc.log.Info("round-robin transferring leadership",
"from", myServerID,
"to", target.ID,
"targetAddr", target.Addr,
"attempt", attempt+1,
"totalVoters", len(allVoters),
)

err := oc.cons.TransferLeaderTo(target.ID, target.Addr)
if err == nil {
return nil // Success
}

// ErrLeadershipTransferInProgress means a previous transfer is ongoing, just wait
if errors.Is(err, raft.ErrLeadershipTransferInProgress) {
oc.log.Debug("leadership transfer already in progress, waiting for completion")
return nil
}

// Log the failure and try next voter
oc.log.Warn("failed to transfer leadership to voter, trying next",
"target", target.ID,
"err", err,
"attempt", attempt+1,
)
}

return errors.New("failed to transfer leadership to any voter")
}

func (oc *OpConductor) stopSequencer() error {
oc.log.Info(
"stopping sequencer",
Expand Down
8 changes: 8 additions & 0 deletions op-conductor/consensus/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type RaftConsensusConfig struct {
TrailingLogs uint64
HeartbeatTimeout time.Duration
LeaderLeaseTimeout time.Duration

// X Layer: NoShutdownOnRemove prevents Raft from shutting down when removed from cluster.
NoShutdownOnRemove bool
}

// checkTCPPortOpen attempts to connect to the specified address and returns an error if the connection fails.
Expand All @@ -82,6 +85,11 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus,
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.LeaderLeaseTimeout = cfg.LeaderLeaseTimeout
rc.LocalID = raft.ServerID(cfg.ServerID)
// X Layer: Optionally don't shutdown when removed from cluster, just become a follower.
// This allows the node to be re-added to the cluster without restarting the process.
if cfg.NoShutdownOnRemove {
rc.ShutdownOnRemove = false
}

baseDir := filepath.Join(cfg.StorageDir, cfg.ServerID)
if _, err := os.Stat(baseDir); os.IsNotExist(err) {
Expand Down
25 changes: 24 additions & 1 deletion op-conductor/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,27 @@ var (
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RPC_HTTP_BODY_LIMIT_MB"),
Value: 5,
}

// X Layer: RoundRobinLeaderTransfer enables deterministic round-robin leader transfer.
// When enabled, leader transfer will cycle through all voters in sorted order (by ServerID),
// ensuring that even if only one node in the cluster is healthy, it will eventually become leader.
// This is useful when Raft's default log-based leader selection keeps choosing unhealthy nodes.
RoundRobinLeaderTransfer = &cli.BoolFlag{
Name: "raft.round-robin-leader-transfer",
Usage: "Enable deterministic round-robin leader transfer instead of Raft's default log-based selection",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_ROUND_ROBIN_LEADER_TRANSFER"),
Value: false,
}

// X Layer: RaftNoShutdownOnRemove prevents Raft from shutting down when removed from cluster.
// When enabled, the node will transition to follower state instead of shutting down,
// allowing it to be re-added to the cluster without restarting the process.
RaftNoShutdownOnRemove = &cli.BoolFlag{
Name: "raft.no-shutdown-on-remove",
Usage: "Don't shutdown Raft when removed from cluster, just become a follower (allows re-adding without restart)",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_NO_SHUTDOWN_ON_REMOVE"),
Value: false,
}
)

var requiredFlags = []cli.Flag{
Expand Down Expand Up @@ -246,7 +267,9 @@ var optionalFlags = []cli.Flag{
HealthcheckExecutionP2pCheckApi,
HealthCheckRollupBoostPartialHealthinessToleranceLimit,
HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds,
HTTPBodyLimitMB, // X Layer: HTTPBodyLimitMB is the HTTP request body size limit in MB for RPC server.
HTTPBodyLimitMB, // X Layer: HTTPBodyLimitMB is the HTTP request body size limit in MB for RPC server.
RoundRobinLeaderTransfer, // X Layer: Enable round-robin leader transfer
RaftNoShutdownOnRemove, // X Layer: Don't shutdown on remove
}

func init() {
Expand Down