From b5e8a8fb8fce3c21fa402822e23f63ab1421ebe6 Mon Sep 17 00:00:00 2001 From: Massimiliano Date: Mon, 15 Dec 2025 16:45:20 +0100 Subject: [PATCH 1/3] CNDB-14868 Add system property to always consider the remote peer supporting repair message timeouts. Right now the peer's version is checked to figure out if timeouts are supported, but that doesn't work in CNDB, as repair services are not added to the Nodes.peers() table, so there's no clear way to check what version a remote peer is running on. The newly introduced system property allows skipping this version check and always consider timeouts supported by the remote peer. --- .../cassandra/repair/messages/RepairMessage.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index 83000628683e..c7fd467e0f19 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -43,6 +43,12 @@ */ public abstract class RepairMessage { + /** + * If true, we will always consider remote nodes to support repair message timeouts, + * and fail the repair if a response is not received on time. + * Default: false, to preserve backward compatibility. 
+ */ + private static final boolean ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED = Boolean.parseBoolean(System.getProperty("cassandra.repair.always_consider_timeouts_supported", "false")); private static final CassandraVersion SUPPORTS_TIMEOUTS = new CassandraVersion("4.0.7-SNAPSHOT"); private static final Logger logger = LoggerFactory.getLogger(RepairMessage.class); public final RepairJobDesc desc; @@ -90,6 +96,14 @@ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReaso private static boolean supportsTimeouts(InetAddressAndPort from, UUID parentSessionId) { + /* + * In CNDB, repair services won't be added to the Nodes.peers() map, so there's no clear way + * to check the version of the remote peer. This is the reason why a system property is introduced + * to skip the version check, in case it's known that the deployed C* version supports repair message + * timeouts. + */ + if (ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED) + return true; CassandraVersion remoteVersion = Nodes.peers().map(from, NodeInfo::getReleaseVersion, () -> null); if (remoteVersion != null && remoteVersion.compareTo(SUPPORTS_TIMEOUTS, true) >= 0) return true; From fe142c0db0fd333d2caff55c30ac85c1d7086436 Mon Sep 17 00:00:00 2001 From: Massimiliano Date: Thu, 18 Dec 2025 11:22:10 +0100 Subject: [PATCH 2/3] CNDB-14868 Add logging to correlate with repair parent session id more easily --- src/java/org/apache/cassandra/repair/RepairJobDesc.java | 6 ++++-- .../apache/cassandra/repair/RepairMessageVerbHandler.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java index 4aaf655b8258..3603c14fe8de 100644 --- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java +++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java @@ -65,12 +65,14 @@ public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, Stri @Override public String toString() { - 
return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]"; + String parentSessionId = this.parentSessionId == null ? "" : " (parent session id: #" + this.parentSessionId + ")"; + return "[repair #" + sessionId + parentSessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]"; } public String toString(PreviewKind previewKind) { - return '[' + previewKind.logPrefix() + " #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]"; + String parentSessionId = this.parentSessionId == null ? "" : " (parent session id: #" + this.parentSessionId + ")"; + return '[' + previewKind.logPrefix() + " #" + sessionId + parentSessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]"; } @Override diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 8a16089c3cf7..1064cf818e85 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -177,8 +177,8 @@ else if (message.verb() == SYNC_REQ) } else if (message.verb() == CLEANUP_MSG) { - logger.debug("cleaning up repair"); CleanupMessage cleanup = (CleanupMessage) message.payload; + logger.debug("Cleaning up parent repair session {}", cleanup.parentRepairSession); ActiveRepairService.instance.removeParentRepairSession(cleanup.parentRepairSession); MessagingService.instance().send(message.emptyResponse(), message.from()); } From 5a962651575ab6d8aee9623a611678f7958066e2 Mon Sep 17 00:00:00 2001 From: Massimiliano Date: Mon, 19 Jan 2026 17:48:35 +0100 Subject: [PATCH 3/3] CNDB-14868 Added RepairMessageTest to cover the path where the support for repair timeouts is checked. 
Moved away from reading the system property in the static initialization to make it more testable --- .../repair/messages/RepairMessage.java | 22 +++- .../repair/messages/RepairMessageTest.java | 103 ++++++++++++++++++ 2 files changed, 121 insertions(+), 4 deletions(-) create mode 100644 test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index c7fd467e0f19..53c72123b229 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -19,6 +19,7 @@ import java.util.UUID; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +49,15 @@ public abstract class RepairMessage * and fail the repair if a response is not received on time. * Default: false, to preserve backward compatibility. */ - private static final boolean ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED = Boolean.parseBoolean(System.getProperty("cassandra.repair.always_consider_timeouts_supported", "false")); - private static final CassandraVersion SUPPORTS_TIMEOUTS = new CassandraVersion("4.0.7-SNAPSHOT"); + @VisibleForTesting + protected static final String ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_PROPERTY = "cassandra.repair.always_consider_timeouts_supported"; + private static final boolean ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_DEFAULT = false; + + /** + * The first C* version to support repair message timeouts. 
+ */ + @VisibleForTesting + protected static final CassandraVersion SUPPORTS_TIMEOUTS = new CassandraVersion("4.0.7-SNAPSHOT"); private static final Logger logger = LoggerFactory.getLogger(RepairMessage.class); public final RepairJobDesc desc; @@ -94,7 +102,8 @@ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReaso callback); } - private static boolean supportsTimeouts(InetAddressAndPort from, UUID parentSessionId) + @VisibleForTesting + protected static boolean supportsTimeouts(InetAddressAndPort from, UUID parentSessionId) { /* * In CNDB, repair services won't be added to the Nodes.peers() map, so there's no clear way @@ -102,7 +111,7 @@ private static boolean supportsTimeouts(InetAddressAndPort from, UUID parentSess * to skip the version check, in case it's known that the deployed C* version supports repair message * timeouts. */ - if (ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED) + if (areTimeoutsAlwaysSupported()) return true; CassandraVersion remoteVersion = Nodes.peers().map(from, NodeInfo::getReleaseVersion, () -> null); if (remoteVersion != null && remoteVersion.compareTo(SUPPORTS_TIMEOUTS, true) >= 0) @@ -110,4 +119,9 @@ private static boolean supportsTimeouts(InetAddressAndPort from, UUID parentSess logger.warn("[#{}] Not failing repair due to remote host {} not supporting repair message timeouts (version = {})", parentSessionId, from, remoteVersion); return false; } + + private static boolean areTimeoutsAlwaysSupported() + { + return Boolean.parseBoolean(System.getProperty(ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_PROPERTY, Boolean.toString(ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_DEFAULT))); + } } diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java new file mode 100644 index 000000000000..5d822b3c9db6 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.messages; + +import java.io.File; +import java.util.UUID; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.nodes.Nodes; +import org.apache.cassandra.nodes.PeerInfo; +import org.apache.cassandra.utils.CassandraVersion; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RepairMessageTest +{ + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + private File dir; + + @Before + public void initialize() throws Throwable + { + dir = folder.newFolder(); + Nodes.Instance.unsafeSetup(dir.toPath()); + } + + @After + public void tearDown() + { + // Always clean the system property before running a new test + System.clearProperty(RepairMessage.ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_PROPERTY); + } + + @Test + public void testSupportsTimeoutsIsTrueWithCassandraVersionSupportingTimeouts() throws Exception + { + InetAddressAndPort addressAndPort = InetAddressAndPort.getByName("127.0.0.3"); + Nodes.peers().update(addressAndPort, p -> fakePeer(p, 
RepairMessage.SUPPORTS_TIMEOUTS)); + assertThat(RepairMessage.supportsTimeouts(addressAndPort, UUID.randomUUID())).isTrue(); + } + + @Test + public void testSupportsTimeoutsIsFalseWithCassandraVersionNotSupportingTimeouts() throws Exception + { + InetAddressAndPort addressAndPort = InetAddressAndPort.getByName("127.0.0.3"); + CassandraVersion versionNotSupportingTimeouts = CassandraVersion.CASSANDRA_4_0; + Nodes.peers().update(addressAndPort, p -> fakePeer(p, versionNotSupportingTimeouts)); + + assertThat(versionNotSupportingTimeouts.compareTo(RepairMessage.SUPPORTS_TIMEOUTS)).isLessThan(0); + assertThat(RepairMessage.supportsTimeouts(addressAndPort, UUID.randomUUID())).isFalse(); + } + + @Test + public void testSupportsTimeoutsIsAlwaysTrueWhenSystemPropertyIsSetToTrue() throws Exception + { + System.setProperty(RepairMessage.ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_PROPERTY, "true"); + + // Check with peer not supporting timeouts + InetAddressAndPort peerNotSupportingTimeouts = InetAddressAndPort.getByName("127.0.0.3"); + CassandraVersion versionNotSupportingTimeouts = CassandraVersion.CASSANDRA_4_0; + Nodes.peers().update(peerNotSupportingTimeouts, p -> fakePeer(p, versionNotSupportingTimeouts)); + assertThat(versionNotSupportingTimeouts.compareTo(RepairMessage.SUPPORTS_TIMEOUTS)).isLessThan(0); + assertThat(RepairMessage.supportsTimeouts(peerNotSupportingTimeouts, UUID.randomUUID())).isTrue(); + + // Check with peer supporting timeouts + InetAddressAndPort peerSupportingTimeouts = InetAddressAndPort.getByName("127.0.0.4"); + Nodes.peers().update(peerSupportingTimeouts, p -> fakePeer(p, RepairMessage.SUPPORTS_TIMEOUTS)); + assertThat(RepairMessage.supportsTimeouts(peerSupportingTimeouts, UUID.randomUUID())).isTrue(); + } + + static void fakePeer(PeerInfo p, CassandraVersion version) + { + int nodeId = p.getPeer().address.getAddress()[3]; + p.setPreferred(p.getPeer()); + p.setDataCenter("DC" + nodeId); + p.setRack("RAC" + nodeId); + p.setHostId(UUID.randomUUID()); + 
p.setReleaseVersion(version); + } +} \ No newline at end of file