Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.43.0-beta.1 (Unreleased)

#### Features Added
* Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478) and [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697) and [PR 47803](https://github.com/Azure/azure-sdk-for-java/pull/47803)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.43.0-beta.1 (Unreleased)

#### Features Added
* Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478) and [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697) and [PR 47803](https://github.com/Azure/azure-sdk-for-java/pull/47803)

#### Breaking Changes

Expand Down
28 changes: 15 additions & 13 deletions sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ private class BulkWriter
// so multiplying by cpuCount in the default config is too aggressive
private val maxPendingOperations = writeConfig.bulkMaxPendingOperations
.getOrElse(DefaultMaxPendingOperationPerCore)
private val maxConcurrentPartitions = writeConfig.maxConcurrentCosmosPartitions match {

private val bulkExecutionConfigs = writeConfig.bulkExecutionConfigs.get.asInstanceOf[CosmosWriteBulkExecutionConfigs]
private val maxConcurrentPartitions = bulkExecutionConfigs.maxConcurrentCosmosPartitions match {
// using the provided maximum of concurrent partitions per Spark partition on the input data
// multiplied by 2 to leave space for partition splits during ingestion
case Some(configuredMaxConcurrentPartitions) => 2 * configuredMaxConcurrentPartitions
Expand Down Expand Up @@ -146,20 +148,20 @@ private class BulkWriter

ThroughputControlHelper.populateThroughputControlGroupName(cosmosBulkExecutionOptions, writeConfig.throughputControlConfig)

writeConfig.maxMicroBatchPayloadSizeInBytes match {
bulkExecutionConfigs.maxMicroBatchPayloadSizeInBytes match {
case Some(customMaxMicroBatchPayloadSizeInBytes) =>
cosmosBulkExecutionOptionsImpl
.setMaxMicroBatchPayloadSizeInBytes(customMaxMicroBatchPayloadSizeInBytes)
case None =>
}

writeConfig.initialMicroBatchSize match {
bulkExecutionConfigs.initialMicroBatchSize match {
case Some(customInitialMicroBatchSize) =>
cosmosBulkExecutionOptions.setInitialMicroBatchSize(Math.max(1, customInitialMicroBatchSize))
case None =>
}

writeConfig.maxMicroBatchSize match {
bulkExecutionConfigs.maxMicroBatchSize match {
case Some(customMaxMicroBatchSize) =>
cosmosBulkExecutionOptions.setMaxMicroBatchSize(
Math.max(
Expand Down Expand Up @@ -267,7 +269,7 @@ private class BulkWriter

// We start from using the bulk batch size and interval and concurrency
// If in the future, there is a need to separate the configuration, can re-consider
val bulkBatchSize = writeConfig.maxMicroBatchSize match {
val bulkBatchSize = bulkExecutionConfigs.maxMicroBatchSize match {
case Some(customMaxMicroBatchSize) => Math.min(
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST,
Math.max(1, customMaxMicroBatchSize))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ private[spark] object CosmosConfigNames {
val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
val WriteBulkInitialBatchSize = "spark.cosmos.write.bulk.initialBatchSize"
val WriteBulkTransactionalMaxOperationsConcurrency = "spark.cosmos.write.bulk.transactional.maxOperationsConcurrency"
val WriteBulkTransactionalMaxBatchesConcurrency = "spark.cosmos.write.bulk.transactional.maxBatchesConcurrency"
val WritePointMaxConcurrency = "spark.cosmos.write.point.maxConcurrency"
val WritePatchDefaultOperationType = "spark.cosmos.write.patch.defaultOperationType"
val WritePatchColumnConfigs = "spark.cosmos.write.patch.columnConfigs"
Expand Down Expand Up @@ -248,6 +250,8 @@ private[spark] object CosmosConfigNames {
WriteBulkMaxConcurrentPartitions,
WriteBulkPayloadSizeInBytes,
WriteBulkInitialBatchSize,
WriteBulkTransactionalMaxOperationsConcurrency,
WriteBulkTransactionalMaxBatchesConcurrency,
WriteBulkMaxBatchSize,
WriteBulkMinTargetBatchSize,
WritePointMaxConcurrency,
Expand Down Expand Up @@ -1464,21 +1468,31 @@ private case class CosmosPatchConfigs(columnConfigsMap: TrieMap[String, CosmosPa
// Aggregated, parsed write-path configuration for the Cosmos Spark connector.
// Built by CosmosWriteConfig.parseWriteConfig from the raw Spark option map.
// NOTE(review): this span is pasted from a diff view — the two consecutive
// `bulkTransactional` parameters below are the deleted (old, `= false` default)
// and added (new, no default) versions of the same line; only one exists in the
// real file. Verify against the merged source before relying on this text.
private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
// Max retries for a failed write operation before giving up.
maxRetryCount: Int,
// true => bulk ingestion path; false => point-write path.
bulkEnabled: Boolean,
bulkTransactional: Boolean = false,
bulkTransactional: Boolean,
// Variant-specific bulk tuning knobs: CosmosWriteBulkExecutionConfigs for the
// non-transactional path, CosmosWriteTransactionalBulkExecutionConfigs for the
// transactional path. None when bulk is disabled.
bulkExecutionConfigs: Option[CosmosWriteBulkExecutionConfigsBase] = None,
bulkMaxPendingOperations: Option[Int] = None,
pointMaxConcurrency: Option[Int] = None,
// NOTE(review): per-micro-batch knobs (payload size, initial/max/min batch size
// and maxConcurrentCosmosPartitions) appear to have moved into
// bulkExecutionConfigs in this change; the duplicated/removed lines below are
// diff residue of that move — confirm against the merged file.
maxConcurrentCosmosPartitions: Option[Int] = None,
patchConfigs: Option[CosmosPatchConfigs] = None,
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
maxMicroBatchPayloadSizeInBytes: Option[Int] = None,
initialMicroBatchSize: Option[Int] = None,
maxMicroBatchSize: Option[Int] = None,
minTargetMicroBatchSize: Option[Int] = None,
// Seconds between flush/close attempts while draining pending operations.
flushCloseIntervalInSeconds: Int = 60,
maxInitialNoProgressIntervalInSeconds: Int = 180,
maxRetryNoProgressIntervalInSeconds: Int = 45 * 60,
retryCommitInterceptor: Option[WriteOnRetryCommitInterceptor] = None)

// Marker trait for the two bulk-execution configuration variants (regular vs
// transactional), so CosmosWriteConfig.bulkExecutionConfigs can carry either
// shape behind a single Option field.
private trait CosmosWriteBulkExecutionConfigsBase {}

// Tuning knobs for the NON-transactional bulk write path. All fields are
// optional; None means "use the SDK/connector default". Consumed by BulkWriter
// (which reads maxConcurrentCosmosPartitions, maxMicroBatchPayloadSizeInBytes,
// initialMicroBatchSize and maxMicroBatchSize per the visible diff hunks).
private case class CosmosWriteBulkExecutionConfigs(
// Cap on Cosmos physical partitions written to concurrently per Spark partition.
maxConcurrentCosmosPartitions: Option[Int] = None,
// Upper bound on the serialized payload of one micro batch, in bytes.
maxMicroBatchPayloadSizeInBytes: Option[Int] = None,
// Micro-batch size to start with before adaptive sizing kicks in.
initialMicroBatchSize: Option[Int] = None,
// Hard ceiling for the adaptive micro-batch size.
maxMicroBatchSize: Option[Int] = None,
// Lower bound the adaptive sizing will not shrink below.
minTargetMicroBatchSize: Option[Int] = None) extends CosmosWriteBulkExecutionConfigsBase

// Tuning knobs for the TRANSACTIONAL bulk write path (all operations in a batch
// target one logical partition and commit atomically). None => default.
// NOTE(review): BulkWriter (L55 of the pasted diff) unconditionally casts
// bulkExecutionConfigs to CosmosWriteBulkExecutionConfigs; presumably a separate
// writer handles this transactional variant — otherwise that cast would throw
// ClassCastException. Confirm the transactional code path.
private case class CosmosWriteTransactionalBulkExecutionConfigs(
// Cap on Cosmos physical partitions written to concurrently per Spark partition.
maxConcurrentCosmosPartitions: Option[Int] = None,
// Max in-flight operations per Cosmos partition (see help text: default ~100).
maxConcurrentOperations: Option[Int] = None,
// Max concurrent transactional batches per Cosmos partition (help text says 1..5).
maxConcurrentBatches: Option[Int] = None) extends CosmosWriteBulkExecutionConfigsBase

private object CosmosWriteConfig {
private val DefaultMaxRetryCount = 10
private val DefaultPatchOperationType = CosmosPatchOperationTypes.Replace
Expand Down Expand Up @@ -1554,6 +1568,22 @@ private object CosmosWriteConfig {
s" repartitioned to balance to how many Cosmos partitions each Spark partition needs to write. This is mainly" +
s" useful for very large containers (with hundreds of physical partitions).")

// Config entry for 'spark.cosmos.write.bulk.transactional.maxOperationsConcurrency':
// optional operation-level parallelism cap for transactional bulk writes, falling
// back to the SDK default when the user does not set it.
private val bulkTransactionalMaxOpsConcurrency = CosmosConfigEntry[Int](
  key = CosmosConfigNames.WriteBulkTransactionalMaxOperationsConcurrency,
  mandatory = false,
  defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_BULK_TRANSACTIONAL_BATCH_OP_CONCURRENCY),
  parseFromStringFunction = (rawValue: String) => rawValue.toInt,
  helpMessage = "Max number of in-flight operations per Cosmos partition for transactional bulk mode. " +
    "Higher values increase parallelism (and RU usage) but can cause throttling; default ~100.")

// Config entry for 'spark.cosmos.write.bulk.transactional.maxBatchesConcurrency':
// optional batch-level parallelism cap for transactional bulk writes, falling
// back to the SDK default when the user does not set it.
private val bulkTransactionalMaxBatchesConcurrency = CosmosConfigEntry[Int](
  key = CosmosConfigNames.WriteBulkTransactionalMaxBatchesConcurrency,
  defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_BULK_TRANSACTIONAL_BATCH_CONCURRENCY),
  mandatory = false,
  parseFromStringFunction = maxBatchesConcurrency => maxBatchesConcurrency.toInt,
  // Bug fix: the two concatenated literals previously joined without a space,
  // rendering the user-facing help text as "default 5.Each batch ...".
  helpMessage = "Max concurrent transactional batches per Cosmos partition (1..5). Controls batch-level parallelism; default 5. " +
    "Each batch may contain multiple operations; tune together with 'spark.cosmos.write.bulk.transactional.maxOperationsConcurrency' to balance throughput and throttling.")

private val pointWriteConcurrency = CosmosConfigEntry[Int](key = CosmosConfigNames.WritePointMaxConcurrency,
mandatory = false,
parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
Expand Down Expand Up @@ -1772,10 +1802,7 @@ private object CosmosWriteConfig {
val bulkTransactionalOpt = CosmosConfigEntry.parse(cfg, bulkTransactional)
var patchConfigsOpt = Option.empty[CosmosPatchConfigs]
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize)
val maxBatchSizeOpt = CosmosConfigEntry.parse(cfg, maxMicroBatchSize)
val minTargetBatchSizeOpt = CosmosConfigEntry.parse(cfg, minTargetMicroBatchSize)

val writeRetryCommitInterceptor = CosmosConfigEntry
.parse(cfg, writeOnRetryCommitInterceptor).flatten

Expand All @@ -1785,12 +1812,6 @@ private object CosmosWriteConfig {
assert(itemWriteStrategyOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteStrategy}' is missing.")
assert(maxRetryCountOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteMaxRetryCount}' is missing.")

if (bulkTransactionalOpt.isDefined && bulkTransactionalOpt.get) {
// Validate write strategy for transactional batches
assert(itemWriteStrategyOpt.get == ItemWriteStrategy.ItemOverwrite,
s"Transactional batches only support ItemOverwrite (upsert) write strategy. Requested: ${itemWriteStrategyOpt.get}")
}

itemWriteStrategyOpt.get match {
case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists =>
val patchColumnConfigMap = parsePatchColumnConfigs(cfg, inputSchema)
Expand All @@ -1802,20 +1823,51 @@ private object CosmosWriteConfig {
case _ =>
}

var bulkExecutionConfigsOpt: Option[CosmosWriteBulkExecutionConfigsBase] = None
if (bulkEnabledOpt.isDefined && bulkEnabledOpt.get) {

if (bulkTransactionalOpt.isDefined && bulkTransactionalOpt.get) {
// Validate write strategy for transactional batches
assert(itemWriteStrategyOpt.get == ItemWriteStrategy.ItemOverwrite,
s"Transactional batches only support ItemOverwrite (upsert) write strategy. Requested: ${itemWriteStrategyOpt.get}")

val maxConcurrentCosmosPartitionsOpt = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions)
val maxBulkTransactionalOpsConcurrencyOpt = CosmosConfigEntry.parse(cfg, bulkTransactionalMaxOpsConcurrency)
val maxBulkTransactionalBatchesConcurrencyOpt = CosmosConfigEntry.parse(cfg, bulkTransactionalMaxBatchesConcurrency)

bulkExecutionConfigsOpt = Some(CosmosWriteTransactionalBulkExecutionConfigs(
maxConcurrentCosmosPartitionsOpt,
maxBulkTransactionalOpsConcurrencyOpt,
maxBulkTransactionalBatchesConcurrencyOpt
))

} else {
// non-transactional batch
val maxConcurrentCosmosPartitionsOpt = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions)
val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize)
val maxBatchSizeOpt = CosmosConfigEntry.parse(cfg, maxMicroBatchSize)
val minTargetBatchSizeOpt = CosmosConfigEntry.parse(cfg, minTargetMicroBatchSize)

bulkExecutionConfigsOpt = Some(CosmosWriteBulkExecutionConfigs(
maxConcurrentCosmosPartitionsOpt,
microBatchPayloadSizeInBytesOpt,
initialBatchSizeOpt,
maxBatchSizeOpt,
minTargetBatchSizeOpt))
}
}

CosmosWriteConfig(
itemWriteStrategyOpt.get,
maxRetryCountOpt.get,
bulkEnabled = bulkEnabledOpt.get,
bulkTransactional = bulkTransactionalOpt.get,
bulkExecutionConfigs = bulkExecutionConfigsOpt,
bulkMaxPendingOperations = CosmosConfigEntry.parse(cfg, bulkMaxPendingOperations),
pointMaxConcurrency = CosmosConfigEntry.parse(cfg, pointWriteConcurrency),
maxConcurrentCosmosPartitions = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions),
patchConfigs = patchConfigsOpt,
throughputControlConfig = throughputControlConfigOpt,
maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt,
initialMicroBatchSize = initialBatchSizeOpt,
maxMicroBatchSize = maxBatchSizeOpt,
minTargetMicroBatchSize = minTargetBatchSizeOpt,
flushCloseIntervalInSeconds = CosmosConfigEntry.parse(cfg, flushCloseIntervalInSeconds).get,
maxInitialNoProgressIntervalInSeconds = CosmosConfigEntry.parse(cfg, maxInitialNoProgressIntervalInSeconds).get,
maxRetryNoProgressIntervalInSeconds = CosmosConfigEntry.parse(cfg, maxRetryNoProgressIntervalInSeconds).get,
Expand Down
Loading
Loading