@xinlian12 xinlian12 commented Jan 25, 2026

Changes

Added concurrency control for the transactional batch bulk executor.

High level overview

TransactionalBulkExecutor consumes a Flux of CosmosBatch objects and executes transactional (per-partition) batches, using per-partition adaptive micro-batch sizing and concurrency control to maximize throughput while minimizing throttling.

  • Ingest: consume a stream of CosmosBatch objects and attach a transactional retry policy per batch.

  • Resolve & group: resolve partition key range id for each batch and group batches by partition-range so each partition is processed independently.

  • Partition state: maintain a PartitionScopeThresholds object per partition-range that tracks successes and enqueued retries and computes an adaptive targetMicroBatchSize.

  • Concurrency control:

    • Keep two simple counters per partition: totalOperationsInFlight and totalBatchesInFlight.
    • Before executing a batch, check canFlushCosmosBatch:
      • allow if batch.size + totalOperationsInFlight <= targetMicroBatchSize OR there are no batches in flight (ensures forward progress)
      • otherwise wait on the partition’s flushSignal stream (emitted on batch completion and by a periodic flush task)
    • When allowed, increment the in-flight counters and send the batch to container.executeCosmosBatch.
  • Success path: call recordSuccessfulOperation() on the thresholds for each completed operation, then emit a flush signal so waiting batches can proceed.

  • Failure & retry paths:

    • Fast re-enqueue (mainSink): if shouldRetryInMainSink() returns true, re-emit the batch to the main sink for immediate retry (this path does not record an enqueued retry in thresholds).
    • Group retry (groupSink): otherwise, ask the transactional retry policy; if it returns shouldRetry (optionally with a backOffTime), record an enqueued retry in thresholds and re-enqueue the batch into the partition’s group sink, either immediately or after the backoff delay.
    • Non-retriable failures are returned to the caller.
  • Completion & safety: a scheduled flush task periodically emits flush signals to avoid deadlocks; mainSourceCompleted and totalCount track lifecycle and trigger graceful shutdown.
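
The gating rule from the concurrency-control bullets can be sketched as below. This is a minimal illustration, not the executor's code: the class `PartitionFlushGate` and the methods `tryAcquire` and `release` are hypothetical, and only the counter names and the `canFlushCosmosBatch` condition come from the description above.

```java
/**
 * Hypothetical per-partition gate. Field names and the flush condition follow
 * the PR description; the class itself is an illustration, not SDK code.
 */
final class PartitionFlushGate {
    private int totalOperationsInFlight;
    private int totalBatchesInFlight;

    /** Allow if the batch fits under the target, or if nothing is in flight. */
    synchronized boolean canFlushCosmosBatch(int batchSize, int targetMicroBatchSize) {
        boolean fitsUnderTarget = batchSize + totalOperationsInFlight <= targetMicroBatchSize;
        boolean nothingInFlight = totalBatchesInFlight == 0; // guarantees forward progress
        return fitsUnderTarget || nothingInFlight;
    }

    /** Check and reserve in one synchronized step so two callers cannot both pass the check. */
    synchronized boolean tryAcquire(int batchSize, int targetMicroBatchSize) {
        if (!canFlushCosmosBatch(batchSize, targetMicroBatchSize)) {
            return false; // caller would wait for the next flush signal
        }
        totalOperationsInFlight += batchSize;
        totalBatchesInFlight++;
        return true;
    }

    /** Called on batch completion, before the flush signal is emitted. */
    synchronized void release(int batchSize) {
        totalOperationsInFlight -= batchSize;
        totalBatchesInFlight--;
    }
}
```

Note that an oversized batch (larger than targetMicroBatchSize) is still admitted when the partition is idle, which is what the "no batches in flight" clause is for.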

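The description says PartitionScopeThresholds computes an adaptive targetMicroBatchSize from successes and enqueued retries, but it does not spell out the formula. The sketch below shows one plausible shape of such a feedback loop, assuming a retry-rate band: every name except recordSuccessfulOperation, and every constant and step size, is an assumption made for illustration.

```java
/**
 * Hypothetical retry-rate-driven sizing. Shrinks the target when the observed
 * retry rate is above the band, grows it when below, and leaves it unchanged
 * inside the band. Not the SDK's actual algorithm.
 */
final class AdaptiveMicroBatchSize {
    private final int minSize;
    private final int maxSize;
    private final double minRetryRate;
    private final double maxRetryRate;
    private int target;
    private long successes;
    private long retries;

    AdaptiveMicroBatchSize(int initial, int minSize, int maxSize,
                           double minRetryRate, double maxRetryRate) {
        this.target = initial;
        this.minSize = minSize;
        this.maxSize = maxSize;
        this.minRetryRate = minRetryRate;
        this.maxRetryRate = maxRetryRate;
    }

    synchronized void recordSuccessfulOperation() { successes++; }

    synchronized void recordEnqueuedRetry() { retries++; }

    /** Re-evaluates the target from the samples seen since the last call. */
    synchronized int reevaluate() {
        long total = successes + retries;
        if (total > 0) {
            double retryRate = (double) retries / total;
            if (retryRate > maxRetryRate) {
                target = Math.max(minSize, target / 2);                       // back off quickly
            } else if (retryRate < minRetryRate) {
                target = Math.min(maxSize, target + Math.max(1, target / 10)); // grow slowly
            }
        }
        successes = 0;
        retries = 0;
        return target;
    }
}
```

Halving on throttling and growing in small steps is an assumed policy chosen for the sketch; the point is only that the target feeds the canFlushCosmosBatch check per partition.
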
flowchart TD
  A[Input Flux of CosmosBatch]
  B[Resolve partitionRangeId]
  C[Group by partitionRangeId]

  subgraph PartitionGroup
    direction TB
    T[PartitionScopeThresholds]
    S1[totalOperationsInFlight]
    S2[totalBatchesInFlight]
    D[Decision: canFlush?]
    W[Wait on flushSignal]
    X[Execute: executeCosmosBatch]
    OK[Success -> record success -> emit flushSignal]
    ERR[Failure -> evaluate retry policy]
    M[MainSink re-enqueue]
    G[GroupSink enqueue -> record enqueued retry]
  end

  A --> B --> C
  C --> PartitionGroup
  PartitionGroup --> D
  D -->|true| X
  D -->|false| W
  X --> OK
  X -->|error| ERR
  ERR -->|shouldRetryInMainSink| M
  ERR -->|else if retriable| G
  ERR -->|non-retriable| R[Return failure to caller]

  OK --> Flush[scheduled flush task emits flushSignal]
  G --> PartitionGroup
  Flush --> W
  S1 & S2 -. influence .-> D

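The failure branch of the flow (main sink, group sink, or return to caller) reduces to a three-way decision. The sketch below is one way to express it, assuming the two inputs are already known; `RetryRouting`, `Route`, and `Decision` are hypothetical names, and the real executor wires this into Reactor sinks rather than returning a value.

```java
import java.time.Duration;

/** Hypothetical routing of a failed batch, following the failure and retry bullets. */
final class RetryRouting {
    enum Route { MAIN_SINK, GROUP_SINK, RETURN_TO_CALLER }

    static final class Decision {
        final Route route;
        final Duration delay;
        final boolean recordEnqueuedRetry;

        Decision(Route route, Duration delay, boolean recordEnqueuedRetry) {
            this.route = route;
            this.delay = delay;
            this.recordEnqueuedRetry = recordEnqueuedRetry;
        }
    }

    /**
     * @param shouldRetryInMainSink result of the fast re-enqueue check
     * @param policySaysRetry       verdict of the transactional retry policy
     * @param backOffTime           backoff suggested by the policy, or null for none
     */
    static Decision route(boolean shouldRetryInMainSink, boolean policySaysRetry, Duration backOffTime) {
        if (shouldRetryInMainSink) {
            // Immediate retry; thresholds are not marked as enqueued-retry on this path.
            return new Decision(Route.MAIN_SINK, Duration.ZERO, false);
        }
        if (policySaysRetry) {
            Duration delay = backOffTime == null ? Duration.ZERO : backOffTime;
            return new Decision(Route.GROUP_SINK, delay, true);
        }
        return new Decision(Route.RETURN_TO_CALLER, Duration.ZERO, false);
    }
}
```
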
Test

Spark version 3.5
Scala version 2.12

Test with 100K RU (429 vs success ratio is about 3:1)

| Run no. | Total no. of records | Max records per PK | Normal bulk (seconds) | Transactional bulk (seconds) | Container RU |
|---|---|---|---|---|---|
| 1 | 10,000,000 | 30 | 950.97 | 916.73 | 100,000 |
| 2 | 10,000,000 | 30 | 932.09 | 914.61 | 100,000 |
| 3 | 10,000,000 | 30 | 932.29 | 1200.78 | 100,000 |
| 4 | 10,000,000 | 30 | 930.37 | 915.81 | 100,000 |
| 5 | 10,000,000 | 30 | 925.18 | 916.35 | 100,000 |
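
Because run 3 of the transactional column is an outlier, comparing medians alongside means gives a fairer picture of the 100K RU results. The helper below is a small sketch for doing that arithmetic; `RunStats` is a hypothetical name and the arrays simply copy the five run times from the 100K RU table.

```java
import java.util.Arrays;

/** Summary statistics over the five 100K RU runs reported above. */
final class RunStats {
    static final double[] NORMAL_BULK_SECONDS = {950.97, 932.09, 932.29, 930.37, 925.18};
    static final double[] TRANSACTIONAL_BULK_SECONDS = {916.73, 914.61, 1200.78, 915.81, 916.35};

    static double mean(double[] values) {
        return Arrays.stream(values).average().orElse(Double.NaN);
    }

    static double median(double[] values) {
        if (values.length == 0) {
            return Double.NaN;
        }
        double[] sorted = values.clone(); // keep the caller's array untouched
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    public static void main(String[] args) {
        System.out.printf("normal:        mean=%.2f median=%.2f%n",
            mean(NORMAL_BULK_SECONDS), median(NORMAL_BULK_SECONDS));
        System.out.printf("transactional: mean=%.2f median=%.2f%n",
            mean(TRANSACTIONAL_BULK_SECONDS), median(TRANSACTIONAL_BULK_SECONDS));
    }
}
```

On these five runs the medians are 932.09 s (normal bulk) and 916.35 s (transactional bulk), while the transactional mean is pulled above the normal mean by the single 1200.78 s run.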

Test with 10K RU to verify the job can still succeed (429 vs success ratio is about 10:1 for transactional batch)

| Run no. | Total no. of records | Max records per PK | Normal bulk (seconds) | Transactional bulk (seconds) | Container RU |
|---|---|---|---|---|---|
| 1 | 1,000,000 | 30 | 1255.92 | 949.78 | 10,000 |
| 2 | 1,000,000 | 30 | 1240.96 | 949.42 | 10,000 |

Test with dynamic docs per partition with 100K RU

| Run no. | Total no. of records | Max records per PK | Normal bulk (seconds) | Transactional bulk (seconds) | Container RU |
|---|---|---|---|---|---|
| 1 | 1,000,000 | [2,5] | 120.32 | 98.30 | 100,000 |
| 2 | 1,000,000 | [2,5] | 101.13 | 92.47 | 100,000 |
| 3 | 1,000,000 | [2,5] | 97.21 | 97.26 | 100,000 |
| 4 | 1,000,000 | [2,5] | 97.14 | 97.60 | 100,000 |
| 5 | 1,000,000 | [2,5] | 100.42 | 205.67 | 100,000 |

Test with 1M RU to test throughput with minimal throttling (no throttling for transactional batch)

| Run no. | Total no. of records | Max records per PK | Normal bulk (seconds) | Transactional bulk (seconds) | Container RU |
|---|---|---|---|---|---|
| 1 | 10,000,000 | [2,5] | 107.90 | 120.06 | 1,000,000 |
| 2 | 10,000,000 | [2,5] | 103.59 | 126.83 | 1,000,000 |
| 3 | 10,000,000 | [2,5] | 99.86 | 104.77 | 1,000,000 |
| 4 | 10,000,000 | [2,5] | 101.99 | 104.19 | 1,000,000 |
| 5 | 10,000,000 | [2,5] | 106.01 | 104.28 | 1,000,000 |

Test with throughput control with 100K RU (target throughputThreshold=0.5)

| Run no. | Total no. of records | Max records per PK | Transactional bulk (seconds) | Container RU |
|---|---|---|---|---|
| 1 | 10,000,000 | [2,5] | 2099.06 | 100,000 |
| 2 | 10,000,000 | [2,5] | 2217.60 | 100,000 |
| 3 | 10,000,000 | [2,5] | 2188.28 | 100,000 |
| 4 | 10,000,000 | [2,5] | 2190.47 | 100,000 |
| 5 | 10,000,000 | [2,5] | 2180.41 | 100,000 |

Test with 100K RU for Spark 4.0

| Run no. | Total no. of records | Max records per PK | Transactional bulk (seconds) | Container RU |
|---|---|---|---|---|
| 1 | 1,000,000 | [2,5] | 93.40 | 100,000 |
| 2 | 1,000,000 | [2,5] | 95.10 | 100,000 |
| 3 | 1,000,000 | [2,5] | 95.56 | 100,000 |
| 4 | 1,000,000 | [2,5] | 94.42 | 100,000 |
| 5 | 1,000,000 | [2,5] | 95.01 | 100,000 |

@xinlian12 xinlian12 changed the title from [SparkConnector]AdjustConcurrencyControlForTransactionalBatch to [SparkConnector][NO REVIEW]AdjustConcurrencyControlForTransactionalBatch Jan 25, 2026
@xinlian12 xinlian12 force-pushed the concurrencyControlForTransactionalBatch branch from 4c3ff6a to 88fe27c January 25, 2026 21:10
@xinlian12 xinlian12 force-pushed the concurrencyControlForTransactionalBatch branch from 56c64a1 to 588f051 January 26, 2026 22:55
@xinlian12 xinlian12 force-pushed the concurrencyControlForTransactionalBatch branch from 269a0f8 to f3b0b93 January 27, 2026 18:25
@xinlian12 xinlian12 changed the title from [SparkConnector][NO REVIEW]AdjustConcurrencyControlForTransactionalBatch to [SparkConnector]AdjustConcurrencyControlForTransactionalBatch Jan 27, 2026
@xinlian12 xinlian12 marked this pull request as ready for review January 27, 2026 18:43
@xinlian12 xinlian12 requested review from a team and kirankumarkolli as code owners January 27, 2026 18:43
Copilot AI review requested due to automatic review settings January 27, 2026 18:43
Contributor

Copilot AI left a comment

Pull request overview

Adds per-partition concurrency control and retry handling improvements for transactional bulk execution (Spark connector), aiming to improve throughput while reducing throttling.

Changes:

  • Introduces per-partition concurrency gating and flush signaling in TransactionalBulkExecutor.
  • Adds a transactional-batch-specific retry policy and disables client-side 429 retry for batch requests (so the executor can manage retries).
  • Extends Spark write configuration to support transactional bulk execution concurrency settings, and updates related tests.
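
The second change (turning off client-side 429 retries for batch requests so the executor sees the throttle and schedules the retry itself) can be pictured with the sketch below. It is a simplified stand-in, not the SDK's ResourceThrottleRetryPolicy: the class name, the attempt budget, and the `shouldRetry` signature are assumptions made for illustration.

```java
/**
 * Hypothetical throttle retry policy with an opt-out for batch requests.
 * When the flag is set, a 429 on a batch request is surfaced to the caller
 * instead of being retried inside the client.
 */
final class ThrottleRetrySketch {
    static final int TOO_MANY_REQUESTS = 429;

    private final int maxAttempts;
    private final boolean disableRetryForThrottledBatchRequest;
    private int attempts;

    ThrottleRetrySketch(int maxAttempts, boolean disableRetryForThrottledBatchRequest) {
        this.maxAttempts = maxAttempts;
        this.disableRetryForThrottledBatchRequest = disableRetryForThrottledBatchRequest;
    }

    boolean shouldRetry(int statusCode, boolean isBatchRequest) {
        if (statusCode != TOO_MANY_REQUESTS) {
            return false; // this policy only handles throttling
        }
        if (isBatchRequest && disableRetryForThrottledBatchRequest) {
            return false; // let the bulk executor decide when to retry
        }
        return attempts++ < maxAttempts; // consume one attempt from the budget
    }
}
```

Surfacing the 429 matters for the adaptive sizing: if the client retried silently, the executor would not observe throttling and could not count it as an enqueued retry.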

Reviewed changes

Copilot reviewed 36 out of 36 changed files in this pull request and generated 7 comments.

| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBatchRequestOptions.java | Adds internal flag to disable client-side retry for throttled batch requests and wires it through bridge accessors. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBatch.java | Allows attaching a transactional retry policy to a batch via internal bridge accessor. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutor.java | Implements per-partition grouping, adaptive thresholds, and concurrency/flush-signal control for transactional batches. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBatchRetryPolicy.java | New retry policy for transactional batches (partition splits/merges, stale resources, throttling). |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java | Refactors threshold construction to support transactional executor parameterization. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/CosmosBulkTransactionalBatchResponse.java | New wrapper to return either response or exception along with the originating CosmosBatch. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutorUtil.java | Adds helpers for transactional batch retry header extraction and refactors PK range resolution. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java | Updates batch request invocation to include the new “disable retry for throttled batch request” parameter. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchRequestResponseConstants.java | Adds default constant for transactional-batch flush interval after draining. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchExecutor.java | Plumbs new “disable retry for throttled batch request” parameter into batch execution. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java | Passes the new batch-throttling retry-disable flag into request retry policy creation. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryPolicy.java | Extends retry-policy factory to accept the new batch-throttling retry-disable flag. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResourceThrottleRetryPolicy.java | Adds support to disable throttling retries specifically for document batch requests. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResetSessionTokenRetryPolicyFactory.java | Extends factory method to accept and forward the new batch-throttling retry-disable flag. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java | Extends bridge helpers for new batch options + cosmos batch retry policy plumbing. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicyFactory.java | Adds overload to create request policies with the new disable-retry-for-batch-throttling flag. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosTransactionalBulkExecutionOptionsImpl.java | Adds transactional bulk execution knobs (op/batch concurrency, retry-rate targets) used by Spark and executor. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java | Adds a dedicated scheduler for transactional bulk executor flush signaling. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java | Adds config/env overrides for transactional batch flush interval and concurrency defaults. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java | Wires throttling retry policy’s onBeforeSendRequest to capture operation/resource type for batch retry-disable logic. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java | Extends executeBatchRequest signature with disable-retry-for-throttled-batch-request flag. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java | Plumbs batch request option into BatchExecutor to disable throttling retries when requested. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java | Updates constructor call sites to include new ClientRetryPolicy parameter. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutorTest.java | Adds emulator/fault-injection coverage for transactional bulk executor behavior and concurrency gating. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java | Updates executeBatchRequest call to include new disable-retry-for-throttled-batch-request flag. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java | Adds unit test coverage for “disable retry for throttled batch request” behavior. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/utils/CosmosPatchTestHelper.scala | Updates writer construction to match new bulk config model (bulkExecutionConfigs + bulkTransactional). |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala | Minor updates for transactional batch integration test compilation/usage. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/SparkE2EBulkWriteITest.scala | Expands e2e bulk write test to run with transactional bulk enabled/disabled. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterSubpartitionITest.scala | Updates point writer configs to include new bulkTransactional/bulkExecutionConfigs parameters. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterITest.scala | Updates point writer configs to include new bulkTransactional/bulkExecutionConfigs parameters. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala | Adds parsing tests for transactional bulk write config and updates bulk config assertions. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala | Updates bulk writer construction to the new bulk config model. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala | Uses new transactional bulk configs and new executor response wrapper type. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala | Refactors Spark write config model to separate bulk vs transactional bulk execution configs and parse new keys. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala | Adapts bulk writer to read execution settings from the new bulkExecutionConfigs structure. |

Member

@FabianMeiswinkel FabianMeiswinkel left a comment

LGTM - I love your PRs - the idea of reusing the dynamic tuning of batch size (now concurrency) is a great idea!

@xinlian12
Member Author

/azp run java - cosmos - spark

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12
Member Author

/azp run java - cosmos - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12
Member Author

/azp run java - cosmos - tests

@xinlian12
Member Author

/azp run java - cosmos - spark

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12 xinlian12 force-pushed the concurrencyControlForTransactionalBatch branch from 5ef9083 to 40e90ff January 29, 2026 05:47
@xinlian12
Member Author

/azp run java - cosmos - spark

@xinlian12
Member Author

/azp run java - cosmos - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12
Member Author

Spark 4.0 pipeline failed due to a capacity issue - will follow up offline

Member

@FabianMeiswinkel FabianMeiswinkel left a comment

LGTM still after most recent changes.

@xinlian12
Member Author

/check-enforcer override

@xinlian12 xinlian12 merged commit ffc4ceb into Azure:main Jan 30, 2026
100 of 104 checks passed
3 participants