Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.43.0-beta.1 (Unreleased)

#### Features Added
* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.43.0-beta.1 (Unreleased)

#### Features Added
* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#### Features Added
* Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478) and [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697) and [47803](https://github.com/Azure/azure-sdk-for-java/pull/47803)
* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#### Features Added
* Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478) and [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697) and [47803](https://github.com/Azure/azure-sdk-for-java/pull/47803)
* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ Used to influence the json serialization/deserialization behavior
| `spark.cosmos.throughputControl.targetThroughput` | None | Throughput control group target throughput |
| `spark.cosmos.throughputControl.targetThroughputThreshold` | None | Throughput control group target throughput threshold |
| `spark.cosmos.throughputControl.priorityLevel` | None | Throughput control group priority level. The priority level is used to determine which requests will be throttled first when the total throughput of all control groups exceeds the max throughput. Priority based execution is currently in preview. To enable the feature, please follow the instructions [here](https://devblogs.microsoft.com/cosmosdb/introducing-priority-based-execution-in-azure-cosmos-db-preview/#next-steps) |
| `spark.cosmos.throughputControl.throughputBucket` | None | Throughput bucket value. Please refer [here](https://learn.microsoft.com/azure/cosmos-db/throughput-buckets?tabs=dotnet) for full context |
| `spark.cosmos.throughputControl.globalControl.database` | None | Database which will be used for throughput global control |
| `spark.cosmos.throughputControl.globalControl.container` | None | Container which will be used for throughput global control |
| `spark.cosmos.throughputControl.globalControl.renewIntervalInMS` | `5s` | How often the client is going to update the throughput usage of itself |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private[spark] object CosmosConfigNames {
val ThroughputControlTargetThroughput = "spark.cosmos.throughputControl.targetThroughput"
val ThroughputControlTargetThroughputThreshold = "spark.cosmos.throughputControl.targetThroughputThreshold"
val ThroughputControlPriorityLevel = "spark.cosmos.throughputControl.priorityLevel"
val ThroughputControlThroughputBucket = "spark.cosmos.throughputControl.throughputBucket"
val ThroughputControlGlobalControlDatabase = "spark.cosmos.throughputControl.globalControl.database"
val ThroughputControlGlobalControlContainer = "spark.cosmos.throughputControl.globalControl.container"
val ThroughputControlGlobalControlRenewalIntervalInMS =
Expand Down Expand Up @@ -278,6 +279,7 @@ private[spark] object CosmosConfigNames {
ThroughputControlTargetThroughput,
ThroughputControlTargetThroughputThreshold,
ThroughputControlPriorityLevel,
ThroughputControlThroughputBucket,
ThroughputControlGlobalControlDatabase,
ThroughputControlGlobalControlContainer,
ThroughputControlGlobalControlRenewalIntervalInMS,
Expand Down Expand Up @@ -2380,16 +2382,26 @@ private object CosmosChangeFeedConfig {
}
}

private case class CosmosThroughputControlConfig(cosmosAccountConfig: CosmosAccountConfig,
groupName: String,
targetThroughput: Option[Int],
targetThroughputThreshold: Option[Double],
priorityLevel: Option[PriorityLevel],
globalControlDatabase: Option[String],
globalControlContainer: Option[String],
globalControlRenewInterval: Option[Duration],
globalControlExpireInterval: Option[Duration],
globalControlUseDedicatedContainer: Boolean)
private trait CosmosThroughputControlConfig {
def groupName: String
}

private case class CosmosSDKThroughputControlConfig(
override val groupName: String,
cosmosAccountConfig: CosmosAccountConfig,
targetThroughput: Option[Int],
targetThroughputThreshold: Option[Double],
priorityLevel: Option[PriorityLevel],
globalControlDatabase: Option[String],
globalControlContainer: Option[String],
globalControlRenewInterval: Option[Duration],
globalControlExpireInterval: Option[Duration],
globalControlUseDedicatedContainer: Boolean) extends CosmosThroughputControlConfig

private case class CosmosServerThroughputControlConfig(
override val groupName: String,
throughputBucket: Int,
priorityLevel: Option[PriorityLevel]) extends CosmosThroughputControlConfig

private object CosmosThroughputControlConfig {
private val throughputControlEnabledSupplier = CosmosConfigEntry[Boolean](
Expand Down Expand Up @@ -2441,6 +2453,12 @@ private object CosmosThroughputControlConfig {
parseFromStringFunction = priorityLevel => CosmosConfigEntry.parseEnumeration(priorityLevel, PriorityLevels),
helpMessage = "Throughput control group priority level. The value can be High or Low. ")

private val throughputBucketSupplier = CosmosConfigEntry[Int](
key = CosmosConfigNames.ThroughputControlThroughputBucket,
mandatory = false,
parseFromStringFunction = throughputBucket => throughputBucket.toInt,
helpMessage = "Throughput bucket value. Please refer here for full context: https://learn.microsoft.com/azure/cosmos-db/throughput-buckets?tabs=dotnet")

private val globalControlDatabaseSupplier = CosmosConfigEntry[String](
key = CosmosConfigNames.ThroughputControlGlobalControlDatabase,
mandatory = false,
Expand Down Expand Up @@ -2481,68 +2499,113 @@ private object CosmosThroughputControlConfig {
val throughputControlEnabled = CosmosConfigEntry.parse(cfg, throughputControlEnabledSupplier).get

if (throughputControlEnabled) {
// we will allow the customer to provide a different database account for throughput control
val throughputControlCosmosAccountConfig =
CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) match {
case Some(_) => parseThroughputControlAccountConfig(cfg)
case None => CosmosAccountConfig.parseCosmosAccountConfig(cfg)
}

val groupName = CosmosConfigEntry.parse(cfg, groupNameSupplier)
val targetThroughput = CosmosConfigEntry.parse(cfg, targetThroughputSupplier)
val targetThroughputThreshold = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier)
val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier)
val globalControlDatabase = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier)
val globalControlContainer = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier)
val globalControlItemRenewInterval = CosmosConfigEntry.parse(cfg, globalControlItemRenewIntervalSupplier)
val globalControlItemExpireInterval = CosmosConfigEntry.parse(cfg, globalControlItemExpireIntervalSupplier)
val globalControlUseDedicatedContainer = CosmosConfigEntry.parse(cfg, globalControlUseDedicatedContainerSupplier)

if (groupName.isEmpty) {
throw new IllegalArgumentException(
s"Configuration option '${CosmosConfigNames.ThroughputControlName}' must not be empty.")
}
val throughputBucket = CosmosConfigEntry.parse(cfg, throughputBucketSupplier)

assert(groupName.isDefined, s"Parameter '${CosmosConfigNames.ThroughputControlName}' is missing.")

if (globalControlUseDedicatedContainer.isEmpty) {
throw new IllegalArgumentException(
s"Configuration option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' must not be empty.")
}
assert(
globalControlUseDedicatedContainer.isDefined,
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is missing.")
if (throughputBucket.isDefined && throughputBucket.get > 0) {
Some(parseServerThroughputControlConfig(groupName.get, throughputBucket.get, cfg))
} else {

if (globalControlUseDedicatedContainer.get) {
if (globalControlDatabase.isEmpty || globalControlContainer.isEmpty) {
// If a non-positive throughputBucket was provided treat it as invalid
if (throughputBucket.isDefined && throughputBucket.get <= 0) {
throw new IllegalArgumentException(
s"Configuration options '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' and " +
s"'${CosmosConfigNames.ThroughputControlGlobalControlContainer}' must not be empty if " +
s" option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is true.")
s"Mixed throughput control configuration detected: " +
s"Invalid '${CosmosConfigNames.ThroughputControlThroughputBucket}' value '${throughputBucket.get}'. It must be greater than 0 or omitted.")
}
assert(
globalControlDatabase.isDefined,
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' is missing.")
assert(
globalControlContainer.isDefined,
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlContainer}' is missing.")
}

Some(CosmosThroughputControlConfig(
throughputControlCosmosAccountConfig,
groupName.get,
targetThroughput,
targetThroughputThreshold,
priorityLevel,
globalControlDatabase,
globalControlContainer,
globalControlItemRenewInterval,
globalControlItemExpireInterval,
globalControlUseDedicatedContainer.get))
// if throughput bucket is defined, then use server side throughput bucket control
// else validate SDK global throughput control config
Some(parseSDKThroughputControlConfig(groupName.get, cfg))
}
} else {
None
}
}

private[spark] def parseServerThroughputControlConfig(
groupName: String,
throughputBucket: Int,
cfg: Map[String, String]): CosmosServerThroughputControlConfig = {

// Detect presence of SDK/global throughput control options
val targetThroughputOpt = CosmosConfigEntry.parse(cfg, targetThroughputSupplier)
val targetThroughputThresholdOpt = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier)
val throughputControlAccountEndpointOpt = CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier)
val throughputControlAccountKeyOpt = CosmosConfigEntry.parse(cfg, throughputControlAccountKeySupplier)
val globalControlDatabaseOpt = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier)
val globalControlContainerOpt = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier)

val sdkThroughputControlConfigsPresent = targetThroughputOpt.isDefined ||
targetThroughputThresholdOpt.isDefined ||
throughputControlAccountEndpointOpt.isDefined ||
throughputControlAccountKeyOpt.isDefined ||
globalControlDatabaseOpt.isDefined ||
globalControlContainerOpt.isDefined

if (sdkThroughputControlConfigsPresent) {
throw new IllegalArgumentException(
"Mixed throughput control configuration detected: 'throughputBucket' cannot be used together with " +
"['targetThroughput', 'targetThroughputThreshold', 'throughputControl.accountEndpoint', 'throughputControl.accountKey', " +
"'throughputControl.globalControl.database', 'throughputControl.globalControl.container']")
}

val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier)
CosmosServerThroughputControlConfig(groupName, throughputBucket, priorityLevel)
}

private[spark] def parseSDKThroughputControlConfig(
groupName: String,
cfg: Map[String, String]): CosmosSDKThroughputControlConfig = {
// we will allow the customer to provide a different database account for throughput control
val throughputControlCosmosAccountConfig =
CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) match {
case Some(_) => parseThroughputControlAccountConfig(cfg)
case None => CosmosAccountConfig.parseCosmosAccountConfig(cfg)
}

val targetThroughput = CosmosConfigEntry.parse(cfg, targetThroughputSupplier)
val targetThroughputThreshold = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier)
val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier)
val globalControlDatabase = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier)
val globalControlContainer = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier)
val globalControlItemRenewInterval = CosmosConfigEntry.parse(cfg, globalControlItemRenewIntervalSupplier)
val globalControlItemExpireInterval = CosmosConfigEntry.parse(cfg, globalControlItemExpireIntervalSupplier)
val globalControlUseDedicatedContainer = CosmosConfigEntry.parse(cfg, globalControlUseDedicatedContainerSupplier)

assert(
globalControlUseDedicatedContainer.isDefined,
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is missing.")

if (globalControlUseDedicatedContainer.get) {
if (globalControlDatabase.isEmpty || globalControlContainer.isEmpty) {
throw new IllegalArgumentException(
s"Configuration options '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' and " +
s"'${CosmosConfigNames.ThroughputControlGlobalControlContainer}' must not be empty if " +
s" option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is true.")
}
assert(
globalControlDatabase.isDefined,
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' is missing.")
assert(
globalControlContainer.isDefined,
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlContainer}' is missing.")
}

CosmosSDKThroughputControlConfig(
groupName,
throughputControlCosmosAccountConfig,
targetThroughput,
targetThroughputThreshold,
priorityLevel,
globalControlDatabase,
globalControlContainer,
globalControlItemRenewInterval,
globalControlItemExpireInterval,
globalControlUseDedicatedContainer.get)
}

private[spark] def parseThroughputControlAccountConfig(cfg: Map[String, String]): CosmosAccountConfig = {
val throughputControlAccountEndpoint = CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier)
val throughputControlAccountKey = CosmosConfigEntry.parse(cfg, throughputControlAccountKeySupplier)
Expand Down
Loading
Loading