diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 4aaf655b8258..3603c14fe8de 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -65,12 +65,14 @@ public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, Stri
     @Override
     public String toString()
     {
-        return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]";
+        String parentSessionId = this.parentSessionId == null ? "" : " (parent session id: #" + this.parentSessionId + ")";
+        return "[repair #" + sessionId + parentSessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]";
     }
 
     public String toString(PreviewKind previewKind)
     {
-        return '[' + previewKind.logPrefix() + " #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]";
+        String parentSessionId = this.parentSessionId == null ? "" : " (parent session id: #" + this.parentSessionId + ")";
+        return '[' + previewKind.logPrefix() + " #" + sessionId + parentSessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]";
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 8a16089c3cf7..1064cf818e85 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -177,8 +177,8 @@ else if (message.verb() == SYNC_REQ)
             }
             else if (message.verb() == CLEANUP_MSG)
             {
-                logger.debug("cleaning up repair");
                 CleanupMessage cleanup = (CleanupMessage) message.payload;
+                logger.debug("Cleaning up parent repair session {}", cleanup.parentRepairSession);
                 ActiveRepairService.instance.removeParentRepairSession(cleanup.parentRepairSession);
                 MessagingService.instance().send(message.emptyResponse(), message.from());
             }
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 83000628683e..53c72123b229 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -19,6 +19,7 @@
 
 import java.util.UUID;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,20 @@
  */
 public abstract class RepairMessage
 {
-    private static final CassandraVersion SUPPORTS_TIMEOUTS = new CassandraVersion("4.0.7-SNAPSHOT");
+    /**
+     * If true, we will always consider remote nodes to support repair message timeouts,
+     * and fail the repair if a response is not received on time.
+     * Default: false, to preserve backward compatibility.
+     */
+    @VisibleForTesting
+    protected static final String ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_PROPERTY = "cassandra.repair.always_consider_timeouts_supported";
+    private static final boolean ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_DEFAULT = false;
+
+    /**
+     * The first C* version to support repair message timeouts.
+     */
+    @VisibleForTesting
+    protected static final CassandraVersion SUPPORTS_TIMEOUTS = new CassandraVersion("4.0.7-SNAPSHOT");
     private static final Logger logger = LoggerFactory.getLogger(RepairMessage.class);
 
     public final RepairJobDesc desc;
@@ -88,12 +102,26 @@ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReaso
                 callback);
     }
 
-    private static boolean supportsTimeouts(InetAddressAndPort from, UUID parentSessionId)
+    @VisibleForTesting
+    protected static boolean supportsTimeouts(InetAddressAndPort from, UUID parentSessionId)
     {
+        /*
+         * In CNDB, repair services won't be added to the Nodes.peers() map, so there's no clear way
+         * to check the version of the remote peer. This is the reason why a system property is introduced
+         * to skip the version check, in case it's known that the deployed C* version supports repair message
+         * timeouts.
+         */
+        if (areTimeoutsAlwaysSupported())
+            return true;
         CassandraVersion remoteVersion = Nodes.peers().map(from, NodeInfo::getReleaseVersion, () -> null);
         if (remoteVersion != null && remoteVersion.compareTo(SUPPORTS_TIMEOUTS, true) >= 0)
             return true;
         logger.warn("[#{}] Not failing repair due to remote host {} not supporting repair message timeouts (version = {})", parentSessionId, from, remoteVersion);
         return false;
     }
+
+    private static boolean areTimeoutsAlwaysSupported()
+    {
+        return Boolean.parseBoolean(System.getProperty(ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_PROPERTY, Boolean.toString(ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_DEFAULT)));
+    }
 }
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java
new file mode 100644
index 000000000000..5d822b3c9db6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.nodes.Nodes;
+import org.apache.cassandra.nodes.PeerInfo;
+import org.apache.cassandra.utils.CassandraVersion;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RepairMessageTest
+{
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+    private File dir;
+
+    @Before
+    public void initialize() throws Throwable
+    {
+        dir = folder.newFolder();
+        Nodes.Instance.unsafeSetup(dir.toPath());
+    }
+
+    @After
+    public void tearDown()
+    {
+        // Always clean the system property before running a new test
+        System.clearProperty(RepairMessage.ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_PROPERTY);
+    }
+
+    @Test
+    public void testSupportsTimeoutsIsTrueWithCassandraVersionSupportingTimeouts() throws Exception
+    {
+        InetAddressAndPort addressAndPort = InetAddressAndPort.getByName("127.0.0.3");
+        Nodes.peers().update(addressAndPort, p -> fakePeer(p, RepairMessage.SUPPORTS_TIMEOUTS));
+        assertThat(RepairMessage.supportsTimeouts(addressAndPort, UUID.randomUUID())).isTrue();
+    }
+
+    @Test
+    public void testSupportsTimeoutsIsFalseWithCassandraVersionNotSupportingTimeouts() throws Exception
+    {
+        InetAddressAndPort addressAndPort = InetAddressAndPort.getByName("127.0.0.3");
+        CassandraVersion versionNotSupportingTimeouts = CassandraVersion.CASSANDRA_4_0;
+        Nodes.peers().update(addressAndPort, p -> fakePeer(p, versionNotSupportingTimeouts));
+
+        assertThat(versionNotSupportingTimeouts.compareTo(RepairMessage.SUPPORTS_TIMEOUTS)).isLessThan(0);
+        assertThat(RepairMessage.supportsTimeouts(addressAndPort, UUID.randomUUID())).isFalse();
+    }
+
+    @Test
+    public void testSupportsTimeoutsIsAlwaysTrueWhenSystemPropertyIsSetToTrue() throws Exception
+    {
+        System.setProperty(RepairMessage.ALWAYS_CONSIDER_TIMEOUTS_SUPPORTED_PROPERTY, "true");
+
+        // Check with peer not supporting timeouts
+        InetAddressAndPort peerNotSupportingTimeouts = InetAddressAndPort.getByName("127.0.0.3");
+        CassandraVersion versionNotSupportingTimeouts = CassandraVersion.CASSANDRA_4_0;
+        Nodes.peers().update(peerNotSupportingTimeouts, p -> fakePeer(p, versionNotSupportingTimeouts));
+        assertThat(versionNotSupportingTimeouts.compareTo(RepairMessage.SUPPORTS_TIMEOUTS)).isLessThan(0);
+        assertThat(RepairMessage.supportsTimeouts(peerNotSupportingTimeouts, UUID.randomUUID())).isTrue();
+
+        // Check with peer supporting timeouts
+        InetAddressAndPort peerSupportingTimeouts = InetAddressAndPort.getByName("127.0.0.4");
+        Nodes.peers().update(peerSupportingTimeouts, p -> fakePeer(p, RepairMessage.SUPPORTS_TIMEOUTS));
+        assertThat(RepairMessage.supportsTimeouts(peerSupportingTimeouts, UUID.randomUUID())).isTrue();
+    }
+
+    static void fakePeer(PeerInfo p, CassandraVersion version)
+    {
+        int nodeId = p.getPeer().address.getAddress()[3];
+        p.setPreferred(p.getPeer());
+        p.setDataCenter("DC" + nodeId);
+        p.setRack("RAC" + nodeId);
+        p.setHostId(UUID.randomUUID());
+        p.setReleaseVersion(version);
+    }
+}