diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
index 08cdec15bec6..4c06111e924c 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.43.0-beta.1 (Unreleased)
#### Features Added
+* Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478), [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697), and [PR 47803](https://github.com/Azure/azure-sdk-for-java/pull/47803)
#### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
index 7400620d0e2f..9a06400585d0 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.43.0-beta.1 (Unreleased)
#### Features Added
+* Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478), [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697), and [PR 47803](https://github.com/Azure/azure-sdk-for-java/pull/47803)
#### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md b/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md
index 5d9907342ba7..d13b5bada3b4 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md
+++ b/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md
@@ -56,22 +56,24 @@
| `spark.cosmos.disableTcpConnectionEndpointRediscovery` | `false` | Can be used to disable TCP connection endpoint rediscovery. TCP connection endpoint rediscovery should only be disabled when using custom domain names with private endpoints when using a custom Spark environment. When using Azure Databricks or Azure Synapse as Spark runtime it should never be required to disable endpoint rediscovery. |
| `spark.cosmos.read.allowInvalidJsonWithDuplicateJsonProperties` | `false` | By default (when set to false) the Cosmos Java SDK and spark connector will raise a hard failure when json documents are read that contain json object with multiple properties of the same name. This config option can be used to override the behavior and silently ignore the invalid json and instead use the last occurrence of the property when parsing the json. NOTE: This is only meant to be used as a temporary workaround. We strongly recommend fixing the invalid json from even being ingested into the data and only use this workaround while cleaning up the documents with invalid json. |
| `spark.cosmos.proactiveConnectionInitialization` | None | Can be used to define a list (semicolon separated) of `DB/Container` pairs. Connections for these containers will be proactively warmed-up when using direct mode. The format of the config would be `DB1/Collection1;DB2/Collection2` etc. |
-| `spark.cosmos.proactiveConnectionInitializationDurationInSeconds` | `120` | The maximum duration for which the client when being initialized would aggressively try to warm-up collections. After this time perios the warm-up will happen only slowly (on one background thread). |
+| `spark.cosmos.proactiveConnectionInitializationDurationInSeconds` | `120` | The maximum duration for which the client, while being initialized, will aggressively try to warm up connections for the configured collections. After this period the warm-up continues only slowly (on one background thread). |
| `spark.cosmos.metadata.feedRange.refreshIntervalInSeconds` | `120` | The time interval in seconds to refresh the internal partition key range cache, valid between `[60, 1800]`. By default it is 120 seconds. |
### Write Config
-| Config Property Name | Default | Description |
-|:----------------------------------------------------------------|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `spark.cosmos.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: - `ItemOverwrite` (using upsert), - `ItemOverwriteIfNotModified` (if etag property of the row is empty/null it will just do an insert and ignore if the document already exists - same as `ItemAppend`, if an etag value exists it will attempt to replace the document with etag pre-condition. If the document changed - identified by precondition failure - the update is skipped and the document is not updated with the content of the data frame row), - `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts), - `ItemDelete` (delete all documents), - `ItemDeleteIfNotModified` (delete all documents for which the etag hasn't changed), - `ItemPatch` and `ItemPatchIfExists` (Partial update all documents based on the patch config, `ItemPatch` will fail the spark job when hitting 404/Not Found - while `ItemPatchIfExists` will skip documents that don't exist gracefully.), - `ItemBulkUpdate` (read item, then patch the item locally, then using create if etag is empty, update/replace with etag pre-condition. In cases of any conflict or precondition failure, SDK will retry the above steps to update the documents properly.) |
-| `spark.cosmos.write.maxRetryCount` | `10` | Cosmos DB Write Max Retry Attempts on retriable failures (e.g., connection error, moderakh add more details) |
-| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
-| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
-| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
-| `spark.cosmos.write.bulk.transactional` | `false` | Enable transactional batch mode for bulk writes. When enabled, all operations for the same partition key are executed atomically (all succeed or all fail). Requires ordering and clustering by partition key columns. Only supports upsert operations. Cannot exceed 100 operations or 2MB per partition key. **Note**: For containers using hierarchical partition keys (HPK), transactional scope applies only to **logical partitions** (complete partition key paths), not partial top-level keys. See [Transactional Batch documentation](https://learn.microsoft.com/azure/cosmos-db/transactional-batch) for details. |\n| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
-| `spark.cosmos.write.bulk.initialBatchSize` | `100` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume too many RUs. |
-| `spark.cosmos.write.bulk.maxBatchSize` | `100` | Cosmos DB max. bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the max. micro batch size is 100. Use this setting only when migrating Spark 2.4 workloads - for other scenarios relying on the auto-tuning combined with throughput control will result in better experience. |
-| `spark.cosmos.write.flush.noProgress.maxIntervalInSeconds` | `180` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes before forcing a retry. The retry will reinitialize the bulk write process - so, any delays on the retry can be sure to be actual service issues. The default value of 3 min should be sufficient to prevent false negatives when there is a short service-side write unavailability - like for partition splits or merges. Increase it only if you regularly see these transient errors to exceed a time period of 180 seconds. |
-| `spark.cosmos.write.flush.noProgress.maxRetryIntervalInSeconds` | `2700` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes after the initial attempt (and restarting the bulk writer client-side). This time interval is supposed to be large enough to not fail Spark jobs even when there are transient write availability outages in the service. The default value of 45 minutes can be modified when you rather prefer Spark jobs to fail or extended when needed. |
+| Config Property Name | Default | Description |
+|:-----------------------------------------------------------------|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `spark.cosmos.write.strategy`                                     | `ItemOverwrite` | Cosmos DB Item write Strategy: - `ItemOverwrite` (using upsert), - `ItemOverwriteIfNotModified` (if etag property of the row is empty/null it will just do an insert and ignore if the document already exists - same as `ItemAppend`, if an etag value exists it will attempt to replace the document with etag pre-condition. If the document changed - identified by precondition failure - the update is skipped and the document is not updated with the content of the data frame row), - `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts), - `ItemDelete` (delete all documents), - `ItemDeleteIfNotModified` (delete all documents for which the etag hasn't changed), - `ItemPatch` and `ItemPatchIfExists` (Partial update all documents based on the patch config, `ItemPatch` will fail the spark job when hitting 404/Not Found - while `ItemPatchIfExists` will skip documents that don't exist gracefully.), - `ItemBulkUpdate` (read item, then patch the item locally, then using create if etag is empty, update/replace with etag pre-condition. In cases of any conflict or precondition failure, SDK will retry the above steps to update the documents properly.) |
+| `spark.cosmos.write.maxRetryCount`                                | `10`            | Cosmos DB Write Max Retry Attempts on retriable failures (e.g., connection errors) |
+| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
+| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
+| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
+| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
+| `spark.cosmos.write.bulk.initialBatchSize`                        | `100`           | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is automatically tuned based on the throttling rate. By default the initial micro batch size is 100. Reduce this when you want to avoid the first few requests consuming too many RUs. |
+| `spark.cosmos.write.bulk.maxBatchSize`                            | `100`           | Cosmos DB max. bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is automatically tuned based on the throttling rate. By default the max. micro batch size is 100. Use this setting only when migrating Spark 2.4 workloads - for other scenarios, relying on auto-tuning combined with throughput control will result in a better experience. |
+| `spark.cosmos.write.flush.noProgress.maxIntervalInSeconds`        | `180`           | The time interval in seconds that write operations will wait when no progress can be made for bulk writes before forcing a retry. The retry reinitializes the bulk write process - so any delay after the retry is likely to indicate an actual service issue. The default value of 3 minutes should be sufficient to prevent false negatives when there is a short service-side write unavailability - for example during partition splits or merges. Increase it only if you regularly see these transient errors last longer than 180 seconds. |
+| `spark.cosmos.write.flush.noProgress.maxRetryIntervalInSeconds`   | `2700`          | The time interval in seconds that write operations will wait when no progress can be made for bulk writes after the initial attempt (and restarting the bulk writer client-side). This time interval is supposed to be large enough to not fail Spark jobs even when there are transient write availability outages in the service. The default value of 45 minutes can be reduced if you prefer Spark jobs to fail sooner, or extended when needed. |
+| `spark.cosmos.write.bulk.transactional.maxOperationsConcurrency` | `100`           | Max number of in-flight operations per Cosmos partition for transactional bulk mode. Higher values increase parallelism (and RU usage) but can cause throttling. |
+| `spark.cosmos.write.bulk.transactional.maxBatchesConcurrency`    | `5`             | Max concurrent transactional batches per Cosmos partition (1..5). Controls batch-level parallelism. Each batch may contain multiple operations; tune together with `spark.cosmos.write.bulk.transactional.maxOperationsConcurrency` to balance throughput and throttling. See the example below. |
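+
+A minimal sketch of enabling transactional bulk writes from a Spark job. The account endpoint, key, database, and container values below are placeholders, and `df` is assumed to be an existing DataFrame whose rows match the container schema; transactional bulk mode requires the default `ItemOverwrite` (upsert) write strategy.
+
+```scala
+// Placeholder account/database/container values - replace with your own.
+val cosmosWriteCfg = Map(
+  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
+  "spark.cosmos.accountKey" -> "<account-key>",
+  "spark.cosmos.database" -> "<database>",
+  "spark.cosmos.container" -> "<container>",
+  // Transactional bulk writes only support the ItemOverwrite (upsert) strategy.
+  "spark.cosmos.write.strategy" -> "ItemOverwrite",
+  "spark.cosmos.write.bulk.enabled" -> "true",
+  "spark.cosmos.write.bulk.transactional" -> "true",
+  // Optional concurrency tuning for transactional bulk mode.
+  "spark.cosmos.write.bulk.transactional.maxOperationsConcurrency" -> "100",
+  "spark.cosmos.write.bulk.transactional.maxBatchesConcurrency" -> "5"
+)
+
+// df is an assumed pre-existing DataFrame; all rows sharing a partition key
+// are written atomically within a transactional batch.
+df.write
+  .format("cosmos.oltp")
+  .options(cosmosWriteCfg)
+  .mode("Append")
+  .save()
+```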
#### Patch Config
| Config Property Name | Default | Description |
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala
index 1ae62d5c2b92..870b636e104f 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala
@@ -69,7 +69,9 @@ private class BulkWriter
// so multiplying by cpuCount in the default config is too aggressive
private val maxPendingOperations = writeConfig.bulkMaxPendingOperations
.getOrElse(DefaultMaxPendingOperationPerCore)
- private val maxConcurrentPartitions = writeConfig.maxConcurrentCosmosPartitions match {
+
+ private val bulkExecutionConfigs = writeConfig.bulkExecutionConfigs.get.asInstanceOf[CosmosWriteBulkExecutionConfigs]
+ private val maxConcurrentPartitions = bulkExecutionConfigs.maxConcurrentCosmosPartitions match {
// using the provided maximum of concurrent partitions per Spark partition on the input data
// multiplied by 2 to leave space for partition splits during ingestion
case Some(configuredMaxConcurrentPartitions) => 2 * configuredMaxConcurrentPartitions
@@ -146,20 +148,20 @@ private class BulkWriter
ThroughputControlHelper.populateThroughputControlGroupName(cosmosBulkExecutionOptions, writeConfig.throughputControlConfig)
- writeConfig.maxMicroBatchPayloadSizeInBytes match {
+ bulkExecutionConfigs.maxMicroBatchPayloadSizeInBytes match {
case Some(customMaxMicroBatchPayloadSizeInBytes) =>
cosmosBulkExecutionOptionsImpl
.setMaxMicroBatchPayloadSizeInBytes(customMaxMicroBatchPayloadSizeInBytes)
case None =>
}
- writeConfig.initialMicroBatchSize match {
+ bulkExecutionConfigs.initialMicroBatchSize match {
case Some(customInitialMicroBatchSize) =>
cosmosBulkExecutionOptions.setInitialMicroBatchSize(Math.max(1, customInitialMicroBatchSize))
case None =>
}
- writeConfig.maxMicroBatchSize match {
+ bulkExecutionConfigs.maxMicroBatchSize match {
case Some(customMaxMicroBatchSize) =>
cosmosBulkExecutionOptions.setMaxMicroBatchSize(
Math.max(
@@ -267,7 +269,7 @@ private class BulkWriter
// We start from using the bulk batch size and interval and concurrency
// If in the future, there is a need to separate the configuration, can re-consider
- val bulkBatchSize = writeConfig.maxMicroBatchSize match {
+ val bulkBatchSize = bulkExecutionConfigs.maxMicroBatchSize match {
case Some(customMaxMicroBatchSize) => Math.min(
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST,
Math.max(1, customMaxMicroBatchSize))
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
index f28e459b2d83..1692d947f74a 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
@@ -120,6 +120,8 @@ private[spark] object CosmosConfigNames {
val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
val WriteBulkInitialBatchSize = "spark.cosmos.write.bulk.initialBatchSize"
+ val WriteBulkTransactionalMaxOperationsConcurrency = "spark.cosmos.write.bulk.transactional.maxOperationsConcurrency"
+ val WriteBulkTransactionalMaxBatchesConcurrency = "spark.cosmos.write.bulk.transactional.maxBatchesConcurrency"
val WritePointMaxConcurrency = "spark.cosmos.write.point.maxConcurrency"
val WritePatchDefaultOperationType = "spark.cosmos.write.patch.defaultOperationType"
val WritePatchColumnConfigs = "spark.cosmos.write.patch.columnConfigs"
@@ -248,6 +250,8 @@ private[spark] object CosmosConfigNames {
WriteBulkMaxConcurrentPartitions,
WriteBulkPayloadSizeInBytes,
WriteBulkInitialBatchSize,
+ WriteBulkTransactionalMaxOperationsConcurrency,
+ WriteBulkTransactionalMaxBatchesConcurrency,
WriteBulkMaxBatchSize,
WriteBulkMinTargetBatchSize,
WritePointMaxConcurrency,
@@ -1464,21 +1468,31 @@ private case class CosmosPatchConfigs(columnConfigsMap: TrieMap[String, CosmosPa
private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
maxRetryCount: Int,
bulkEnabled: Boolean,
- bulkTransactional: Boolean = false,
+ bulkTransactional: Boolean,
+ bulkExecutionConfigs: Option[CosmosWriteBulkExecutionConfigsBase] = None,
bulkMaxPendingOperations: Option[Int] = None,
pointMaxConcurrency: Option[Int] = None,
- maxConcurrentCosmosPartitions: Option[Int] = None,
patchConfigs: Option[CosmosPatchConfigs] = None,
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
- maxMicroBatchPayloadSizeInBytes: Option[Int] = None,
- initialMicroBatchSize: Option[Int] = None,
- maxMicroBatchSize: Option[Int] = None,
- minTargetMicroBatchSize: Option[Int] = None,
flushCloseIntervalInSeconds: Int = 60,
maxInitialNoProgressIntervalInSeconds: Int = 180,
maxRetryNoProgressIntervalInSeconds: Int = 45 * 60,
retryCommitInterceptor: Option[WriteOnRetryCommitInterceptor] = None)
+private trait CosmosWriteBulkExecutionConfigsBase {}
+
+private case class CosmosWriteBulkExecutionConfigs(
+ maxConcurrentCosmosPartitions: Option[Int] = None,
+ maxMicroBatchPayloadSizeInBytes: Option[Int] = None,
+ initialMicroBatchSize: Option[Int] = None,
+ maxMicroBatchSize: Option[Int] = None,
+ minTargetMicroBatchSize: Option[Int] = None) extends CosmosWriteBulkExecutionConfigsBase
+
+private case class CosmosWriteTransactionalBulkExecutionConfigs(
+ maxConcurrentCosmosPartitions: Option[Int] = None,
+ maxConcurrentOperations: Option[Int] = None,
+ maxConcurrentBatches: Option[Int] = None) extends CosmosWriteBulkExecutionConfigsBase
+
private object CosmosWriteConfig {
private val DefaultMaxRetryCount = 10
private val DefaultPatchOperationType = CosmosPatchOperationTypes.Replace
@@ -1554,6 +1568,22 @@ private object CosmosWriteConfig {
s" repartitioned to balance to how many Cosmos partitions each Spark partition needs to write. This is mainly" +
s" useful for very large containers (with hundreds of physical partitions).")
+ private val bulkTransactionalMaxOpsConcurrency = CosmosConfigEntry[Int](
+ key = CosmosConfigNames.WriteBulkTransactionalMaxOperationsConcurrency,
+ defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_BULK_TRANSACTIONAL_BATCH_OP_CONCURRENCY),
+ mandatory = false,
+ parseFromStringFunction = maxOpsConcurrency => maxOpsConcurrency.toInt,
+ helpMessage = "Max number of in-flight operations per Cosmos partition for transactional bulk mode. " +
+      "Higher values increase parallelism (and RU usage) but can cause throttling; the default is 100.")
+
+ private val bulkTransactionalMaxBatchesConcurrency = CosmosConfigEntry[Int](
+ key = CosmosConfigNames.WriteBulkTransactionalMaxBatchesConcurrency,
+ defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_BULK_TRANSACTIONAL_BATCH_CONCURRENCY),
+ mandatory = false,
+ parseFromStringFunction = maxBatchesConcurrency => maxBatchesConcurrency.toInt,
+    helpMessage = "Max concurrent transactional batches per Cosmos partition (1..5). Controls batch-level parallelism; the default is 5. " +
+ "Each batch may contain multiple operations; tune together with 'spark.cosmos.write.bulk.transactional.maxOperationsConcurrency' to balance throughput and throttling.")
+
private val pointWriteConcurrency = CosmosConfigEntry[Int](key = CosmosConfigNames.WritePointMaxConcurrency,
mandatory = false,
parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
@@ -1772,10 +1802,7 @@ private object CosmosWriteConfig {
val bulkTransactionalOpt = CosmosConfigEntry.parse(cfg, bulkTransactional)
var patchConfigsOpt = Option.empty[CosmosPatchConfigs]
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
- val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
- val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize)
- val maxBatchSizeOpt = CosmosConfigEntry.parse(cfg, maxMicroBatchSize)
- val minTargetBatchSizeOpt = CosmosConfigEntry.parse(cfg, minTargetMicroBatchSize)
+
val writeRetryCommitInterceptor = CosmosConfigEntry
.parse(cfg, writeOnRetryCommitInterceptor).flatten
@@ -1785,12 +1812,6 @@ private object CosmosWriteConfig {
assert(itemWriteStrategyOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteStrategy}' is missing.")
assert(maxRetryCountOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteMaxRetryCount}' is missing.")
- if (bulkTransactionalOpt.isDefined && bulkTransactionalOpt.get) {
- // Validate write strategy for transactional batches
- assert(itemWriteStrategyOpt.get == ItemWriteStrategy.ItemOverwrite,
- s"Transactional batches only support ItemOverwrite (upsert) write strategy. Requested: ${itemWriteStrategyOpt.get}")
- }
-
itemWriteStrategyOpt.get match {
case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists =>
val patchColumnConfigMap = parsePatchColumnConfigs(cfg, inputSchema)
@@ -1802,20 +1823,51 @@ private object CosmosWriteConfig {
case _ =>
}
+ var bulkExecutionConfigsOpt: Option[CosmosWriteBulkExecutionConfigsBase] = None
+ if (bulkEnabledOpt.isDefined && bulkEnabledOpt.get) {
+
+ if (bulkTransactionalOpt.isDefined && bulkTransactionalOpt.get) {
+ // Validate write strategy for transactional batches
+ assert(itemWriteStrategyOpt.get == ItemWriteStrategy.ItemOverwrite,
+ s"Transactional batches only support ItemOverwrite (upsert) write strategy. Requested: ${itemWriteStrategyOpt.get}")
+
+ val maxConcurrentCosmosPartitionsOpt = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions)
+ val maxBulkTransactionalOpsConcurrencyOpt = CosmosConfigEntry.parse(cfg, bulkTransactionalMaxOpsConcurrency)
+ val maxBulkTransactionalBatchesConcurrencyOpt = CosmosConfigEntry.parse(cfg, bulkTransactionalMaxBatchesConcurrency)
+
+ bulkExecutionConfigsOpt = Some(CosmosWriteTransactionalBulkExecutionConfigs(
+ maxConcurrentCosmosPartitionsOpt,
+ maxBulkTransactionalOpsConcurrencyOpt,
+ maxBulkTransactionalBatchesConcurrencyOpt
+ ))
+
+ } else {
+ // non-transactional batch
+ val maxConcurrentCosmosPartitionsOpt = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions)
+ val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
+ val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize)
+ val maxBatchSizeOpt = CosmosConfigEntry.parse(cfg, maxMicroBatchSize)
+ val minTargetBatchSizeOpt = CosmosConfigEntry.parse(cfg, minTargetMicroBatchSize)
+
+ bulkExecutionConfigsOpt = Some(CosmosWriteBulkExecutionConfigs(
+ maxConcurrentCosmosPartitionsOpt,
+ microBatchPayloadSizeInBytesOpt,
+ initialBatchSizeOpt,
+ maxBatchSizeOpt,
+ minTargetBatchSizeOpt))
+ }
+ }
+
CosmosWriteConfig(
itemWriteStrategyOpt.get,
maxRetryCountOpt.get,
bulkEnabled = bulkEnabledOpt.get,
bulkTransactional = bulkTransactionalOpt.get,
+ bulkExecutionConfigs = bulkExecutionConfigsOpt,
bulkMaxPendingOperations = CosmosConfigEntry.parse(cfg, bulkMaxPendingOperations),
pointMaxConcurrency = CosmosConfigEntry.parse(cfg, pointWriteConcurrency),
- maxConcurrentCosmosPartitions = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions),
patchConfigs = patchConfigsOpt,
throughputControlConfig = throughputControlConfigOpt,
- maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt,
- initialMicroBatchSize = initialBatchSizeOpt,
- maxMicroBatchSize = maxBatchSizeOpt,
- minTargetMicroBatchSize = minTargetBatchSizeOpt,
flushCloseIntervalInSeconds = CosmosConfigEntry.parse(cfg, flushCloseIntervalInSeconds).get,
maxInitialNoProgressIntervalInSeconds = CosmosConfigEntry.parse(cfg, maxInitialNoProgressIntervalInSeconds).get,
maxRetryNoProgressIntervalInSeconds = CosmosConfigEntry.parse(cfg, maxRetryNoProgressIntervalInSeconds).get,
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala
index f50c838c4dcf..13b47fe87a48 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala
@@ -3,12 +3,13 @@
package com.azure.cosmos.spark
// scalastyle:off underscore.import
-import com.azure.cosmos.{BridgeInternal, CosmosAsyncContainer, CosmosDiagnosticsContext, CosmosEndToEndOperationLatencyPolicyConfigBuilder, CosmosException}
-import com.azure.cosmos.implementation.batch.{BulkExecutorDiagnosticsTracker, TransactionalBulkExecutor}
+import com.azure.cosmos.implementation.batch.{BulkExecutorDiagnosticsTracker, CosmosBulkTransactionalBatchResponse, TransactionalBulkExecutor}
import com.azure.cosmos.implementation.{CosmosTransactionalBulkExecutionOptionsImpl, UUIDs}
import com.azure.cosmos.models.{CosmosBatch, CosmosBatchResponse}
-import com.azure.cosmos.spark.TransactionalBulkWriter.{BulkOperationFailedException, DefaultMaxPendingOperationPerCore, emitFailureHandler, getThreadInfo, transactionalBatchInputBoundedElastic, transactionalBulkWriterInputBoundedElastic, transactionalBulkWriterRequestsBoundedElastic}
+import com.azure.cosmos.spark.BulkWriter.getThreadInfo
+import com.azure.cosmos.spark.TransactionalBulkWriter.{BulkOperationFailedException, DefaultMaxPendingOperationPerCore, emitFailureHandler, transactionalBatchInputBoundedElastic, transactionalBulkWriterInputBoundedElastic, transactionalBulkWriterRequestsBoundedElastic}
import com.azure.cosmos.spark.diagnostics.DefaultDiagnostics
+import com.azure.cosmos.{BridgeInternal, CosmosAsyncContainer, CosmosDiagnosticsContext, CosmosEndToEndOperationLatencyPolicyConfigBuilder, CosmosException}
import reactor.core.Scannable
import reactor.core.scala.publisher.SMono.PimpJFlux
import reactor.core.scheduler.Scheduler
@@ -68,7 +69,9 @@ private class TransactionalBulkWriter
}
private val maxPendingOperations = writeConfig.bulkMaxPendingOperations
.getOrElse(DefaultMaxPendingOperationPerCore)
- private val maxConcurrentPartitions = writeConfig.maxConcurrentCosmosPartitions match {
+
+ private val transactionalBulkExecutionConfigs = writeConfig.bulkExecutionConfigs.get.asInstanceOf[CosmosWriteTransactionalBulkExecutionConfigs]
+ private val maxConcurrentPartitions = transactionalBulkExecutionConfigs.maxConcurrentCosmosPartitions match {
// using the provided maximum of concurrent partitions per Spark partition on the input data
// multiplied by 2 to leave space for partition splits during ingestion
case Some(configuredMaxConcurrentPartitions) => 2 * configuredMaxConcurrentPartitions
@@ -104,12 +107,18 @@ private class TransactionalBulkWriter
private val endToEndTimeoutPolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(maxOperationTimeout)
.enable(true)
.build
- private val cosmosTransactionalBulkExecutionOptions = new CosmosTransactionalBulkExecutionOptionsImpl(Map.empty[String, String].asJava)
+ private val cosmosTransactionalBulkExecutionOptions = new CosmosTransactionalBulkExecutionOptionsImpl()
private val monotonicOperationCounter = new AtomicLong(0)
cosmosTransactionalBulkExecutionOptions.setSchedulerOverride(transactionalBulkWriterRequestsBoundedElastic)
cosmosTransactionalBulkExecutionOptions.setMaxConcurrentCosmosPartitions(maxConcurrentPartitions)
cosmosTransactionalBulkExecutionOptions.setCosmosEndToEndLatencyPolicyConfig(endToEndTimeoutPolicy)
+ if (transactionalBulkExecutionConfigs.maxConcurrentOperations.isDefined) {
+ cosmosTransactionalBulkExecutionOptions.setMaxOperationsConcurrency(transactionalBulkExecutionConfigs.maxConcurrentOperations.get)
+ }
+ if (transactionalBulkExecutionConfigs.maxConcurrentBatches.isDefined) {
+ cosmosTransactionalBulkExecutionOptions.setMaxBatchesConcurrency(transactionalBulkExecutionConfigs.maxConcurrentBatches.get)
+ }
private class ForwardingMetricTracker(val verboseLoggingEnabled: AtomicBoolean) extends BulkExecutorDiagnosticsTracker {
override def trackDiagnostics(ctx: CosmosDiagnosticsContext): Unit = {
@@ -245,12 +254,12 @@ private class TransactionalBulkWriter
log.logError(s"Batch input publishing flux failed, Context: ${operationContext.toString} $getThreadInfo", t)
})
- val transactionalExecutor = new TransactionalBulkExecutor[Object](
+ val transactionalExecutor = new TransactionalBulkExecutor(
container,
batchInputFlux,
cosmosTransactionalBulkExecutionOptions)
- val batchResponseFlux: SFlux[CosmosBatchResponse] = transactionalExecutor.execute().asScala
+ val batchResponseFlux: SFlux[CosmosBulkTransactionalBatchResponse] = transactionalExecutor.execute().asScala
batchResponseFlux.subscribe(
resp => {
@@ -259,7 +268,7 @@ private class TransactionalBulkWriter
try {
// all the operations in the batch will have the same partition key value
// get the partition key value from the first result
- val partitionKeyValue = resp.getResults.get(0).getOperation.getPartitionKeyValue
+ val partitionKeyValue = resp.getCosmosBatch.getPartitionKeyValue
val activeBatchOperationOpt = activeBatches.remove(partitionKeyValue)
val pendingBatchOperationRetriesOpt = pendingBatchRetries.remove(partitionKeyValue)
@@ -277,17 +286,35 @@ private class TransactionalBulkWriter
if (activeBatchOperationOpt.isDefined || pendingBatchOperationRetriesOpt.isDefined) {
val batchOperation = activeBatchOperationOpt.orElse(pendingBatchOperationRetriesOpt).get
- if (isSuccessStatusCode(resp.getStatusCode)) {
- // no error cases
- outputMetricsPublisher.trackWriteOperation(resp.size(), None) // TODO[Annie]:verify the diagnostics
- totalSuccessfulIngestionMetrics.addAndGet(resp.size())
- } else {
+ if (resp.getException != null) {
+ Option(resp.getException) match {
+ case Some(cosmosException: CosmosException) =>
+ handleNonSuccessfulStatusCode(
+ batchOperation.operationContext,
+ batchOperation.cosmosBatch,
+ None,
+ isGettingRetried,
+ Some(cosmosException))
+ case _ =>
+ log.logWarning(
+                      s"unexpected failure for partitionKeyValue=[${batchOperation.operationContext.partitionKeyValueInput}], " +
+                        s"attemptNumber=${batchOperation.operationContext.attemptNumber}, " +
+ s"exceptionMessage=${resp.getException.getMessage}, " +
+ s"Context: ${operationContext.toString} $getThreadInfo", resp.getException)
+ captureIfFirstFailure(resp.getException)
+ cancelWork()
+ }
+ } else if (!resp.getResponse.isSuccessStatusCode) {
handleNonSuccessfulStatusCode(
batchOperation.operationContext,
batchOperation.cosmosBatch,
- resp,
+ Some(resp.getResponse),
isGettingRetried,
None)
+ } else {
+ // no error case
+ outputMetricsPublisher.trackWriteOperation(resp.getResponse.size(), None)
+ totalSuccessfulIngestionMetrics.addAndGet(resp.getResponse.size())
}
}
}
@@ -319,8 +346,6 @@ private class TransactionalBulkWriter
)
}
- def isSuccessStatusCode(statusCode: Int): Boolean = 200 <= statusCode && statusCode <= 299
-
override def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode): Unit = {
Preconditions.checkState(!closed.get())
throwIfCapturedExceptionExists()
@@ -400,13 +425,34 @@ private class TransactionalBulkWriter
(
operationContext: OperationContext,
cosmosBatch: CosmosBatch,
- cosmosBatchResponse: CosmosBatchResponse,
+ cosmosBatchResponse: Option[CosmosBatchResponse],
isGettingRetried: AtomicBoolean,
responseException: Option[CosmosException]
) : Unit = {
- val effectiveStatusCode = cosmosBatchResponse.getStatusCode
- val effectiveSubStatusCode = cosmosBatchResponse.getSubStatusCode
+ val exceptionMessage = cosmosBatchResponse match {
+ case Some(r) => r.getErrorMessage
+ case None => responseException match {
+ case Some(e) => e.getMessage
+ case None => ""
+ }
+ }
+
+ val effectiveStatusCode = cosmosBatchResponse match {
+ case Some(r) => r.getStatusCode
+ case None => responseException match {
+ case Some(e) => e.getStatusCode
+ case None => CosmosConstants.StatusCodes.Timeout
+ }
+ }
+
+ val effectiveSubStatusCode = cosmosBatchResponse match {
+ case Some(r) => r.getSubStatusCode
+ case None => responseException match {
+ case Some(e) => e.getSubStatusCode
+ case None => 0
+ }
+ }
log.logDebug(s"encountered batch operation response with status code " +
s"$effectiveStatusCode:$effectiveSubStatusCode, " +
@@ -416,7 +462,7 @@ private class TransactionalBulkWriter
// requeue
log.logWarning(s"for partitionKeyValue=[${operationContext.partitionKeyValueInput}], " +
s"encountered status code '$effectiveStatusCode:$effectiveSubStatusCode', will retry! " +
- s"attemptNumber=${operationContext.attemptNumber}, exceptionMessage=${cosmosBatchResponse.getErrorMessage}, " +
+ s"attemptNumber=${operationContext.attemptNumber}, exceptionMessage=${exceptionMessage}, " +
s"Context: {${operationContext.toString}} $getThreadInfo")
val batchOperationRetry = CosmosBatchOperation(
@@ -428,7 +474,7 @@ private class TransactionalBulkWriter
)
this.scheduleRetry(
- trackPendingRetryAction = () => pendingBatchRetries.put(cosmosBatch.getPartitionKeyValue, batchOperationRetry).isDefined,
+ trackPendingRetryAction = () => pendingBatchRetries.put(cosmosBatch.getPartitionKeyValue, batchOperationRetry).isEmpty,
clearPendingRetryAction = () => pendingBatchRetries.remove(cosmosBatch.getPartitionKeyValue).isDefined,
batchOperationRetry,
effectiveStatusCode)
@@ -437,7 +483,7 @@ private class TransactionalBulkWriter
} else {
log.logError(s"for partitionKeyValue=[${operationContext.partitionKeyValueInput}], " +
s"encountered status code '$effectiveStatusCode:$effectiveSubStatusCode', all retries exhausted! " +
- s"attemptNumber=${operationContext.attemptNumber}, exceptionMessage=${cosmosBatchResponse.getErrorMessage}, " +
+ s"attemptNumber=${operationContext.attemptNumber}, exceptionMessage=${exceptionMessage}, " +
s"Context: {${operationContext.toString} $getThreadInfo")
val message = s"All retries exhausted for batch operation - " +
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala
index 9ca7f459cf15..43538a033c36 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala
@@ -32,7 +32,13 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
val containerConfig = CosmosContainerConfig(container.getDatabase.getId, container.getId, None)
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, 5, bulkEnabled = true, bulkMaxPendingOperations = Some(900))
+ val writeConfig = CosmosWriteConfig(
+ ItemWriteStrategy.ItemOverwrite,
+ 5,
+ bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
+ bulkMaxPendingOperations = Some(900))
val metricsPublisher = new TestOutputMetricsPublisher
val bulkWriter = new BulkWriter(
@@ -74,7 +80,13 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
val containerConfig = CosmosContainerConfig(container.getDatabase.getId, container.getId, None)
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, 5, bulkEnabled = true, bulkMaxPendingOperations = Some(900))
+ val writeConfig = CosmosWriteConfig(
+ ItemWriteStrategy.ItemOverwrite,
+ 5,
+ bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
+ bulkMaxPendingOperations = Some(900))
val bulkWriter = new BulkWriter(
container,
@@ -123,7 +135,13 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
try {
val containerConfig = CosmosContainerConfig(container.getDatabase.getId, container.getId, None)
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, 5, bulkEnabled = true, bulkMaxPendingOperations = Some(900))
+ val writeConfig = CosmosWriteConfig(
+ ItemWriteStrategy.ItemOverwrite,
+ 5,
+ bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
+ bulkMaxPendingOperations = Some(900))
val bulkWriter = new BulkWriter(
container,
@@ -190,7 +208,13 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
try {
val containerConfig = CosmosContainerConfig(container.getDatabase.getId, container.getId, None)
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemAppend, 5, bulkEnabled = true, bulkMaxPendingOperations = Some(900))
+ val writeConfig = CosmosWriteConfig(
+ ItemWriteStrategy.ItemAppend,
+ 5,
+ bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
+ bulkMaxPendingOperations = Some(900))
val metricsPublisher = new TestOutputMetricsPublisher
val bulkWriter = new BulkWriter(
@@ -247,7 +271,13 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
val containerConfig = CosmosContainerConfig(container.getDatabase.getId, container.getId, None)
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, 5, bulkEnabled = true, bulkMaxPendingOperations = Some(900))
+ val writeConfig = CosmosWriteConfig(
+ ItemWriteStrategy.ItemOverwrite,
+ 5,
+ bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
+ bulkMaxPendingOperations = Some(900))
val metricsPublisher = new TestOutputMetricsPublisher
val bulkWriter = new BulkWriter(
@@ -285,6 +315,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemDelete,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900))
val bulkDeleter = new BulkWriter(
@@ -316,7 +348,13 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
val containerConfig = CosmosContainerConfig(container.getDatabase.getId, container.getId, None)
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, 5, bulkEnabled = true, bulkMaxPendingOperations = Some(900))
+ val writeConfig = CosmosWriteConfig(
+ ItemWriteStrategy.ItemOverwrite,
+ 5,
+ bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
+ bulkMaxPendingOperations = Some(900))
val bulkWriter = new BulkWriter(
container,
@@ -382,6 +420,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemDeleteIfNotModified,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900))
val bulkDeleter = new BulkWriter(
@@ -408,7 +448,13 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
val containerProperties = container.read().block().getProperties
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
val containerConfig = CosmosContainerConfig(container.getDatabase.getId, container.getId, None)
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemAppend, maxRetryCount = 5, bulkEnabled = true, bulkMaxPendingOperations = Some(900))
+ val writeConfig = CosmosWriteConfig(
+ ItemWriteStrategy.ItemAppend,
+ maxRetryCount = 5,
+ bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
+ bulkMaxPendingOperations = Some(900))
val bulkWriter = new BulkWriter(
container,
containerConfig,
@@ -448,6 +494,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwriteIfNotModified,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -572,6 +620,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -702,6 +752,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -782,6 +834,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -866,6 +920,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -927,6 +983,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -1005,6 +1063,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -1067,6 +1127,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -1144,6 +1206,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -1206,6 +1270,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -1315,6 +1381,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemBulkUpdate,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900),
patchConfigs = Some(CosmosPatchConfigs(new TrieMap[String, CosmosPatchColumnConfig]())))
@@ -1362,6 +1430,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemBulkUpdate,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900),
patchConfigs = Some(CosmosPatchConfigs(new TrieMap[String, CosmosPatchColumnConfig]())))
@@ -1410,6 +1480,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -1491,6 +1563,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -1548,6 +1622,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -1613,6 +1689,8 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala
index 0144b468582b..0b580fd7e367 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala
@@ -728,6 +728,26 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
config.maxRetryNoProgressIntervalInSeconds shouldEqual 45 * 60
}
+ it should "parse transactional bulk write configs" in {
+ val userConfig = Map(
+ "spark.cosmos.write.strategy" -> "ItemOverwrite",
+ "spark.cosmos.write.bulk.enabled" -> "true",
+ "spark.cosmos.write.bulk.transactional" -> "true",
+ "spark.cosmos.write.bulk.transactional.maxOperationsConcurrency" -> "123",
+ "spark.cosmos.write.bulk.transactional.maxBatchesConcurrency" -> "5"
+ )
+
+ val config = CosmosWriteConfig.parseWriteConfig(userConfig, StructType(Nil))
+
+ config.bulkTransactional shouldEqual true
+ config.bulkExecutionConfigs.isDefined shouldEqual true
+ val txConfigs = config.bulkExecutionConfigs.get.asInstanceOf[CosmosWriteTransactionalBulkExecutionConfigs]
+ txConfigs.maxConcurrentOperations.isDefined shouldEqual true
+ txConfigs.maxConcurrentOperations.get shouldEqual 123
+ txConfigs.maxConcurrentBatches.isDefined shouldEqual true
+ txConfigs.maxConcurrentBatches.get shouldEqual 5
+ }
+
it should "parse partitioning config with custom Strategy" in {
val partitioningConfig = Map(
"spark.cosmos.read.partitioning.strategy" -> "Custom",
@@ -976,9 +996,11 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
)
var writeConfig: CosmosWriteConfig = CosmosWriteConfig.parseWriteConfig(userConfig, schema)
writeConfig should not be null
- writeConfig.maxMicroBatchPayloadSizeInBytes should not be null
- writeConfig.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
- writeConfig.maxMicroBatchPayloadSizeInBytes.get shouldEqual BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES
+ writeConfig.bulkExecutionConfigs should not be null
+ var bulkExecutorConfigs = writeConfig.bulkExecutionConfigs.get.asInstanceOf[CosmosWriteBulkExecutionConfigs]
+ bulkExecutorConfigs.maxMicroBatchPayloadSizeInBytes should not be null
+ bulkExecutorConfigs.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
+ bulkExecutorConfigs.maxMicroBatchPayloadSizeInBytes.get shouldEqual BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES
userConfig = Map(
"spark.cosmos.write.strategy" -> "ItemOverwrite",
@@ -988,9 +1010,11 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
writeConfig = CosmosWriteConfig.parseWriteConfig(userConfig, schema)
writeConfig should not be null
- writeConfig.maxMicroBatchPayloadSizeInBytes should not be null
- writeConfig.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
- writeConfig.maxMicroBatchPayloadSizeInBytes.get shouldEqual 1000000
+ writeConfig.bulkExecutionConfigs should not be null
+ bulkExecutorConfigs = writeConfig.bulkExecutionConfigs.get.asInstanceOf[CosmosWriteBulkExecutionConfigs]
+ bulkExecutorConfigs.maxMicroBatchPayloadSizeInBytes should not be null
+ bulkExecutorConfigs.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
+ bulkExecutorConfigs.maxMicroBatchPayloadSizeInBytes.get shouldEqual 1000000
}
"Config Parser" should "validate default operation types for patch configs" in {
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterITest.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterITest.scala
index ba92f33fc307..2fc74fbfc4d5 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterITest.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterITest.scala
@@ -31,7 +31,7 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val containerProperties = container.read().block().getProperties
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false)
+ val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
val metricsPublisher = new TestOutputMetricsPublisher
val pointWriter = new PointWriter(
@@ -73,7 +73,7 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val containerProperties = container.read().block().getProperties
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false)
+ val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
val metricsPublisher = new TestOutputMetricsPublisher
val pointWriter = new PointWriter(
@@ -107,7 +107,7 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
secondObjectNodeHasAllFieldsOfFirstObjectNode(expectedItem, itemFromDB) shouldEqual true
}
- val deleteConfig = CosmosWriteConfig(ItemWriteStrategy.ItemDelete, maxRetryCount = 3, bulkEnabled = false)
+ val deleteConfig = CosmosWriteConfig(ItemWriteStrategy.ItemDelete, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
val pointDeleter = new PointWriter(
container,
@@ -133,7 +133,7 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val containerProperties = container.read().block().getProperties
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false)
+ val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -192,7 +192,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val deleteConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemDeleteIfNotModified,
maxRetryCount = 3,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointDeleter = new PointWriter(
container,
@@ -217,7 +218,7 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val container = getContainer
val containerProperties = container.read().block().getProperties
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemAppend, maxRetryCount = 0, bulkEnabled = false)
+ val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemAppend, maxRetryCount = 0, bulkEnabled = false, bulkTransactional = false)
val pointWriter = new PointWriter(
container,
partitionKeyDefinition,
@@ -253,7 +254,7 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
val writeConfig = CosmosWriteConfig(
- ItemWriteStrategy.ItemOverwriteIfNotModified, maxRetryCount = 3, bulkEnabled = false)
+ ItemWriteStrategy.ItemOverwriteIfNotModified, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
val metricsPublisher = new TestOutputMetricsPublisher
var pointWriter = new PointWriter(
@@ -372,7 +373,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val metricsPublisher = new TestOutputMetricsPublisher
val pointWriter = new PointWriter(
@@ -501,7 +503,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -578,7 +581,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -637,7 +641,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -718,7 +723,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -794,6 +800,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -860,7 +868,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -964,6 +973,7 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
ItemWriteStrategy.ItemBulkUpdate,
5,
bulkEnabled = false,
+ bulkTransactional = false,
bulkMaxPendingOperations = Some(900),
patchConfigs = Some(CosmosPatchConfigs(new TrieMap[String, CosmosPatchColumnConfig]())))
@@ -1011,6 +1021,7 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
ItemWriteStrategy.ItemBulkUpdate,
5,
bulkEnabled = false,
+ bulkTransactional = false,
bulkMaxPendingOperations = Some(900),
patchConfigs = Some(CosmosPatchConfigs(new TrieMap[String, CosmosPatchColumnConfig]))
)
@@ -1059,7 +1070,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -1132,7 +1144,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -1179,7 +1192,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -1238,7 +1252,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container,
@@ -1314,6 +1329,8 @@ class PointWriterITest extends IntegrationSpec with CosmosClient with AutoCleana
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
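
The `PointWriterITest` hunks above (and the `PointWriterSubpartitionITest` and `CosmosPatchTestHelper` hunks below) all follow the same mechanical pattern: `CosmosWriteConfig` now takes an explicit `bulkTransactional` flag, and the bulk-enabled call sites additionally pass a `bulkExecutionConfigs` value. A minimal sketch of the two updated call shapes, with parameter names and defaults inferred only from the call sites in this diff rather than from the actual `CosmosWriteConfig` definition:

```scala
// Sketch only - inferred from the test call sites in this PR, not from the real case class.
val pointWriteConfig = CosmosWriteConfig(
  ItemWriteStrategy.ItemOverwrite,
  maxRetryCount = 3,
  bulkEnabled = false,          // point writes
  bulkTransactional = false)    // new flag; transactional batch stays opt-in

val bulkWriteConfig = CosmosWriteConfig(
  ItemWriteStrategy.ItemOverwrite,
  5,
  bulkEnabled = true,
  bulkTransactional = false,
  bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()), // also new for bulk paths
  bulkMaxPendingOperations = Some(900))
```
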
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterSubpartitionITest.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterSubpartitionITest.scala
index a1d8073f296d..5ada2ac957c3 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterSubpartitionITest.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterSubpartitionITest.scala
@@ -32,7 +32,7 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val containerProperties = container.read().block().getProperties
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false)
+ val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -67,7 +67,7 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val containerProperties = container.read().block().getProperties
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false)
+ val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -96,7 +96,7 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
secondObjectNodeHasAllFieldsOfFirstObjectNode(expectedItem, itemFromDB) shouldEqual true
}
- val deleteConfig = CosmosWriteConfig(ItemWriteStrategy.ItemDelete, maxRetryCount = 3, bulkEnabled = false)
+ val deleteConfig = CosmosWriteConfig(ItemWriteStrategy.ItemDelete, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
val pointDeleter = new PointWriter(
container, partitionKeyDefinition, deleteConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -122,7 +122,7 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val containerProperties = container.read().block().getProperties
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false)
+ val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -181,7 +181,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val deleteConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemDeleteIfNotModified,
maxRetryCount = 3,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointDeleter = new PointWriter(
container, partitionKeyDefinition, deleteConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -206,7 +207,7 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val container = getContainer
val containerProperties = container.read().block().getProperties
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
- val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemAppend, maxRetryCount = 0, bulkEnabled = false)
+ val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemAppend, maxRetryCount = 0, bulkEnabled = false, bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
val items = new mutable.HashMap[String, mutable.Set[ObjectNode]] with mutable.MultiMap[String, ObjectNode]
@@ -242,7 +243,7 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
val writeConfig = CosmosWriteConfig(
- ItemWriteStrategy.ItemOverwriteIfNotModified, maxRetryCount = 3, bulkEnabled = false)
+ ItemWriteStrategy.ItemOverwriteIfNotModified, maxRetryCount = 3, bulkEnabled = false, bulkTransactional = false)
var pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -346,7 +347,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -446,7 +448,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -525,7 +528,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -596,7 +600,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -675,7 +680,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -759,6 +765,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
@@ -821,7 +829,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -930,6 +939,7 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
ItemWriteStrategy.ItemBulkUpdate,
5,
bulkEnabled = false,
+ bulkTransactional = false,
bulkMaxPendingOperations = Some(900),
patchConfigs = Some(CosmosPatchConfigs(new TrieMap[String, CosmosPatchColumnConfig]())))
@@ -975,6 +985,7 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
ItemWriteStrategy.ItemBulkUpdate,
5,
bulkEnabled = false,
+ bulkTransactional = false,
bulkMaxPendingOperations = Some(900),
patchConfigs = Some(CosmosPatchConfigs(new TrieMap[String, CosmosPatchColumnConfig]))
)
@@ -1034,7 +1045,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -1105,7 +1117,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -1151,7 +1164,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -1211,7 +1225,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
val writeConfig = CosmosWriteConfig(
ItemWriteStrategy.ItemOverwrite,
5,
- bulkEnabled = false)
+ bulkEnabled = false,
+ bulkTransactional = false)
val pointWriter = new PointWriter(
container, partitionKeyDefinition, writeConfig, DiagnosticsConfig(), MockTaskContext.mockTaskContext(),new TestOutputMetricsPublisher)
@@ -1292,6 +1307,8 @@ class PointWriterSubpartitionITest extends IntegrationSpec with CosmosClient wit
ItemWriteStrategy.ItemOverwrite,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
bulkMaxPendingOperations = Some(900)
)
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/SparkE2EBulkWriteITest.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/SparkE2EBulkWriteITest.scala
index 5b63b6a236a8..778136d375cf 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/SparkE2EBulkWriteITest.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/SparkE2EBulkWriteITest.scala
@@ -31,157 +31,161 @@ class SparkE2EBulkWriteITest
//scalastyle:off magic.number
//scalastyle:off null
- it should s"support bulk ingestion when BulkWriter needs to get restarted" in {
- val cosmosEndpoint = TestConfigurations.HOST
- val cosmosMasterKey = TestConfigurations.MASTER_KEY
-
- val configMapBuilder = scala.collection.mutable.Map(
- "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
- "spark.cosmos.accountKey" -> cosmosMasterKey,
- "spark.cosmos.database" -> cosmosDatabase,
- "spark.cosmos.container" -> cosmosContainer,
- "spark.cosmos.serialization.inclusionMode" -> "NonDefault"
- )
-
- var faultInjectionRuleOption : Option[FaultInjectionRule] = None
-
- try {
- // set-up logging
- val logs = scala.collection.mutable.ListBuffer[CosmosDiagnosticsContext]()
-
- configMapBuilder += "spark.cosmos.account.clientBuilderInterceptors" -> "com.azure.cosmos.spark.TestCosmosClientBuilderInterceptor"
- TestCosmosClientBuilderInterceptor.setCallback(builder => {
- val thresholds = new CosmosDiagnosticsThresholds()
- .setPointOperationLatencyThreshold(Duration.ZERO)
- .setNonPointOperationLatencyThreshold(Duration.ZERO)
- val telemetryCfg = new CosmosClientTelemetryConfig()
- .showQueryMode(ShowQueryMode.ALL)
- .diagnosticsHandler(new CompositeLoggingHandler(logs))
- .diagnosticsThresholds(thresholds)
- builder.clientTelemetryConfig(telemetryCfg)
- })
-
- // set-up fault injection
- configMapBuilder += "spark.cosmos.account.clientInterceptors" -> "com.azure.cosmos.spark.TestFaultInjectionClientInterceptor"
- configMapBuilder += "spark.cosmos.write.flush.intervalInSeconds" -> "10"
- configMapBuilder += "spark.cosmos.write.flush.noProgress.maxIntervalInSeconds" -> "30"
- configMapBuilder += "spark.cosmos.write.flush.noProgress.maxRetryIntervalInSeconds" -> "300"
- configMapBuilder += "spark.cosmos.write.onRetryCommitInterceptor" -> "com.azure.cosmos.spark.TestWriteOnRetryCommitInterceptor"
- TestFaultInjectionClientInterceptor.setCallback(client => {
- val faultInjectionResultBuilder = FaultInjectionResultBuilders
- .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY)
- .delay(Duration.ofHours(10000))
- .times(1)
-
- val endpoints = new FaultInjectionEndpointBuilder(
- FeedRange.forLogicalPartition(new PartitionKey("range_1")))
- .build()
-
- val result = faultInjectionResultBuilder.build
- val condition = new FaultInjectionConditionBuilder()
- .operationType(FaultInjectionOperationType.BATCH_ITEM)
- .connectionType(FaultInjectionConnectionType.DIRECT)
- .endpoints(endpoints)
- .build
-
- faultInjectionRuleOption = Some(new FaultInjectionRuleBuilder("InjectedEndlessResponseDelay")
- .condition(condition)
- .result(result)
- .build)
-
- TestWriteOnRetryCommitInterceptor.setCallback(() => faultInjectionRuleOption.get.disable())
-
- CosmosFaultInjectionHelper.configureFaultInjectionRules(
- client.getDatabase(cosmosDatabase).getContainer(cosmosContainer),
- List(faultInjectionRuleOption.get).asJava).block
-
- client
- })
-
- val cfg = configMapBuilder.toMap
-
- val newSpark = getSpark
-
- // scalastyle:off underscore.import
- // scalastyle:off import.grouping
- import spark.implicits._
- val spark = newSpark
- // scalastyle:on underscore.import
- // scalastyle:on import.grouping
-
- val toBeIngested = scala.collection.mutable.ListBuffer[String]()
- for (i <- 1 to 100) {
- toBeIngested += s"record_$i"
- }
-
- val df = toBeIngested.toSeq.toDF("id")
-
- var bytesWrittenSnapshot = 0L
- var recordsWrittenSnapshot = 0L
- var totalRequestChargeSnapshot: Option[AccumulableInfo] = None
-
- val statusStore = spark.sharedState.statusStore
- val oldCount = statusStore.executionsCount()
-
- spark.sparkContext
- .addSparkListener(
- new SparkListener {
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
- val outputMetrics = taskEnd.taskMetrics.outputMetrics
- logInfo(s"ON_TASK_END - Records written: ${outputMetrics.recordsWritten}, " +
- s"Bytes written: ${outputMetrics.bytesWritten}, " +
- s"${taskEnd.taskInfo.accumulables.mkString(", ")}")
- bytesWrittenSnapshot = outputMetrics.bytesWritten
-
- recordsWrittenSnapshot = outputMetrics.recordsWritten
-
- taskEnd
- .taskInfo
- .accumulables
- .filter(accumulableInfo => accumulableInfo.name.isDefined &&
- accumulableInfo.name.get.equals(CosmosConstants.MetricNames.TotalRequestCharge))
- .foreach(
- accumulableInfo => {
- totalRequestChargeSnapshot = Some(accumulableInfo)
- }
- )
- }
- })
-
- df.write.format("cosmos.oltp").mode("Append").options(cfg).save()
-
- // Wait until the new execution is started and being tracked.
- eventually(timeout(10.seconds), interval(10.milliseconds)) {
- assert(statusStore.executionsCount() > oldCount)
- }
-
- // Wait for listener to finish computing the metrics for the execution.
- eventually(timeout(10.seconds), interval(10.milliseconds)) {
- assert(statusStore.executionsList().nonEmpty &&
- statusStore.executionsList().last.metricValues != null)
- }
-
- recordsWrittenSnapshot shouldEqual 100
- bytesWrittenSnapshot > 0 shouldEqual true
-
- // that the write by spark is visible by the client query
- // wait for a second to allow replication is completed.
- Thread.sleep(1000)
-
- // the new item will be always persisted
- val ids = queryItems("SELECT c.id FROM c ORDER by c.id").toArray
- ids should have size 100
- val firstDoc = ids(0)
- firstDoc.get("id").asText() shouldEqual "record_1"
-
- // validate logs
- logs.nonEmpty shouldEqual true
- } finally {
- TestCosmosClientBuilderInterceptor.resetCallback()
- TestFaultInjectionClientInterceptor.resetCallback()
- faultInjectionRuleOption match {
- case Some(rule) => rule.disable()
- case None =>
+ for (enableBulkTransactional <- Seq(true, false)) {
+      it should s"support bulk ingestion when BulkWriter needs to get restarted (transactional bulk enabled: $enableBulkTransactional)" in {
+
+ val cosmosEndpoint = TestConfigurations.HOST
+ val cosmosMasterKey = TestConfigurations.MASTER_KEY
+
+ val configMapBuilder = scala.collection.mutable.Map(
+ "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+ "spark.cosmos.accountKey" -> cosmosMasterKey,
+ "spark.cosmos.database" -> cosmosDatabase,
+ "spark.cosmos.container" -> cosmosContainer,
+ "spark.cosmos.serialization.inclusionMode" -> "NonDefault",
+ "spark.cosmos.write.bulk.transactional" -> enableBulkTransactional.toString
+ )
+
+ var faultInjectionRuleOption : Option[FaultInjectionRule] = None
+
+ try {
+ // set-up logging
+ val logs = scala.collection.mutable.ListBuffer[CosmosDiagnosticsContext]()
+
+ configMapBuilder += "spark.cosmos.account.clientBuilderInterceptors" -> "com.azure.cosmos.spark.TestCosmosClientBuilderInterceptor"
+ TestCosmosClientBuilderInterceptor.setCallback(builder => {
+ val thresholds = new CosmosDiagnosticsThresholds()
+ .setPointOperationLatencyThreshold(Duration.ZERO)
+ .setNonPointOperationLatencyThreshold(Duration.ZERO)
+ val telemetryCfg = new CosmosClientTelemetryConfig()
+ .showQueryMode(ShowQueryMode.ALL)
+ .diagnosticsHandler(new CompositeLoggingHandler(logs))
+ .diagnosticsThresholds(thresholds)
+ builder.clientTelemetryConfig(telemetryCfg)
+ })
+
+ // set-up fault injection
+ configMapBuilder += "spark.cosmos.account.clientInterceptors" -> "com.azure.cosmos.spark.TestFaultInjectionClientInterceptor"
+ configMapBuilder += "spark.cosmos.write.flush.intervalInSeconds" -> "10"
+ configMapBuilder += "spark.cosmos.write.flush.noProgress.maxIntervalInSeconds" -> "30"
+ configMapBuilder += "spark.cosmos.write.flush.noProgress.maxRetryIntervalInSeconds" -> "300"
+ configMapBuilder += "spark.cosmos.write.onRetryCommitInterceptor" -> "com.azure.cosmos.spark.TestWriteOnRetryCommitInterceptor"
+ TestFaultInjectionClientInterceptor.setCallback(client => {
+ val faultInjectionResultBuilder = FaultInjectionResultBuilders
+ .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY)
+ .delay(Duration.ofHours(10000))
+ .times(1)
+
+ val endpoints = new FaultInjectionEndpointBuilder(
+ FeedRange.forLogicalPartition(new PartitionKey("range_1")))
+ .build()
+
+ val result = faultInjectionResultBuilder.build
+ val condition = new FaultInjectionConditionBuilder()
+ .operationType(FaultInjectionOperationType.BATCH_ITEM)
+ .connectionType(FaultInjectionConnectionType.DIRECT)
+ .endpoints(endpoints)
+ .build
+
+ faultInjectionRuleOption = Some(new FaultInjectionRuleBuilder("InjectedEndlessResponseDelay")
+ .condition(condition)
+ .result(result)
+ .build)
+
+ TestWriteOnRetryCommitInterceptor.setCallback(() => faultInjectionRuleOption.get.disable())
+
+ CosmosFaultInjectionHelper.configureFaultInjectionRules(
+ client.getDatabase(cosmosDatabase).getContainer(cosmosContainer),
+ List(faultInjectionRuleOption.get).asJava).block
+
+ client
+ })
+
+ val cfg = configMapBuilder.toMap
+
+ val newSpark = getSpark
+
+ // scalastyle:off underscore.import
+ // scalastyle:off import.grouping
+ import spark.implicits._
+ val spark = newSpark
+ // scalastyle:on underscore.import
+ // scalastyle:on import.grouping
+
+ val toBeIngested = scala.collection.mutable.ListBuffer[String]()
+ for (i <- 1 to 100) {
+ toBeIngested += s"record_$i"
+ }
+
+ val df = toBeIngested.toSeq.toDF("id")
+
+ var bytesWrittenSnapshot = 0L
+ var recordsWrittenSnapshot = 0L
+ var totalRequestChargeSnapshot: Option[AccumulableInfo] = None
+
+ val statusStore = spark.sharedState.statusStore
+ val oldCount = statusStore.executionsCount()
+
+ spark.sparkContext
+ .addSparkListener(
+ new SparkListener {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ val outputMetrics = taskEnd.taskMetrics.outputMetrics
+ logInfo(s"ON_TASK_END - Records written: ${outputMetrics.recordsWritten}, " +
+ s"Bytes written: ${outputMetrics.bytesWritten}, " +
+ s"${taskEnd.taskInfo.accumulables.mkString(", ")}")
+ bytesWrittenSnapshot = outputMetrics.bytesWritten
+
+ recordsWrittenSnapshot = outputMetrics.recordsWritten
+
+ taskEnd
+ .taskInfo
+ .accumulables
+ .filter(accumulableInfo => accumulableInfo.name.isDefined &&
+ accumulableInfo.name.get.equals(CosmosConstants.MetricNames.TotalRequestCharge))
+ .foreach(
+ accumulableInfo => {
+ totalRequestChargeSnapshot = Some(accumulableInfo)
+ }
+ )
+ }
+ })
+
+ df.write.format("cosmos.oltp").mode("Append").options(cfg).save()
+
+ // Wait until the new execution is started and being tracked.
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
+ assert(statusStore.executionsCount() > oldCount)
+ }
+
+ // Wait for listener to finish computing the metrics for the execution.
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
+ assert(statusStore.executionsList().nonEmpty &&
+ statusStore.executionsList().last.metricValues != null)
+ }
+
+ recordsWrittenSnapshot shouldEqual 100
+ bytesWrittenSnapshot > 0 shouldEqual true
+
+          // verify that the write by spark is visible to the client query
+          // wait for a second to allow replication to complete.
+ Thread.sleep(1000)
+
+          // the new items will always be persisted
+ val ids = queryItems("SELECT c.id FROM c ORDER by c.id").toArray
+ ids should have size 100
+ val firstDoc = ids(0)
+ firstDoc.get("id").asText() shouldEqual "record_1"
+
+ // validate logs
+ logs.nonEmpty shouldEqual true
+ } finally {
+ TestCosmosClientBuilderInterceptor.resetCallback()
+ TestFaultInjectionClientInterceptor.resetCallback()
+ faultInjectionRuleOption match {
+ case Some(rule) => rule.disable()
+ case None =>
+ }
}
}
}
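
The rewritten `SparkE2EBulkWriteITest` above parameterizes the restart scenario over the new `spark.cosmos.write.bulk.transactional` option. For the user-facing surface only, a minimal hedged sketch of a DataFrame write using that option (the option keys are taken from the test above; endpoint, key, database, container and `df` are placeholders supplied by the caller):

```scala
// Minimal sketch assuming the option keys exercised in the test above.
val cfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> cosmosDatabase,
  "spark.cosmos.container" -> cosmosContainer,
  "spark.cosmos.write.bulk.transactional" -> "true" // new option introduced by this PR
)

df.write.format("cosmos.oltp").mode("Append").options(cfg).save()
```
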
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala
index 3b9feae2fdbd..0905934bebba 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala
@@ -2,9 +2,9 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark
+import com.azure.cosmos.CosmosAsyncClient
import com.azure.cosmos.implementation.{TestConfigurations, Utils}
import com.azure.cosmos.models.{PartitionKey, PartitionKeyBuilder}
-import com.azure.cosmos.{CosmosAsyncClient, CosmosException}
import com.azure.cosmos.test.faultinjection._
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.spark.sql.types._
@@ -50,7 +50,7 @@ class TransactionalBatchITest extends IntegrationSpec
)
val operationsDf = spark.createDataFrame(batchOperations.asJava, schema)
-
+
// Execute transactional batch using bulk transactional mode
operationsDf.write
.format("cosmos.oltp")
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/utils/CosmosPatchTestHelper.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/utils/CosmosPatchTestHelper.scala
index 982791600b58..c6876bef3b1c 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/utils/CosmosPatchTestHelper.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/utils/CosmosPatchTestHelper.scala
@@ -6,7 +6,7 @@ package com.azure.cosmos.spark.utils
import com.azure.cosmos.CosmosAsyncContainer
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils
import com.azure.cosmos.models.PartitionKeyDefinition
-import com.azure.cosmos.spark.{BulkWriter, CosmosContainerConfig, CosmosPatchColumnConfig, CosmosPatchConfigs, CosmosWriteConfig, DiagnosticsConfig, ItemWriteStrategy, OutputMetricsPublisherTrait, PointWriter}
+import com.azure.cosmos.spark.{BulkWriter, CosmosContainerConfig, CosmosPatchColumnConfig, CosmosPatchConfigs, CosmosWriteBulkExecutionConfigs, CosmosWriteConfig, DiagnosticsConfig, ItemWriteStrategy, OutputMetricsPublisherTrait, PointWriter}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.commons.lang3.RandomUtils
@@ -160,6 +160,7 @@ def getPatchFullTestSchemaWithSubpartitions(): StructType = {
))
}
+ // TODO: wire up with transactional batch when patch is supported in transactional bulk writer
def getBulkWriterForPatch(columnConfigsMap: TrieMap[String, CosmosPatchColumnConfig],
container: CosmosAsyncContainer,
containerConfig: CosmosContainerConfig,
@@ -171,6 +172,8 @@ def getPatchFullTestSchemaWithSubpartitions(): StructType = {
ItemWriteStrategy.ItemPatch,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
patchConfigs = Some(patchConfigs))
new BulkWriter(
@@ -183,6 +186,7 @@ def getPatchFullTestSchemaWithSubpartitions(): StructType = {
1)
}
+ // TODO: wire up with transactional bulk writer when patchBulkUpdate is supported in transactional bulk writer
def getBulkWriterForPatchBulkUpdate(columnConfigsMap: TrieMap[String, CosmosPatchColumnConfig],
container: CosmosAsyncContainer,
containerConfig: CosmosContainerConfig,
@@ -193,6 +197,8 @@ def getPatchFullTestSchemaWithSubpartitions(): StructType = {
ItemWriteStrategy.ItemBulkUpdate,
5,
bulkEnabled = true,
+ bulkTransactional = false,
+ bulkExecutionConfigs = Some(CosmosWriteBulkExecutionConfigs()),
patchConfigs = Some(patchConfigs))
new BulkWriter(
@@ -216,6 +222,7 @@ def getPatchFullTestSchemaWithSubpartitions(): StructType = {
ItemWriteStrategy.ItemPatch,
5,
bulkEnabled = false,
+ bulkTransactional = false,
patchConfigs = Some(patchConfigs))
new PointWriter(
@@ -237,6 +244,7 @@ def getPatchFullTestSchemaWithSubpartitions(): StructType = {
ItemWriteStrategy.ItemBulkUpdate,
5,
bulkEnabled = false,
+ bulkTransactional = false,
patchConfigs = Some(patchConfigs))
new PointWriter(
diff --git a/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md
index e3cca105c9e2..4c3391dcb35d 100644
--- a/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md
@@ -4,6 +4,7 @@
#### Features Added
* Initial release of Spark 4.0 connector with Scala 2.13 support
+* Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478), [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697), and [PR 47803](https://github.com/Azure/azure-sdk-for-java/pull/47803)
#### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java
index 96d2969a8cb3..657cc22f6366 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java
@@ -70,6 +70,17 @@ public static Object[][] exceptionArgsProvider() {
};
}
+ @DataProvider(name = "requestRateTooLarge_batch_ArgProvider")
+ public static Object[][] requestRateTooLarge_batch_ArgProvider() {
+ return new Object[][]{
+ // OperationType, ResourceType, disableRetryForThrottledBatchRequest
+ { OperationType.Batch, ResourceType.Document, true },
+ { OperationType.Batch, ResourceType.Document, false },
+ { OperationType.Batch, ResourceType.DocumentCollection, true },
+ { OperationType.Read, ResourceType.Document, true }
+ };
+ }
+
@Test(groups = "unit", dataProvider = "requestRateTooLargeArgProvider")
public void requestRateTooLarge(
OperationType operationType,
@@ -94,7 +105,8 @@ public void requestRateTooLarge(
throttlingRetryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
// Create throttling exception with retry delay
        Map<String, String> headers = new HashMap<>();
@@ -170,7 +182,8 @@ public void networkFailureOnRead() throws Exception {
throttlingRetryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
Exception exception = new SocketException("Dummy SocketException");
CosmosException cosmosException = BridgeInternal.createCosmosException(null, HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, exception);
@@ -222,7 +235,8 @@ public void shouldRetryOnGatewayTimeout(
throttlingRetryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
Exception exception = ReadTimeoutException.INSTANCE;
CosmosException cosmosException = BridgeInternal.createCosmosException(null, HttpConstants.StatusCodes.REQUEST_TIMEOUT, exception);
@@ -269,7 +283,8 @@ public void tcpNetworkFailureOnRead() throws Exception {
retryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
Exception exception = ReadTimeoutException.INSTANCE;
GoneException goneException = new GoneException(exception);
@@ -326,7 +341,8 @@ public void networkFailureOnWrite() throws Exception {
throttlingRetryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
Exception exception = new SocketException("Dummy SocketException");;
CosmosException cosmosException = BridgeInternal.createCosmosException(null, HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, exception);
@@ -372,7 +388,8 @@ public void tcpNetworkFailureOnWrite(
retryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
//Non retribale exception for write
GoneException goneException = new GoneException(exception);
@@ -441,7 +458,8 @@ public void networkFailureOnUpsert() throws Exception {
throttlingRetryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
Exception exception = new SocketException("Dummy SocketException");
CosmosException cosmosException = BridgeInternal.createCosmosException(null, HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, exception);
@@ -485,7 +503,8 @@ public void tcpNetworkFailureOnUpsert() throws Exception {
retryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
Exception exception = new SocketException("Dummy SocketException");
GoneException goneException = new GoneException(exception);
@@ -530,7 +549,8 @@ public void networkFailureOnDelete() throws Exception {
throttlingRetryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
Exception exception = ReadTimeoutException.INSTANCE;
CosmosException cosmosException = BridgeInternal.createCosmosException(
@@ -575,7 +595,8 @@ public void tcpNetworkFailureOnDelete() throws Exception {
retryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
Exception exception = ReadTimeoutException.INSTANCE;
GoneException goneException = new GoneException(exception);
@@ -619,7 +640,8 @@ public void onBeforeSendRequestNotInvoked() {
throttlingRetryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
Exception exception = ReadTimeoutException.INSTANCE;
@@ -660,7 +682,8 @@ public void returnWithInternalServerErrorOnPpcbFailure(CosmosException cosmosExc
throttlingRetryOptions,
null,
globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
- globalPartitionEndpointManagerForPerPartitionAutomaticFailover);
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ false);
RxDocumentServiceRequest dsr;
        Mono<ShouldRetryResult> shouldRetry;
@@ -677,6 +700,72 @@ public void returnWithInternalServerErrorOnPpcbFailure(CosmosException cosmosExc
.build());
}
+ @Test(groups = "unit", dataProvider = "requestRateTooLarge_batch_ArgProvider")
+ public void requestRateTooLarge_batch(
+ OperationType operationType,
+ ResourceType resourceType,
+ boolean disableRetryForThrottledBatchRequest) throws Exception {
+
+ ThrottlingRetryOptions throttlingRetryOptions =
+ new ThrottlingRetryOptions().setMaxRetryAttemptsOnThrottledRequests(1);
+
+ GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
+ GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker globalPartitionEndpointManagerForPerPartitionCircuitBreaker
+ = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.class);
+
+ GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover
+ = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class);
+
+ Mockito
+ .doReturn(new RegionalRoutingContext(new URI("http://localhost")))
+ .when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
+
+ Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
+ ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(
+ mockDiagnosticsClientContext(),
+ endpointManager,
+ true,
+ throttlingRetryOptions,
+ null,
+ globalPartitionEndpointManagerForPerPartitionCircuitBreaker,
+ globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
+ disableRetryForThrottledBatchRequest);
+
+ // Create throttling exception with retry delay
+        Map<String, String> headers = new HashMap<>();
+ headers.put(
+ HttpConstants.HttpHeaders.RETRY_AFTER_IN_MILLISECONDS,
+ "1000");
+ headers.put(WFConstants.BackendHeaders.SUB_STATUS,
+ Integer.toString(HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE));
+ RequestRateTooLargeException throttlingException = new RequestRateTooLargeException(null, 1, "1", headers);
+
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(),
+ operationType,
+ "/dbs/db/colls/col",
+ resourceType);
+ request.requestContext = new DocumentServiceRequestContext();
+ request.requestContext.routeToLocation(0, true);
+
+ clientRetryPolicy.onBeforeSendRequest(request);
+
+        Mono<ShouldRetryResult> shouldRetry = clientRetryPolicy.shouldRetry(throttlingException);
+ if (operationType != OperationType.Batch || resourceType != ResourceType.Document) {
+ validateSuccess(shouldRetry, ShouldRetryValidator.builder()
+ .nullException()
+ .shouldRetry(true)
+ .build());
+ } else if (disableRetryForThrottledBatchRequest) {
+ validateSuccess(shouldRetry, ShouldRetryValidator.builder()
+ .shouldRetry(false)
+ .build());
+ } else {
+ validateSuccess(shouldRetry, ShouldRetryValidator.builder()
+ .shouldRetry(true)
+ .build());
+ }
+ }
+
    public static void validateSuccess(Mono<ShouldRetryResult> single,
ShouldRetryValidator validator) {
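
The new `requestRateTooLarge_batch` test above pins down what the `disableRetryForThrottledBatchRequest` constructor argument added to `ClientRetryPolicy` is expected to do: throttled (429) requests keep being retried locally, except for Batch operations against Document resources when the flag is set. A hedged Scala sketch of that decision, derived only from the assertions in the test rather than from the ClientRetryPolicy implementation:

```scala
import com.azure.cosmos.implementation.{OperationType, ResourceType}

// Illustrative only - mirrors the expectations asserted in requestRateTooLarge_batch.
def shouldRetryThrottledRequest(
    operationType: OperationType,
    resourceType: ResourceType,
    disableRetryForThrottledBatchRequest: Boolean): Boolean = {
  val isTransactionalBatchOnDocuments =
    operationType == OperationType.Batch && resourceType == ResourceType.Document
  // Only a throttled batch request on documents, with retries disabled, is surfaced to the caller.
  !(isTransactionalBatchOnDocuments && disableRetryForThrottledBatchRequest)
}
```
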
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java
index e672381f247a..592b49dbe79b 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java
@@ -323,7 +323,8 @@ public void partitionedSessionToken(boolean isNameBased) throws NoSuchMethodExce
serverBatchRequest,
new RequestOptions(),
false,
- true)
+ true,
+ false)
.block();
assertThat(getSessionTokensInRequests().size()).isEqualTo(1);
assertThat(getSessionTokensInRequests().get(0)).isNotEmpty();
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/BulkExecutorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/BulkExecutorTest.java
index 0b0346e9a2b7..c7ccbbcd3678 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/BulkExecutorTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/BulkExecutorTest.java
@@ -38,10 +38,12 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -292,4 +294,56 @@ public void executeBulk_complete() throws InterruptedException {
iterations++;
}
}
+
+ @Test(groups = { "emulator" }, timeOut = TIMEOUT)
+ public void executeBulk_tooManyRequest_recordInThresholds() throws Exception {
+ this.container = createContainer(database);
+
+ String pkValue = UUID.randomUUID().toString();
+ TestDoc testDoc = this.populateTestDoc(pkValue);
+        List<CosmosItemOperation> cosmosItemOperations = new ArrayList<>();
+ cosmosItemOperations.add(CosmosBulkOperations.getCreateItemOperation(testDoc, new PartitionKey(pkValue)));
+
+ FaultInjectionRule tooManyRequestRule =
+ new FaultInjectionRuleBuilder("ttrs-" + UUID.randomUUID())
+ .condition(new FaultInjectionConditionBuilder().operationType(FaultInjectionOperationType.BATCH_ITEM).build())
+ .result(FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST).times(1).build())
+ .duration(Duration.ofSeconds(30))
+ .hitLimit(1)
+ .build();
+
+ CosmosBulkExecutionOptionsImpl cosmosBulkExecutionOptions = new CosmosBulkExecutionOptionsImpl();
+ final BulkExecutor