diff --git a/src/main/java/com/amazonaws/services/neptune/cluster/AddCloneTask.java b/src/main/java/com/amazonaws/services/neptune/cluster/AddCloneTask.java index 13253326..e0c447fe 100644 --- a/src/main/java/com/amazonaws/services/neptune/cluster/AddCloneTask.java +++ b/src/main/java/com/amazonaws/services/neptune/cluster/AddCloneTask.java @@ -13,19 +13,19 @@ package com.amazonaws.services.neptune.cluster; -import com.amazonaws.services.neptune.io.KinesisConfig; import com.amazonaws.services.neptune.util.Activity; import com.amazonaws.services.neptune.util.Timer; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.SdkRequest; import software.amazon.awssdk.services.neptune.NeptuneClient; import software.amazon.awssdk.services.neptune.model.*; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -126,22 +126,35 @@ private void createReplicas(NeptuneClusterMetadata sourceClusterMetadata, DBParameterGroup dbParameterGroup, DBCluster targetDbCluster) { + logger.info("Creating {} replica instances with instance type {}", replicaCount, instanceType); ExecutorService taskExecutor = Executors.newFixedThreadPool(replicaCount); for (int i = 0; i < replicaCount; i++) { - - taskExecutor.execute(() -> createInstance("replica", - neptune, - sourceClusterMetadata, - instanceType, - dbParameterGroup, - targetDbCluster)); + final int replicaNumber = i + 1; + logger.debug("Scheduling creation of replica {}/{}", replicaNumber, replicaCount); + + taskExecutor.execute(() -> { + logger.debug("Starting creation of replica instance {}/{}", replicaNumber, replicaCount); + createInstance("replica", + neptune, + sourceClusterMetadata, + instanceType, + dbParameterGroup, + targetDbCluster); + logger.debug("Completed creation of replica instance {}/{}", replicaNumber, replicaCount); + }); } taskExecutor.shutdown(); try { - taskExecutor.awaitTermination(30, TimeUnit.MINUTES); + final int timeoutMin = 30; + boolean completed = taskExecutor.awaitTermination(timeoutMin, TimeUnit.MINUTES); + if (completed) { + logger.debug("Successfully created all {} replica instances", replicaCount); + } else { + logger.warn("Timed out waiting for all replicas to be created after {} minutes", timeoutMin); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); @@ -168,10 +181,23 @@ private DBCluster createCluster(NeptuneClusterMetadata sourceClusterMetadata, .tags(getTags(sourceClusterMetadata.clusterId())); if (this.enableAuditLogs) { + logger.debug("Enabling audit logs for cloned cluster"); restoreDbClusterToPointInTimeRequestBuilder = restoreDbClusterToPointInTimeRequestBuilder.enableCloudwatchLogsExports("audit"); } - DBCluster targetDbCluster = neptune.restoreDBClusterToPointInTime(restoreDbClusterToPointInTimeRequestBuilder.build()).dbCluster(); + DBCluster targetDbCluster; + RestoreDbClusterToPointInTimeRequest request = restoreDbClusterToPointInTimeRequestBuilder.build(); + + try { + logger.debug("Sending restore DB cluster request: {}", request); + targetDbCluster = neptune.restoreDBClusterToPointInTime(request).dbCluster(); + } catch (NeptuneException e) { + logger.error("Failed to create target cluster: {} from source cluster: {} (Error code: {}, Message: {})", + targetClusterId, sourceClusterId, Optional.ofNullable(e.awsErrorDetails()).map(AwsErrorDetails::errorCode).orElse("N/A"), + e.getMessage(), e); + + throw e; + } String clusterStatus = targetDbCluster.status(); @@ -190,6 +216,13 @@ private DBCluster createCluster(NeptuneClusterMetadata sourceClusterMetadata, .status(); } + logger.debug("Cluster {} is now in {} state", targetClusterId, clusterStatus); + + // Check if the final status indicates success + if (!clusterStatus.equals("available")) { + logger.warn("Cluster {} is in {} state instead of 'available'", targetClusterId, clusterStatus); + } + return targetDbCluster; } @@ -266,16 +299,25 @@ private DBParameterGroup createDbParameterGroup(NeptuneClusterMetadata sourceClu private DBClusterParameterGroup createDbClusterParameterGroup(NeptuneClusterMetadata sourceClusterMetadata, NeptuneClient neptune) { + String paramGroupName = String.format("%s-db-cluster-params", targetClusterId); DBClusterParameterGroup dbClusterParameterGroup; - - dbClusterParameterGroup = neptune.createDBClusterParameterGroup( - CreateDbClusterParameterGroupRequest.builder() - .dbClusterParameterGroupName(String.format("%s-db-cluster-params", targetClusterId)) - .description(String.format("%s DB Cluster Parameter Group", targetClusterId)) - .dbParameterGroupFamily(sourceClusterMetadata.dbParameterGroupFamily()) - .tags(getTags(sourceClusterMetadata.clusterId())) - .build() - ).dbClusterParameterGroup(); + + try { + CreateDbClusterParameterGroupRequest request = CreateDbClusterParameterGroupRequest.builder() + .dbClusterParameterGroupName(paramGroupName) + .description(String.format("%s DB Cluster Parameter Group", targetClusterId)) + .dbParameterGroupFamily(sourceClusterMetadata.dbParameterGroupFamily()) + .tags(getTags(sourceClusterMetadata.clusterId())) + .build(); + + dbClusterParameterGroup = neptune.createDBClusterParameterGroup(request).dbClusterParameterGroup(); + logger.debug("Successfully created DB cluster parameter group: {}", dbClusterParameterGroup.dbClusterParameterGroupName()); + } catch (NeptuneException e) { + logger.error("Failed to create DB cluster parameter group: {} (Error code: {}, Message: {})", + paramGroupName, Optional.ofNullable(e.awsErrorDetails()).map(AwsErrorDetails::errorCode).orElse("N/A"), + e.getMessage(), e); + throw e; + } String neptuneStreamsParameterValue = sourceClusterMetadata.isStreamEnabled() ? "1" : "0"; @@ -300,6 +342,7 @@ private DBClusterParameterGroup createDbClusterParameterGroup(NeptuneClusterMeta .build()); if (this.enableAuditLogs) { + logger.debug("Adding neptune_enable_audit_log parameter"); requestBuilder = requestBuilder.parameters(Parameter.builder() .parameterName("neptune_enable_audit_log") .parameterValue("1") @@ -367,16 +410,16 @@ private void createInstance(String name, DBParameterGroup dbParameterGroup, DBCluster targetDbCluster) { + String instanceId = String.format("neptune-export-%s-%s", name, UUID.randomUUID().toString().substring(0, 5)); System.err.println("Creating target " + name + " instance..."); CreateDbInstanceRequest.Builder requestBuilder = CreateDbInstanceRequest.builder() .dbInstanceClass(instanceType.value()) - .dbInstanceIdentifier(String.format("neptune-export-%s-%s", name, UUID.randomUUID().toString().substring(0, 5))) + .dbInstanceIdentifier(instanceId) .dbClusterIdentifier(targetDbCluster.dbClusterIdentifier()) .dbParameterGroupName(dbParameterGroup.dbParameterGroupName()) .engine("neptune") - .tags(getTags(sourceClusterMetadata.clusterId())) - ; + .tags(getTags(sourceClusterMetadata.clusterId())); if (StringUtils.isNotEmpty(engineVersion)) { requestBuilder = requestBuilder.engineVersion(engineVersion); @@ -391,6 +434,7 @@ private void createInstance(String name, // Retry loop with exponential backoff for (int attempt = 0; attempt <= maxRetries; attempt++) { try { + logger.debug("Sending create DB instance request for {} in cluster {}", instanceId, targetDbCluster.dbClusterIdentifier()); targetDbInstance = neptune.createDBInstance(request).dbInstance(); // If we get here, the request was successful break; @@ -400,8 +444,6 @@ private void createInstance(String name, logger.error("Failed to create {} instance after {} attempts, with error {}", name, maxRetries, e.getMessage()); return; } - - // Calculate backoff time with exponential increase and some jitter long backoffMillis = initialBackoffMillis * (long) Math.pow(2, attempt); @@ -432,11 +474,38 @@ private void createInstance(String name, } catch (InterruptedException e) { e.printStackTrace(); } - instanceStatus = neptune.describeDBInstances(DescribeDbInstancesRequest.builder() - .dbInstanceIdentifier(targetDbInstance.dbInstanceIdentifier()).build()) - .dbInstances() - .get(0) - .dbInstanceStatus(); + + try { + DescribeDbInstancesRequest describeRequest = DescribeDbInstancesRequest.builder() + .dbInstanceIdentifier(targetDbInstance.dbInstanceIdentifier()) + .build(); + + instanceStatus = neptune.describeDBInstances(describeRequest) + .dbInstances() + .get(0) + .dbInstanceStatus(); + + logger.debug("Instance {} status: {}", targetDbInstance.dbInstanceIdentifier(), instanceStatus); + } catch (NeptuneException e) { + logger.error("Error checking instance status: {} (Error code: {}, Message: {})", + targetDbInstance.dbInstanceIdentifier(), Optional.ofNullable(e.awsErrorDetails()).map(AwsErrorDetails::errorCode).orElse("N/A"), + e.getMessage(), e); + + if (e.awsErrorDetails() != null && e.awsErrorDetails().errorCode() != null && e.awsErrorDetails().errorCode().equals("DBInstanceNotFound")) { + logger.error("The instance {} was not found. It may have been deleted or failed to create properly.", + targetDbInstance.dbInstanceIdentifier()); + } + throw e; + } + } + + logger.debug("{} instance {} is now in {} state", + name, targetDbInstance.dbInstanceIdentifier(), instanceStatus); + + // Check if the final status indicates success + if (!instanceStatus.equals("available")) { + logger.warn("{} instance {} is in {} state instead of 'available'", + name, targetDbInstance.dbInstanceIdentifier(), instanceStatus); } }