[SparkConnector] enableThroughputBucketSupport #47856
Conversation
Pull request overview
Adds Spark connector support for Cosmos DB server-side throughput control via “throughput buckets”.
Changes:
- Introduces the `spark.cosmos.throughputControl.throughputBucket` config and parsing logic to distinguish SDK vs. server throughput control.
- Updates container initialization to call `enableServerThroughputControlGroup` when the throughput bucket config is provided.
- Adds a unit test validating throughput bucket parsing.
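As an illustration, the new option would sit alongside the existing throughput control options roughly like this. This is a sketch: the endpoint, database, container, and values below are placeholders, and only the `spark.cosmos.throughputControl.*` option names follow the connector's naming scheme.

```scala
// Sketch: Spark options enabling the new server-side throughput bucket.
// All values below are illustrative placeholders, not taken from this PR.
val cosmosReadOptions = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.database" -> "myDatabase",
  "spark.cosmos.container" -> "myContainer",
  "spark.cosmos.throughputControl.enabled" -> "true",
  "spark.cosmos.throughputControl.name" -> "myThroughputControlGroup",
  // New in this PR: route requests through a server-side throughput bucket.
  "spark.cosmos.throughputControl.throughputBucket" -> "1"
)
```

These options would typically be passed via `spark.read.format("cosmos.oltp").options(cosmosReadOptions)`.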
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala | Adds config name + parsing, introduces separate SDK/server throughput control config types. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala | Enables server throughput control group on the container when throughput bucket config is present. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala | Adds a test ensuring throughput bucket is parsed into the server throughput control config. |
```scala
if (throughputControlEnabled) {
  // we will allow the customer to provide a different database account for throughput control
  val throughputControlCosmosAccountConfig =
    CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) match {
      case Some(_) => parseThroughputControlAccountConfig(cfg)
      case None => CosmosAccountConfig.parseCosmosAccountConfig(cfg)
    }

  val groupName = CosmosConfigEntry.parse(cfg, groupNameSupplier)
  val targetThroughput = CosmosConfigEntry.parse(cfg, targetThroughputSupplier)
  val targetThroughputThreshold = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier)
  val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier)
  val globalControlDatabase = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier)
  val globalControlContainer = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier)
  val globalControlItemRenewInterval = CosmosConfigEntry.parse(cfg, globalControlItemRenewIntervalSupplier)
  val globalControlItemExpireInterval = CosmosConfigEntry.parse(cfg, globalControlItemExpireIntervalSupplier)
  val globalControlUseDedicatedContainer = CosmosConfigEntry.parse(cfg, globalControlUseDedicatedContainerSupplier)

  if (groupName.isEmpty) {
    throw new IllegalArgumentException(
      s"Configuration option '${CosmosConfigNames.ThroughputControlName}' must not be empty.")
  }
  val throughputBucket = CosmosConfigEntry.parse(cfg, throughputBucketSupplier)

  assert(groupName.isDefined, s"Parameter '${CosmosConfigNames.ThroughputControlName}' is missing.")

  if (globalControlUseDedicatedContainer.isEmpty) {
    throw new IllegalArgumentException(
      s"Configuration option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' must not be empty.")
  }
  assert(
    globalControlUseDedicatedContainer.isDefined,
    s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is missing.")

  if (globalControlUseDedicatedContainer.get) {
    if (globalControlDatabase.isEmpty || globalControlContainer.isEmpty) {
      throw new IllegalArgumentException(
        s"Configuration options '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' and " +
          s"'${CosmosConfigNames.ThroughputControlGlobalControlContainer}' must not be empty if " +
          s"option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is true.")
    }
    assert(
      globalControlDatabase.isDefined,
      s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' is missing.")
    assert(
      globalControlContainer.isDefined,
      s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlContainer}' is missing.")
    // if a throughput bucket is defined, use server-side throughput bucket control;
    // otherwise validate the SDK global throughput control config
    if (throughputBucket.isDefined) {
      Some(parseServerThroughputControlConfig(groupName.get, throughputBucket.get, cfg))
    } else {
      Some(parseSDKThroughputControlConfig(groupName.get, cfg))
    }
```
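The core decision above can be isolated into a standalone sketch. The types here are simplified stand-ins for the connector's config classes, not the actual `CosmosThroughputControlConfig` hierarchy: a present throughput bucket selects the server-side path, otherwise the SDK path is used.

```scala
// Simplified stand-ins for the connector's throughput control config types.
sealed trait ThroughputControlConfig
final case class ServerConfig(groupName: String, bucket: Int) extends ThroughputControlConfig
final case class SdkConfig(groupName: String) extends ThroughputControlConfig

// A defined throughput bucket switches parsing to server-side control;
// otherwise the SDK (client-side) throughput control config is used.
def selectConfig(groupName: String, throughputBucket: Option[Int]): ThroughputControlConfig =
  throughputBucket match {
    case Some(bucket) => ServerConfig(groupName, bucket)
    case None         => SdkConfig(groupName)
  }
```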
Copilot AI (Jan 29, 2026):
When throughputBucket is present, parseThroughputControlConfig silently switches to server throughput control and ignores other throughput-control settings (for example targetThroughput, targetThroughputThreshold, and all globalControl.* options). This can lead to confusing misconfiguration where user-provided options are dropped without any feedback. Consider either (a) validating that no SDK-throughput-control-only settings are specified when throughputBucket is set and throwing a clear exception, or (b) explicitly supporting enabling both SDK and server throughput control groups if that’s intended.
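Option (a) from the suggestion above could look roughly like the following sketch. The helper and the exact list of SDK-only option names are assumptions for illustration, not code from this PR:

```scala
// Hypothetical validation: reject SDK-only throughput control options when a
// server-side throughput bucket is configured, instead of silently dropping them.
def validateNoSdkOnlyOptions(cfg: Map[String, String]): Unit = {
  // Assumed set of options that only apply to SDK (client-side) throughput control.
  val sdkOnlyKeys = Seq(
    "spark.cosmos.throughputControl.targetThroughput",
    "spark.cosmos.throughputControl.targetThroughputThreshold",
    "spark.cosmos.throughputControl.globalControl.database",
    "spark.cosmos.throughputControl.globalControl.container")
  if (cfg.contains("spark.cosmos.throughputControl.throughputBucket")) {
    val conflicting = sdkOnlyKeys.filter(cfg.contains)
    if (conflicting.nonEmpty) {
      throw new IllegalArgumentException(
        s"Options ${conflicting.mkString(", ")} cannot be combined with " +
          "'spark.cosmos.throughputControl.throughputBucket'.")
    }
  }
}
```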
Yeah, this is also one point I'm not 100% sure about. Should we also add a config for explicit throughput control type selection? Will discuss more with the team. @FabianMeiswinkel what are your thoughts here?
Should we add another config 'throughputControl.type', where the value can be SDK/Server?
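If the team went with the proposed `throughputControl.type` option, its parsing might look like this sketch. The enum, the accepted values, and defaulting to SDK when the option is absent are all assumptions from the discussion above, not decided behavior:

```scala
// Hypothetical parsing for a proposed 'spark.cosmos.throughputControl.type'
// option. Defaulting to SDK control when unset is an assumption.
sealed trait ThroughputControlType
case object SdkThroughputControl extends ThroughputControlType
case object ServerThroughputControl extends ThroughputControlType

def parseThroughputControlType(raw: Option[String]): ThroughputControlType =
  raw.map(_.trim.toLowerCase) match {
    case Some("server")     => ServerThroughputControl
    case Some("sdk") | None => SdkThroughputControl
    case Some(other) =>
      throw new IllegalArgumentException(s"Unknown throughput control type '$other'.")
  }
```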
```scala
private def enableServerThroughputControl(
    container: CosmosAsyncContainer,
    throughputControlConfig: CosmosServerThroughputControlConfig): Unit = {
  val groupConfigBuilder =
    new ThroughputControlGroupConfigBuilder()
      .groupName(throughputControlConfig.groupName)

  groupConfigBuilder.throughputBucket(throughputControlConfig.throughputBucket)

  if (throughputControlConfig.priorityLevel.isDefined) {
    groupConfigBuilder.priorityLevel(parsePriorityLevel(throughputControlConfig.priorityLevel.get))
  }

  container.enableServerThroughputControlGroup(groupConfigBuilder.build())
}
```
Copilot AI (Jan 29, 2026):
New server-throughput-control path (enableServerThroughputControl / enableServerThroughputControlGroup) isn’t covered by tests. Since this changes runtime behavior (container initialization + throughput control wiring), please add an integration test alongside existing throughput control ITests (for example, in SparkE2EThroughputControlITest.scala) that configures spark.cosmos.throughputControl.throughputBucket and verifies the connector can perform an operation with server throughput control enabled.
/azp run java - cosmos - tests

Azure Pipelines successfully started running 1 pipeline(s).
enableThroughputBucket support in spark
Test