diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md index bdeb1180052b..30f54af762c8 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.43.0-beta.1 (Unreleased) #### Features Added +* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md index 240dfed15851..4017722762c9 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.43.0-beta.1 (Unreleased) #### Features Added +* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md index 4c06111e924c..6944ebc3e139 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md @@ -4,6 +4,7 @@ #### Features Added * Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478) and [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697) and [47803](https://github.com/Azure/azure-sdk-for-java/pull/47803) +* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md index 9a06400585d0..89c0d54889ab 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md @@ -4,6 +4,7 @@ #### Features Added * Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478) and [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697) and [47803](https://github.com/Azure/azure-sdk-for-java/pull/47803) +* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md b/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md index d13b5bada3b4..f5cfc904cb7c 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md +++ b/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md @@ -146,6 +146,7 @@ Used to influence the json serialization/deserialization behavior | `spark.cosmos.throughputControl.targetThroughput` | None | Throughput control group target throughput | | `spark.cosmos.throughputControl.targetThroughputThreshold` | None | Throughput control group target throughput threshold | | `spark.cosmos.throughputControl.priorityLevel` | None | Throughput control group priority level. The priority level is used to determine which requests will be throttled first when the total throughput of all control groups exceeds the max throughput. Priority based execution is currently in preview. To enable the feature, please follow the instructions [here](https://devblogs.microsoft.com/cosmosdb/introducing-priority-based-execution-in-azure-cosmos-db-preview/#next-steps) | +| `spark.cosmos.throughputControl.throughputBucket` | None | Throughput bucket value. Please refer [here](https://learn.microsoft.com/azure/cosmos-db/throughput-buckets?tabs=dotnet) for full context | | `spark.cosmos.throughputControl.globalControl.database` | None | Database which will be used for throughput global control | | `spark.cosmos.throughputControl.globalControl.container` | None | Container which will be used for throughput global control | | `spark.cosmos.throughputControl.globalControl.renewIntervalInMS` | `5s` | How often the client is going to update the throughput usage of itself | diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala index 1692d947f74a..968e88d3c95d 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala +++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala @@ -149,6 +149,7 @@ private[spark] object CosmosConfigNames { val ThroughputControlTargetThroughput = "spark.cosmos.throughputControl.targetThroughput" val ThroughputControlTargetThroughputThreshold = "spark.cosmos.throughputControl.targetThroughputThreshold" val ThroughputControlPriorityLevel = "spark.cosmos.throughputControl.priorityLevel" + val ThroughputControlThroughputBucket = "spark.cosmos.throughputControl.throughputBucket" val ThroughputControlGlobalControlDatabase = "spark.cosmos.throughputControl.globalControl.database" val ThroughputControlGlobalControlContainer = "spark.cosmos.throughputControl.globalControl.container" val ThroughputControlGlobalControlRenewalIntervalInMS = @@ -278,6 +279,7 @@ private[spark] object CosmosConfigNames { ThroughputControlTargetThroughput, ThroughputControlTargetThroughputThreshold, ThroughputControlPriorityLevel, + ThroughputControlThroughputBucket, ThroughputControlGlobalControlDatabase, ThroughputControlGlobalControlContainer, ThroughputControlGlobalControlRenewalIntervalInMS, @@ -2380,16 +2382,26 @@ private object CosmosChangeFeedConfig { } } -private case class CosmosThroughputControlConfig(cosmosAccountConfig: CosmosAccountConfig, - groupName: String, - targetThroughput: Option[Int], - targetThroughputThreshold: Option[Double], - priorityLevel: Option[PriorityLevel], - globalControlDatabase: Option[String], - globalControlContainer: Option[String], - globalControlRenewInterval: Option[Duration], - globalControlExpireInterval: Option[Duration], - globalControlUseDedicatedContainer: Boolean) +private trait CosmosThroughputControlConfig { + def groupName: String +} + +private case class CosmosSDKThroughputControlConfig( + override val groupName: String, + cosmosAccountConfig: CosmosAccountConfig, + targetThroughput: Option[Int], + targetThroughputThreshold: Option[Double], + priorityLevel: Option[PriorityLevel], + globalControlDatabase: Option[String], + globalControlContainer: Option[String], + globalControlRenewInterval: Option[Duration], + globalControlExpireInterval: Option[Duration], + globalControlUseDedicatedContainer: Boolean) extends CosmosThroughputControlConfig + +private case class CosmosServerThroughputControlConfig( + override val groupName: String, + throughputBucket: Int, + priorityLevel: Option[PriorityLevel]) extends CosmosThroughputControlConfig private object CosmosThroughputControlConfig { private val throughputControlEnabledSupplier = CosmosConfigEntry[Boolean]( @@ -2441,6 +2453,12 @@ private object CosmosThroughputControlConfig { parseFromStringFunction = priorityLevel => CosmosConfigEntry.parseEnumeration(priorityLevel, PriorityLevels), helpMessage = "Throughput control group priority level. The value can be High or Low. ") + private val throughputBucketSupplier = CosmosConfigEntry[Int]( + key = CosmosConfigNames.ThroughputControlThroughputBucket, + mandatory = false, + parseFromStringFunction = throughputBucket => throughputBucket.toInt, + helpMessage = "Throughput bucket value. Please refer here for full context: https://learn.microsoft.com/azure/cosmos-db/throughput-buckets?tabs=dotnet") + private val globalControlDatabaseSupplier = CosmosConfigEntry[String]( key = CosmosConfigNames.ThroughputControlGlobalControlDatabase, mandatory = false, @@ -2481,68 +2499,113 @@ private object CosmosThroughputControlConfig { val throughputControlEnabled = CosmosConfigEntry.parse(cfg, throughputControlEnabledSupplier).get if (throughputControlEnabled) { - // we will allow the customer to provide a different database account for throughput control - val throughputControlCosmosAccountConfig = - CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) match { - case Some(_) => parseThroughputControlAccountConfig(cfg) - case None => CosmosAccountConfig.parseCosmosAccountConfig(cfg) - } - val groupName = CosmosConfigEntry.parse(cfg, groupNameSupplier) - val targetThroughput = CosmosConfigEntry.parse(cfg, targetThroughputSupplier) - val targetThroughputThreshold = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier) - val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier) - val globalControlDatabase = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier) - val globalControlContainer = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier) - val globalControlItemRenewInterval = CosmosConfigEntry.parse(cfg, globalControlItemRenewIntervalSupplier) - val globalControlItemExpireInterval = CosmosConfigEntry.parse(cfg, globalControlItemExpireIntervalSupplier) - val globalControlUseDedicatedContainer = CosmosConfigEntry.parse(cfg, globalControlUseDedicatedContainerSupplier) - - if (groupName.isEmpty) { - throw new IllegalArgumentException( - s"Configuration option '${CosmosConfigNames.ThroughputControlName}' must not be empty.") - } + val throughputBucket = CosmosConfigEntry.parse(cfg, throughputBucketSupplier) + assert(groupName.isDefined, s"Parameter '${CosmosConfigNames.ThroughputControlName}' is missing.") - if (globalControlUseDedicatedContainer.isEmpty) { - throw new IllegalArgumentException( - s"Configuration option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' must not be empty.") - } - assert( - globalControlUseDedicatedContainer.isDefined, - s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is missing.") + if (throughputBucket.isDefined && throughputBucket.get > 0) { + Some(parseServerThroughputControlConfig(groupName.get, throughputBucket.get, cfg)) + } else { - if (globalControlUseDedicatedContainer.get) { - if (globalControlDatabase.isEmpty || globalControlContainer.isEmpty) { + // If a non-positive throughputBucket was provided treat it as invalid + if (throughputBucket.isDefined && throughputBucket.get <= 0) { throw new IllegalArgumentException( - s"Configuration options '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' and " + - s"'${CosmosConfigNames.ThroughputControlGlobalControlContainer}' must not be empty if " + - s" option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is true.") + s"Mixed throughput control configuration detected: " + + s"Invalid '${CosmosConfigNames.ThroughputControlThroughputBucket}' value '${throughputBucket.get}'. It must be greater than 0 or omitted.") } - assert( - globalControlDatabase.isDefined, - s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' is missing.") - assert( - globalControlContainer.isDefined, - s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlContainer}' is missing.") - } - Some(CosmosThroughputControlConfig( - throughputControlCosmosAccountConfig, - groupName.get, - targetThroughput, - targetThroughputThreshold, - priorityLevel, - globalControlDatabase, - globalControlContainer, - globalControlItemRenewInterval, - globalControlItemExpireInterval, - globalControlUseDedicatedContainer.get)) + // if throughput bucket is defined, then use server side throughput bucket control + // else validate SDK global throughput control config + Some(parseSDKThroughputControlConfig(groupName.get, cfg)) + } } else { None } } + private[spark] def parseServerThroughputControlConfig( + groupName: String, + throughputBucket: Int, + cfg: Map[String, String]): CosmosServerThroughputControlConfig = { + + // Detect presence of SDK/global throughput control options + val targetThroughputOpt = CosmosConfigEntry.parse(cfg, targetThroughputSupplier) + val targetThroughputThresholdOpt = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier) + val throughputControlAccountEndpointOpt = CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) + val throughputControlAccountKeyOpt = CosmosConfigEntry.parse(cfg, throughputControlAccountKeySupplier) + val globalControlDatabaseOpt = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier) + val globalControlContainerOpt = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier) + + val sdkThroughputControlConfigsPresent = targetThroughputOpt.isDefined || + targetThroughputThresholdOpt.isDefined || + throughputControlAccountEndpointOpt.isDefined || + throughputControlAccountKeyOpt.isDefined || + globalControlDatabaseOpt.isDefined || + globalControlContainerOpt.isDefined + + if (sdkThroughputControlConfigsPresent) { + throw new IllegalArgumentException( + "Mixed throughput control configuration detected: 'throughputBucket' cannot be used together with " + + "['targetThroughput', 'targetThroughputThreshold', 'throughputControl.accountEndpoint', 'throughputControl.accountKey', " + + "'throughputControl.globalControl.database', 'throughputControl.globalControl.container']") + } + + val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier) + CosmosServerThroughputControlConfig(groupName, throughputBucket, priorityLevel) + } + + private[spark] def parseSDKThroughputControlConfig( + groupName: String, + cfg: Map[String, String]): CosmosSDKThroughputControlConfig = { + // we will allow the customer to provide a different database account for throughput control + val throughputControlCosmosAccountConfig = + CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) match { + case Some(_) => parseThroughputControlAccountConfig(cfg) + case None => CosmosAccountConfig.parseCosmosAccountConfig(cfg) + } + + val targetThroughput = CosmosConfigEntry.parse(cfg, targetThroughputSupplier) + val targetThroughputThreshold = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier) + val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier) + val globalControlDatabase = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier) + val globalControlContainer = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier) + val globalControlItemRenewInterval = CosmosConfigEntry.parse(cfg, globalControlItemRenewIntervalSupplier) + val globalControlItemExpireInterval = CosmosConfigEntry.parse(cfg, globalControlItemExpireIntervalSupplier) + val globalControlUseDedicatedContainer = CosmosConfigEntry.parse(cfg, globalControlUseDedicatedContainerSupplier) + + assert( + globalControlUseDedicatedContainer.isDefined, + s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is missing.") + + if (globalControlUseDedicatedContainer.get) { + if (globalControlDatabase.isEmpty || globalControlContainer.isEmpty) { + throw new IllegalArgumentException( + s"Configuration options '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' and " + + s"'${CosmosConfigNames.ThroughputControlGlobalControlContainer}' must not be empty if " + + s" option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is true.") + } + assert( + globalControlDatabase.isDefined, + s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' is missing.") + assert( + globalControlContainer.isDefined, + s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlContainer}' is missing.") + } + + CosmosSDKThroughputControlConfig( + groupName, + throughputControlCosmosAccountConfig, + targetThroughput, + targetThroughputThreshold, + priorityLevel, + globalControlDatabase, + globalControlContainer, + globalControlItemRenewInterval, + globalControlItemExpireInterval, + globalControlUseDedicatedContainer.get) + } + private[spark] def parseThroughputControlAccountConfig(cfg: Map[String, String]): CosmosAccountConfig = { val throughputControlAccountEndpoint = CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) val throughputControlAccountKey = CosmosConfigEntry.parse(cfg, throughputControlAccountKeySupplier) diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala index c7dc31fe224f..2b69b00ca312 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala +++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala @@ -21,33 +21,68 @@ private object ThroughputControlHelper extends BasicLoggingTrait { val container = cacheItem.cosmosClient.getDatabase(cosmosContainerConfig.database).getContainer(cosmosContainerConfig.container) if (throughputControlConfigOpt.isDefined) { - val throughputControlConfig = throughputControlConfigOpt.get - - if (throughputControlConfig.globalControlUseDedicatedContainer) { - assert(throughputControlCacheItemOpt.isDefined) - val throughputControlCacheItem = throughputControlCacheItemOpt.get - - this.enableGlobalThroughputControlGroup( - userConfig, - cosmosContainerConfig, - container, - cacheItem, - throughputControlCacheItem, - throughputControlConfig) - } else { - this.enableLocalThroughputControlGroup( - userConfig, - cosmosContainerConfig, - container, - cacheItem, - throughputControlConfig - ) - } + throughputControlConfigOpt.get match { + case cosmosSDKThroughputControlConfig: CosmosSDKThroughputControlConfig => enableSDKThroughputControl( + userConfig, + cosmosContainerConfig, + cacheItem, + container, + cosmosSDKThroughputControlConfig, + throughputControlCacheItemOpt + ) + case serverThroughputControlConfig: CosmosServerThroughputControlConfig => enableServerThroughputControl(container, serverThroughputControlConfig) + case otherConfig => throw new IllegalStateException(s"Throughput control config type ${otherConfig.getClass} is not supported") + } } container } + private def enableSDKThroughputControl( + userConfig: Map[String, String], + cosmosContainerConfig: CosmosContainerConfig, + cacheItem: CosmosClientCacheItem, + container: CosmosAsyncContainer, + throughputControlConfig: CosmosSDKThroughputControlConfig, + throughputControlCacheItemOpt: Option[CosmosClientCacheItem]): Unit = { + if (throughputControlConfig.globalControlUseDedicatedContainer) { + assert(throughputControlCacheItemOpt.isDefined) + val throughputControlCacheItem = throughputControlCacheItemOpt.get + + this.enableGlobalThroughputControlGroup( + userConfig, + cosmosContainerConfig, + container, + cacheItem, + throughputControlCacheItem, + throughputControlConfig) + } else { + this.enableLocalThroughputControlGroup( + userConfig, + cosmosContainerConfig, + container, + cacheItem, + throughputControlConfig + ) + } + } + + private def enableServerThroughputControl( + container: CosmosAsyncContainer, + throughputControlConfig: CosmosServerThroughputControlConfig): Unit = { + val groupConfigBuilder = + new ThroughputControlGroupConfigBuilder() + .groupName(throughputControlConfig.groupName) + + groupConfigBuilder.throughputBucket(throughputControlConfig.throughputBucket) + + if (throughputControlConfig.priorityLevel.isDefined) { + groupConfigBuilder.priorityLevel(parsePriorityLevel(throughputControlConfig.priorityLevel.get)) + } + + container.enableServerThroughputControlGroup(groupConfigBuilder.build()) + } + def populateThroughputControlGroupName( transactionalBulkExecutionOptions: CosmosTransactionalBulkExecutionOptionsImpl, throughputControlConfigOpt: Option[CosmosThroughputControlConfig] @@ -99,7 +134,7 @@ private object ThroughputControlHelper extends BasicLoggingTrait { container: CosmosAsyncContainer, cacheItem: CosmosClientCacheItem, throughputControlCacheItem: CosmosClientCacheItem, - throughputControlConfig: CosmosThroughputControlConfig): Unit = { + throughputControlConfig: CosmosSDKThroughputControlConfig): Unit = { val groupConfigBuilder = new ThroughputControlGroupConfigBuilder() .groupName(throughputControlConfig.groupName) @@ -149,7 +184,7 @@ private object ThroughputControlHelper extends BasicLoggingTrait { cosmosContainerConfig: CosmosContainerConfig, container: CosmosAsyncContainer, cacheItem: CosmosClientCacheItem, - throughputControlConfig: CosmosThroughputControlConfig): Unit = { + throughputControlConfig: CosmosSDKThroughputControlConfig): Unit = { val groupConfigBuilder = new ThroughputControlGroupConfigBuilder() .groupName(throughputControlConfig.groupName) @@ -203,22 +238,28 @@ private object ThroughputControlHelper extends BasicLoggingTrait { val diagnosticConfig = DiagnosticsConfig.parseDiagnosticsConfig(userConfig) if (throughputControlConfigOpt.isDefined) { - val throughputControlClientConfig = + throughputControlConfigOpt.get match { + case sdkThroughputControlConfig: CosmosSDKThroughputControlConfig => { + val throughputControlClientConfig = CosmosClientConfiguration.apply( - throughputControlConfigOpt.get.cosmosAccountConfig, + sdkThroughputControlConfig.cosmosAccountConfig, diagnosticConfig, ReadConsistencyStrategy.DEFAULT, sparkEnvironmentInfo) - val throughputControlClientMetadata = + val throughputControlClientMetadata = cosmosClientStateHandles match { - case None => None - case Some(_) => cosmosClientStateHandles.get.value.throughputControlClientMetadataCaches + case None => None + case Some(_) => cosmosClientStateHandles.get.value.throughputControlClientMetadataCaches } - Some(CosmosClientCache.apply( + Some(CosmosClientCache.apply( throughputControlClientConfig, throughputControlClientMetadata, s"ThroughputControl: $calledFrom")) + } + case _ => None + } + } else { None } diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala index 0b580fd7e367..17f75e45a746 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala @@ -1572,6 +1572,43 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait { throughputEndpointConfig.userAgentFormat shouldEqual UserAgentFormat.NoSparkEnv } + "CosmosThroughputControlConfig" should "parse throughput bucket" in { + val userConfig = Map( + "spark.cosmos.accountEndpoint" -> "https://boson-test.documents.azure.com:443/", + "spark.cosmos.accountKey" -> "xyz", + "spark.cosmos.throughputControl.throughputBucket" -> "2", + "spark.cosmos.throughputControl.enabled" -> "true", + "spark.cosmos.throughputControl.name" -> "test" + ) + + val throughputControlConfig = CosmosThroughputControlConfig.parseThroughputControlConfig(userConfig) + throughputControlConfig.isDefined shouldEqual true + throughputControlConfig.get match { + case serverThroughputControlConfig: CosmosServerThroughputControlConfig => serverThroughputControlConfig.throughputBucket shouldEqual 2 + case other => fail(s"should get CosmosServerThroughputControlConfig config, but get ${other.getClass} config type") + } + } + + it should "complain when both server and sdk throughput control configs are provided" in { + val userConfig = Map( + "spark.cosmos.accountEndpoint" -> "https://boson-test.documents.azure.com:443/", + "spark.cosmos.accountKey" -> "xyz", + "spark.cosmos.throughputControl.throughputBucket" -> "2", + "spark.cosmos.throughputControl.enabled" -> "true", + "spark.cosmos.throughputControl.name" -> "test", + "spark.cosmos.throughputControl.targetThroughput" -> "10" + ) + + try { + CosmosThroughputControlConfig.parseThroughputControlConfig(userConfig) + fail("should have failed due to mixed throughput control configuration") + } catch { + case e: IllegalArgumentException => + e.getMessage should startWith("Mixed throughput control configuration detected") + case e: Exception => fail(s"Unexpected exception type: ${e.getClass}") + } + } + private case class PatchColumnConfigParameterTest ( isValid: Boolean, diff --git a/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md index 4c3391dcb35d..a36c21045e74 100644 --- a/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md @@ -5,6 +5,7 @@ #### Features Added * Initial release of Spark 4.0 connector with Scala 2.13 support * Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478) and [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697) and [47803](https://github.com/Azure/azure-sdk-for-java/pull/47803) +* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java index 34efb4a76d2e..f73d37b1d5ae 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java @@ -965,6 +965,29 @@ public void throughputControlDefaultGroup_LocalAndServer_requestOptions() { .hasMessage("A default group already exists"); } + @Test(groups = {"long-emulator"}, timeOut = TIMEOUT) + public void throughputControl_server_enableMultipleTimes() { + this.ensureContainer(); + + // This test is verify that same throughput control group can be enabled multiple times + + ThroughputControlGroupConfig serverGroupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName("group-server" + UUID.randomUUID()) + .throughputBucket(3) + .build(); + container.enableServerThroughputControlGroup(serverGroupConfig); + + CosmosAsyncContainer sameContainer = + client.getDatabase(container.getDatabase().getId()).getContainer(container.getId()); + ThroughputControlGroupConfig sameServerGroupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName(serverGroupConfig.getGroupName()) + .throughputBucket(serverGroupConfig.getThroughputBucket()) + .build(); + sameContainer.enableServerThroughputControlGroup(sameServerGroupConfig); + } + @Test(groups = {"long-emulator"}, timeOut = TIMEOUT) public void throughputControl_noThroughputControlGroupEnabled() { this.ensureContainer(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/server/config/ServerThroughputControlGroup.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/server/config/ServerThroughputControlGroup.java index ee317cb9a096..51e96d61d558 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/server/config/ServerThroughputControlGroup.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/server/config/ServerThroughputControlGroup.java @@ -19,6 +19,7 @@ public class ServerThroughputControlGroup implements IThroughputControlGroup { private final CosmosAsyncContainer targetContainer; private final PriorityLevel priorityLevel; private final Integer throughputBucket; + private final String id; public ServerThroughputControlGroup( String groupName, @@ -39,6 +40,11 @@ public ServerThroughputControlGroup( this.targetContainer = targetContainer; this.priorityLevel = priorityLevel; this.throughputBucket = throughputBucket; + this.id = String.format( + "%s/%s/%s", + this.targetContainer.getDatabase().getId(), + this.targetContainer.getId(), + this.groupName); } public String getGroupName() { @@ -66,8 +72,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; ServerThroughputControlGroup that = (ServerThroughputControlGroup) o; return isDefault == that.isDefault - && Objects.equals(groupName, that.groupName) - && Objects.equals(targetContainer, that.targetContainer) + && Objects.equals(id, that.id) && Objects.equals(priorityLevel, that.priorityLevel) && Objects.equals(throughputBucket, that.throughputBucket); }