diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java index b2d4b970..f27381ec 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java @@ -264,7 +264,11 @@ private void fillConnectorConfig(KeyValue config) { connectorConfig.setDestEndpoint(config.getString(connectorConfig.DEST_ENDPOINT)); connectorConfig.setDestTopic(config.getString(connectorConfig.DEST_TOPIC)); connectorConfig.setDestAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true"))); + connectorConfig.setDestAccessKey(config.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY)); + connectorConfig.setDestSecretKey(config.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY)); connectorConfig.setSrcAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true"))); + connectorConfig.setSrcAccessKey(config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY)); + connectorConfig.setSrcSecretKey(config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY)); connectorConfig.setCheckpointIntervalMs(config.getInt(connectorConfig.CHECKPOINT_INTERVAL_MS, connectorConfig.getCheckpointIntervalMs())); connectorConfig.setSyncGids(config.getString(connectorConfig.SYNC_GIDS)); connectorConfig.setCheckpointTopic(config.getString(connectorConfig.CHECKPOINT_TOPIC, connectorConfig.DEFAULT_CHECKPOINT_TOPIC));