-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[SparkConnector]AdjustConcurrencyControlForTransactionalBatch #47803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SparkConnector]AdjustConcurrencyControlForTransactionalBatch #47803
Conversation
4c3ff6a to
88fe27c
Compare
56c64a1 to
588f051
Compare
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
Outdated
Show resolved
Hide resolved
269a0f8 to
f3b0b93
Compare
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
Show resolved
Hide resolved
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java
Outdated
Show resolved
Hide resolved
...re-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutor.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Adds per-partition concurrency control and retry handling improvements for transactional bulk execution (Spark connector), aiming to improve throughput while reducing throttling.
Changes:
- Introduces per-partition concurrency gating and flush signaling in
TransactionalBulkExecutor. - Adds a transactional-batch-specific retry policy and disables client-side 429 retry for batch requests (so the executor can manage retries).
- Extends Spark write configuration to support transactional bulk execution concurrency settings, and updates related tests.
Reviewed changes
Copilot reviewed 36 out of 36 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBatchRequestOptions.java | Adds internal flag to disable client-side retry for throttled batch requests and wires it through bridge accessors. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBatch.java | Allows attaching a transactional retry policy to a batch via internal bridge accessor. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutor.java | Implements per-partition grouping, adaptive thresholds, and concurrency/flush-signal control for transactional batches. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBatchRetryPolicy.java | New retry policy for transactional batches (partition splits/merges, stale resources, throttling). |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java | Refactors threshold construction to support transactional executor parameterization. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/CosmosBulkTransactionalBatchResponse.java | New wrapper to return either response or exception along with the originating CosmosBatch. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutorUtil.java | Adds helpers for transactional batch retry header extraction and refactors PK range resolution. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java | Updates batch request invocation to include the new “disable retry for throttled batch request” parameter. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchRequestResponseConstants.java | Adds default constant for transactional-batch flush interval after draining. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchExecutor.java | Plumbs new “disable retry for throttled batch request” parameter into batch execution. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java | Passes the new batch-throttling retry-disable flag into request retry policy creation. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryPolicy.java | Extends retry-policy factory to accept the new batch-throttling retry-disable flag. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResourceThrottleRetryPolicy.java | Adds support to disable throttling retries specifically for document batch requests. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResetSessionTokenRetryPolicyFactory.java | Extends factory method to accept and forward the new batch-throttling retry-disable flag. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java | Extends bridge helpers for new batch options + cosmos batch retry policy plumbing. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicyFactory.java | Adds overload to create request policies with the new disable-retry-for-batch-throttling flag. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosTransactionalBulkExecutionOptionsImpl.java | Adds transactional bulk execution knobs (op/batch concurrency, retry-rate targets) used by Spark and executor. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java | Adds a dedicated scheduler for transactional bulk executor flush signaling. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java | Adds config/env overrides for transactional batch flush interval and concurrency defaults. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java | Wires throttling retry policy’s onBeforeSendRequest to capture operation/resource type for batch retry-disable logic. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java | Extends executeBatchRequest signature with disable-retry-for-throttled-batch-request flag. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java | Plumbs batch request option into BatchExecutor to disable throttling retries when requested. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java | Updates constructor call sites to include new ClientRetryPolicy parameter. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutorTest.java | Adds emulator/fault-injection coverage for transactional bulk executor behavior and concurrency gating. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java | Updates executeBatchRequest call to include new disable-retry-for-throttled-batch-request flag. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java | Adds unit test coverage for “disable retry for throttled batch request” behavior. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/utils/CosmosPatchTestHelper.scala | Updates writer construction to match new bulk config model (bulkExecutionConfigs + bulkTransactional). |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala | Minor updates for transactional batch integration test compilation/usage. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/SparkE2EBulkWriteITest.scala | Expands e2e bulk write test to run with transactional bulk enabled/disabled. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterSubpartitionITest.scala | Updates point writer configs to include new bulkTransactional/bulkExecutionConfigs parameters. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/PointWriterITest.scala | Updates point writer configs to include new bulkTransactional/bulkExecutionConfigs parameters. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala | Adds parsing tests for transactional bulk write config and updates bulk config assertions. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala | Updates bulk writer construction to the new bulk config model. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala | Uses new transactional bulk configs and new executor response wrapper type. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala | Refactors Spark write config model to separate bulk vs transactional bulk execution configs and parse new keys. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala | Adapts bulk writer to read execution settings from the new bulkExecutionConfigs structure. |
...c/main/java/com/azure/cosmos/implementation/CosmosTransactionalBulkExecutionOptionsImpl.java
Show resolved
Hide resolved
...c/main/java/com/azure/cosmos/implementation/CosmosTransactionalBulkExecutionOptionsImpl.java
Outdated
Show resolved
Hide resolved
...c/main/java/com/azure/cosmos/implementation/CosmosTransactionalBulkExecutionOptionsImpl.java
Show resolved
Hide resolved
...re-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutor.java
Show resolved
Hide resolved
...re-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutor.java
Show resolved
Hide resolved
...re-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutor.java
Show resolved
Hide resolved
...osmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBatchRetryPolicy.java
Show resolved
Hide resolved
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
Outdated
Show resolved
Hide resolved
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
Outdated
Show resolved
Hide resolved
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
Outdated
Show resolved
Hide resolved
FabianMeiswinkel
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - I love your PRs - the idea of reusing the dynamic tuning of batch size (now concurrency) is a great idea!
|
/azp run java - cosmos - spark |
|
Azure Pipelines successfully started running 1 pipeline(s). |
|
/azp run java - cosmos - tests |
|
Azure Pipelines successfully started running 1 pipeline(s). |
|
/azp run java - cosmos - tests |
|
/azp run java - cosmos - spark |
|
Azure Pipelines successfully started running 1 pipeline(s). |
1 similar comment
|
Azure Pipelines successfully started running 1 pipeline(s). |
5ef9083 to
40e90ff
Compare
|
/azp run java - cosmos - spark |
|
/azp run java - cosmos - tests |
|
Azure Pipelines successfully started running 1 pipeline(s). |
1 similar comment
|
Azure Pipelines successfully started running 1 pipeline(s). |
|
spark 4.0 pipeline failed due to capacity issue - will follow up offline |
...re-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutor.java
Show resolved
Hide resolved
FabianMeiswinkel
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM still after most recent changes.
|
/check-enforcer override |
Changes
Added concurrency control for transactional batch bulk executor.
High level overview
TransactionalBulkExecutor consumes a Flux and executes transactional (per-partition) batches with per-partition adaptive micro-batch sizing and concurrency control to maximize throughput while minimizing throttling.
Ingest: consume a stream of CosmosBatch objects and attach a transactional retry policy per batch.Resolve & group: resolve partition key range id for each batch and group batches by partition-range so each partition is processed independently.Partition state: maintain a PartitionScopeThresholds object per partition-range that tracks successes, enqueued retries and computes an adaptive targetMicroBatchSize.Concurrency control:Success path: recordSuccessfulOperation() in thresholds for each completed operation and emit a flush signal so waiting batches can proceed.Failure & retry paths:Completion & safety: a scheduled flush task periodically emits flush signals to avoid deadlocks; mainSourceCompleted and totalCount track lifecycle and trigger graceful shutdown.flowchart TD A[Input Flux of CosmosBatch] B[Resolve partitionRangeId] C[Group by partitionRangeId] subgraph PartitionGroup direction TB T[PartitionScopeThresholds] S1[totalOperationsInFlight] S2[totalBatchesInFlight] D[Decision: canFlush?] W[Wait on flushSignal] X[Execute: executeCosmosBatch] OK[Success -> record success -> emit flushSignal] ERR[Failure -> evaluate retry policy] M[MainSink re-enqueue] G[GroupSink enqueue -> record enqueued retry] end A --> B --> C C --> PartitionGroup PartitionGroup --> D D -->|true| X D -->|false| W X --> OK X -->|error| ERR ERR -->|shouldRetryInMainSink| M ERR -->|else if retriable| G ERR -->|non-retriable| R[Return failure to caller] OK --> Flush[scheduled flush task emits flushSignal] G --> PartitionGroup Flush --> W S1 & S2 -. influence .-> DTest
Spark version 3.5
Scala version 2.12
Test with 100K RU (429 vs success ratio is about 3:1)
Test with 10K RU to verify the job can still succeed (429 vs success ratio is about 10:1 for transactional batch)
Test with dynamic docs per partition with 100K RU
Test with 1M RU to test throughput with minimal throttling (no throttling for transactional batch)
Test with throughput control with 100K RU (target throughputThreshold=0.5)
Test with 100KRU for spark 4.0