Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 99 additions & 30 deletions src/main/java/com/amazonaws/services/neptune/cluster/AddCloneTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@
package com.amazonaws.services.neptune.cluster;


import com.amazonaws.services.neptune.io.KinesisConfig;
import com.amazonaws.services.neptune.util.Activity;
import com.amazonaws.services.neptune.util.Timer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.services.neptune.NeptuneClient;
import software.amazon.awssdk.services.neptune.model.*;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -126,22 +126,35 @@ private void createReplicas(NeptuneClusterMetadata sourceClusterMetadata,
DBParameterGroup dbParameterGroup,
DBCluster targetDbCluster) {

logger.info("Creating {} replica instances with instance type {}", replicaCount, instanceType);
ExecutorService taskExecutor = Executors.newFixedThreadPool(replicaCount);

for (int i = 0; i < replicaCount; i++) {

taskExecutor.execute(() -> createInstance("replica",
neptune,
sourceClusterMetadata,
instanceType,
dbParameterGroup,
targetDbCluster));
final int replicaNumber = i + 1;
logger.debug("Scheduling creation of replica {}/{}", replicaNumber, replicaCount);

taskExecutor.execute(() -> {
logger.debug("Starting creation of replica instance {}/{}", replicaNumber, replicaCount);
createInstance("replica",
neptune,
sourceClusterMetadata,
instanceType,
dbParameterGroup,
targetDbCluster);
logger.debug("Completed creation of replica instance {}/{}", replicaNumber, replicaCount);
});
}

taskExecutor.shutdown();

try {
taskExecutor.awaitTermination(30, TimeUnit.MINUTES);
final int timeoutMin = 30;
boolean completed = taskExecutor.awaitTermination(timeoutMin, TimeUnit.MINUTES);
if (completed) {
logger.debug("Successfully created all {} replica instances", replicaCount);
} else {
logger.warn("Timed out waiting for all replicas to be created after {} minutes", timeoutMin);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Expand All @@ -168,10 +181,23 @@ private DBCluster createCluster(NeptuneClusterMetadata sourceClusterMetadata,
.tags(getTags(sourceClusterMetadata.clusterId()));

if (this.enableAuditLogs) {
logger.debug("Enabling audit logs for cloned cluster");
restoreDbClusterToPointInTimeRequestBuilder = restoreDbClusterToPointInTimeRequestBuilder.enableCloudwatchLogsExports("audit");
}

DBCluster targetDbCluster = neptune.restoreDBClusterToPointInTime(restoreDbClusterToPointInTimeRequestBuilder.build()).dbCluster();
DBCluster targetDbCluster;
RestoreDbClusterToPointInTimeRequest request = restoreDbClusterToPointInTimeRequestBuilder.build();

try {
logger.debug("Sending restore DB cluster request: {}", request);
targetDbCluster = neptune.restoreDBClusterToPointInTime(request).dbCluster();
} catch (NeptuneException e) {
logger.error("Failed to create target cluster: {} from source cluster: {} (Error code: {}, Message: {})",
targetClusterId, sourceClusterId, Optional.ofNullable(e.awsErrorDetails()).map(AwsErrorDetails::errorCode).orElse("N/A"),
e.getMessage(), e);

throw e;
}

String clusterStatus = targetDbCluster.status();

Expand All @@ -190,6 +216,13 @@ private DBCluster createCluster(NeptuneClusterMetadata sourceClusterMetadata,
.status();
}

logger.debug("Cluster {} is now in {} state", targetClusterId, clusterStatus);

// Check if the final status indicates success
if (!clusterStatus.equals("available")) {
logger.warn("Cluster {} is in {} state instead of 'available'", targetClusterId, clusterStatus);
}

return targetDbCluster;
}

Expand Down Expand Up @@ -266,16 +299,25 @@ private DBParameterGroup createDbParameterGroup(NeptuneClusterMetadata sourceClu

private DBClusterParameterGroup createDbClusterParameterGroup(NeptuneClusterMetadata sourceClusterMetadata,
NeptuneClient neptune) {
String paramGroupName = String.format("%s-db-cluster-params", targetClusterId);
DBClusterParameterGroup dbClusterParameterGroup;

dbClusterParameterGroup = neptune.createDBClusterParameterGroup(
CreateDbClusterParameterGroupRequest.builder()
.dbClusterParameterGroupName(String.format("%s-db-cluster-params", targetClusterId))
.description(String.format("%s DB Cluster Parameter Group", targetClusterId))
.dbParameterGroupFamily(sourceClusterMetadata.dbParameterGroupFamily())
.tags(getTags(sourceClusterMetadata.clusterId()))
.build()
).dbClusterParameterGroup();

try {
CreateDbClusterParameterGroupRequest request = CreateDbClusterParameterGroupRequest.builder()
.dbClusterParameterGroupName(paramGroupName)
.description(String.format("%s DB Cluster Parameter Group", targetClusterId))
.dbParameterGroupFamily(sourceClusterMetadata.dbParameterGroupFamily())
.tags(getTags(sourceClusterMetadata.clusterId()))
.build();

dbClusterParameterGroup = neptune.createDBClusterParameterGroup(request).dbClusterParameterGroup();
logger.debug("Successfully created DB cluster parameter group: {}", dbClusterParameterGroup.dbClusterParameterGroupName());
} catch (NeptuneException e) {
logger.error("Failed to create DB cluster parameter group: {} (Error code: {}, Message: {})",
paramGroupName, Optional.ofNullable(e.awsErrorDetails()).map(AwsErrorDetails::errorCode).orElse("N/A"),
e.getMessage(), e);
throw e;
}

String neptuneStreamsParameterValue = sourceClusterMetadata.isStreamEnabled() ? "1" : "0";

Expand All @@ -300,6 +342,7 @@ private DBClusterParameterGroup createDbClusterParameterGroup(NeptuneClusterMeta
.build());

if (this.enableAuditLogs) {
logger.debug("Adding neptune_enable_audit_log parameter");
requestBuilder = requestBuilder.parameters(Parameter.builder()
.parameterName("neptune_enable_audit_log")
.parameterValue("1")
Expand Down Expand Up @@ -367,16 +410,16 @@ private void createInstance(String name,
DBParameterGroup dbParameterGroup,
DBCluster targetDbCluster) {

String instanceId = String.format("neptune-export-%s-%s", name, UUID.randomUUID().toString().substring(0, 5));
System.err.println("Creating target " + name + " instance...");

CreateDbInstanceRequest.Builder requestBuilder = CreateDbInstanceRequest.builder()
.dbInstanceClass(instanceType.value())
.dbInstanceIdentifier(String.format("neptune-export-%s-%s", name, UUID.randomUUID().toString().substring(0, 5)))
.dbInstanceIdentifier(instanceId)
.dbClusterIdentifier(targetDbCluster.dbClusterIdentifier())
.dbParameterGroupName(dbParameterGroup.dbParameterGroupName())
.engine("neptune")
.tags(getTags(sourceClusterMetadata.clusterId()))
;
.tags(getTags(sourceClusterMetadata.clusterId()));

if (StringUtils.isNotEmpty(engineVersion)) {
requestBuilder = requestBuilder.engineVersion(engineVersion);
Expand All @@ -391,6 +434,7 @@ private void createInstance(String name,
// Retry loop with exponential backoff
for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
logger.debug("Sending create DB instance request for {} in cluster {}", instanceId, targetDbCluster.dbClusterIdentifier());
targetDbInstance = neptune.createDBInstance(request).dbInstance();
// If we get here, the request was successful
break;
Expand All @@ -400,8 +444,6 @@ private void createInstance(String name,
logger.error("Failed to create {} instance after {} attempts, with error {}", name, maxRetries, e.getMessage());
return;
}



// Calculate backoff time with exponential increase and some jitter
long backoffMillis = initialBackoffMillis * (long) Math.pow(2, attempt);
Expand Down Expand Up @@ -432,11 +474,38 @@ private void createInstance(String name,
} catch (InterruptedException e) {
e.printStackTrace();
}
instanceStatus = neptune.describeDBInstances(DescribeDbInstancesRequest.builder()
.dbInstanceIdentifier(targetDbInstance.dbInstanceIdentifier()).build())
.dbInstances()
.get(0)
.dbInstanceStatus();

try {
DescribeDbInstancesRequest describeRequest = DescribeDbInstancesRequest.builder()
.dbInstanceIdentifier(targetDbInstance.dbInstanceIdentifier())
.build();

instanceStatus = neptune.describeDBInstances(describeRequest)
.dbInstances()
.get(0)
.dbInstanceStatus();

logger.debug("Instance {} status: {}", targetDbInstance.dbInstanceIdentifier(), instanceStatus);
} catch (NeptuneException e) {
logger.error("Error checking instance status: {} (Error code: {}, Message: {})",
targetDbInstance.dbInstanceIdentifier(), Optional.ofNullable(e.awsErrorDetails()).map(AwsErrorDetails::errorCode).orElse("N/A"),
e.getMessage(), e);

if (e.awsErrorDetails() != null && e.awsErrorDetails().errorCode() != null && e.awsErrorDetails().errorCode().equals("DBInstanceNotFound")) {
logger.error("The instance {} was not found. It may have been deleted or failed to create properly.",
targetDbInstance.dbInstanceIdentifier());
}
throw e;
}
}

logger.debug("{} instance {} is now in {} state",
name, targetDbInstance.dbInstanceIdentifier(), instanceStatus);

// Check if the final status indicates success
if (!instanceStatus.equals("available")) {
logger.warn("{} instance {} is in {} state instead of 'available'",
name, targetDbInstance.dbInstanceIdentifier(), instanceStatus);
}
}

Expand Down
Loading