Skip to content

Conversation

@xinlian12
Copy link
Member

@xinlian12 xinlian12 commented Jan 29, 2026

enableThroughputBucket support in spark

Test

image image

Copilot AI review requested due to automatic review settings January 29, 2026 19:44
@xinlian12 xinlian12 requested review from a team and kirankumarkolli as code owners January 29, 2026 19:44
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds Spark connector support for Cosmos DB server-side throughput control via “throughput buckets”.

Changes:

  • Introduces spark.cosmos.throughputControl.throughputBucket config and parsing logic to distinguish SDK vs server throughput control.
  • Updates container initialization to enable enableServerThroughputControlGroup when throughput bucket config is provided.
  • Adds a unit test validating throughput bucket parsing.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

File Description
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala Adds config name + parsing, introduces separate SDK/server throughput control config types.
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala Enables server throughput control group on the container when throughput bucket config is present.
sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala Adds a test ensuring throughput bucket is parsed into the server throughput control config.

Comment on lines 2449 to 2461
if (throughputControlEnabled) {
// we will allow the customer to provide a different database account for throughput control
val throughputControlCosmosAccountConfig =
CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) match {
case Some(_) => parseThroughputControlAccountConfig(cfg)
case None => CosmosAccountConfig.parseCosmosAccountConfig(cfg)
}

val groupName = CosmosConfigEntry.parse(cfg, groupNameSupplier)
val targetThroughput = CosmosConfigEntry.parse(cfg, targetThroughputSupplier)
val targetThroughputThreshold = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier)
val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier)
val globalControlDatabase = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier)
val globalControlContainer = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier)
val globalControlItemRenewInterval = CosmosConfigEntry.parse(cfg, globalControlItemRenewIntervalSupplier)
val globalControlItemExpireInterval = CosmosConfigEntry.parse(cfg, globalControlItemExpireIntervalSupplier)
val globalControlUseDedicatedContainer = CosmosConfigEntry.parse(cfg, globalControlUseDedicatedContainerSupplier)

if (groupName.isEmpty) {
throw new IllegalArgumentException(
s"Configuration option '${CosmosConfigNames.ThroughputControlName}' must not be empty.")
}
val throughputBucket = CosmosConfigEntry.parse(cfg, throughputBucketSupplier)

assert(groupName.isDefined, s"Parameter '${CosmosConfigNames.ThroughputControlName}' is missing.")

if (globalControlUseDedicatedContainer.isEmpty) {
throw new IllegalArgumentException(
s"Configuration option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' must not be empty.")
}
assert(
globalControlUseDedicatedContainer.isDefined,
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is missing.")

if (globalControlUseDedicatedContainer.get) {
if (globalControlDatabase.isEmpty || globalControlContainer.isEmpty) {
throw new IllegalArgumentException(
s"Configuration options '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' and " +
s"'${CosmosConfigNames.ThroughputControlGlobalControlContainer}' must not be empty if " +
s" option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is true.")
}
assert(
globalControlDatabase.isDefined,
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' is missing.")
assert(
globalControlContainer.isDefined,
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlContainer}' is missing.")
if (throughputBucket.isDefined) {
Some(parseServerThroughputControlConfig(groupName.get, throughputBucket.get, cfg))
} else {
// if throughput bucket is defined, then use server side throughput bucket control
// else valida SDK global throughput control config
Some(parseSDKThroughputControlConfig(groupName.get, cfg))
}
Copy link

Copilot AI Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When throughputBucket is present, parseThroughputControlConfig silently switches to server throughput control and ignores other throughput-control settings (for example targetThroughput, targetThroughputThreshold, and all globalControl.* options). This can lead to confusing misconfiguration where user-provided options are dropped without any feedback. Consider either (a) validating that no SDK-throughput-control-only settings are specified when throughputBucket is set and throwing a clear exception, or (b) explicitly supporting enabling both SDK and server throughput control groups if that’s intended.

Copilot uses AI. Check for mistakes.
Copy link
Member Author

@xinlian12 xinlian12 Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, this is also one point not 100% sure, should we also added a config for explicit throughput control type selection, will discuss more with team. @FabianMeiswinkel what is your thoughts here

should we add another config 'throughputControl.type' -> and then the value can be SDK/Server

Comment on lines +70 to +84
private def enableServerThroughputControl(
container: CosmosAsyncContainer,
throughputControlConfig: CosmosServerThroughputControlConfig): Unit = {
val groupConfigBuilder =
new ThroughputControlGroupConfigBuilder()
.groupName(throughputControlConfig.groupName)

groupConfigBuilder.throughputBucket(throughputControlConfig.throughputBucket)

if (throughputControlConfig.priorityLevel.isDefined) {
groupConfigBuilder.priorityLevel(parsePriorityLevel(throughputControlConfig.priorityLevel.get))
}

container.enableServerThroughputControlGroup(groupConfigBuilder.build())
}
Copy link

Copilot AI Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New server-throughput-control path (enableServerThroughputControl / enableServerThroughputControlGroup) isn’t covered by tests. Since this changes runtime behavior (container initialization + throughput control wiring), please add an integration test alongside existing throughput control ITests (for example, in SparkE2EThroughputControlITest.scala) that configures spark.cosmos.throughputControl.throughputBucket and verifies the connector can perform an operation with server throughput control enabled.

Copilot generated this review using guidance from repository custom instructions.
@xinlian12
Copy link
Member Author

/azp run java - cosmos - tests

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant