Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ prometheus = ["prometheus/push", "prometheus/process"]
# Enable integration tests with a running TiKV and PD instance.
# Use $PD_ADDRS, comma separated, to set the addresses the tests use.
integration-tests = []
apiv2-no-prefix = []

[lib]
name = "tikv_client"
Expand Down Expand Up @@ -58,6 +59,7 @@ rstest = "0.18.2"
serde_json = "1"
serial_test = "0.5.0"
tempfile = "3.6"
toml = "0.8"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }

[[test]]
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ export RUSTFLAGS=-Dwarnings
export PD_ADDRS ?= 127.0.0.1:2379
export MULTI_REGION ?= 1

ALL_FEATURES := integration-tests
ALL_FEATURES := integration-tests apiv2-no-prefix

NEXTEST_ARGS := --config-file $(shell pwd)/config/nextest.toml

INTEGRATION_TEST_ARGS := --features "integration-tests" --test-threads 1
INTEGRATION_TEST_ARGS := --features "integration-tests apiv2-no-prefix" --test-threads 1

RUN_INTEGRATION_TEST := cargo nextest run ${NEXTEST_ARGS} --all ${INTEGRATION_TEST_ARGS}

Expand Down
243 changes: 240 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,138 @@
use std::path::PathBuf;
use std::time::Duration;

use serde::de;
use serde::ser;
use serde_derive::Deserialize;
use serde_derive::Serialize;

/// Config-time keyspace selection.
///
/// This is resolved into a request-time [`crate::request::Keyspace`] when creating a client.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub enum Keyspace {
/// Use API V1 and do not add/remove any API V2 keyspace prefix.
#[default]
Disable,
/// Use API V2 and let the client add/remove API V2 keyspace/key-mode prefixes.
///
/// The keyspace ID is resolved via PD using the provided keyspace name.
Enable { name: String },
/// Use API V2 without adding or removing the API V2 keyspace/key-mode prefix.
///
/// This mode is intended for **server-side embedding** use cases (e.g. embedding this client in
/// `tikv-server`) where keys are already in API V2 "logical key bytes" form and must be passed
/// through unchanged.
#[cfg(feature = "apiv2-no-prefix")]
ApiV2NoPrefix,
}

impl Keyspace {
fn is_disable(&self) -> bool {
matches!(self, Keyspace::Disable)
}
}

// Why custom serde (but still lightweight)?
//
// `Config.keyspace` used to be `Option<String>`, and existing config files commonly encoded it as:
// - missing: disable keyspace (API V1)
// - string: keyspace name (resolved via PD; API V2)
//
// `Keyspace::ApiV2NoPrefix` introduces a third mode that can't be represented as a string, but we
// must keep the legacy formats working. We implement `Serialize`/`Deserialize` using
// `#[serde(untagged)]` helper enums so we can accept multiple shapes (string / map / unit) with
// minimal boilerplate, while keeping serialization legacy-friendly (string for named keyspaces and
// omitting disabled keyspace in `Config` via `skip_serializing_if`).

#[derive(Deserialize)]
#[serde(untagged)]
enum KeyspaceDe {
Unit(()),
Name(String),
Map(KeyspaceDeMap),
}

#[derive(Deserialize)]
struct KeyspaceDeMap {
mode: Option<String>,
name: Option<String>,
}

#[derive(Serialize)]
#[serde(untagged)]
enum KeyspaceSer<'a> {
Unit(()),
Name(&'a str),
#[cfg(feature = "apiv2-no-prefix")]
Map(KeyspaceSerMap<'a>),
}

#[derive(Serialize)]
#[cfg(feature = "apiv2-no-prefix")]
struct KeyspaceSerMap<'a> {
mode: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
name: Option<&'a str>,
}

impl ser::Serialize for Keyspace {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: ser::Serializer,
{
let repr = match self {
// Prefer omitting the field at the `Config` layer; if serialized standalone, this is a
// serde "unit" value (e.g. `null` in JSON), matching the legacy `Option<String>::None`
// meaning "no keyspace configured".
Keyspace::Disable => KeyspaceSer::Unit(()),
Keyspace::Enable { name } => KeyspaceSer::Name(name),
#[cfg(feature = "apiv2-no-prefix")]
Keyspace::ApiV2NoPrefix => KeyspaceSer::Map(KeyspaceSerMap {
mode: "api-v2-no-prefix",
name: None,
}),
};
serde::Serialize::serialize(&repr, serializer)
}
}

impl<'de> de::Deserialize<'de> for Keyspace {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: de::Deserializer<'de>,
{
let repr = <KeyspaceDe as serde::Deserialize>::deserialize(deserializer)?;
match repr {
KeyspaceDe::Unit(()) => Ok(Keyspace::Disable),
KeyspaceDe::Name(name) => Ok(Keyspace::Enable { name }),
KeyspaceDe::Map(KeyspaceDeMap { mode, name }) => match mode.as_deref() {
None => {
if let Some(name) = name {
Ok(Keyspace::Enable { name })
} else {
Ok(Keyspace::Disable)
}
}
Some("disable") => Ok(Keyspace::Disable),
Some("enable") => Ok(Keyspace::Enable {
name: name.ok_or_else(|| de::Error::missing_field("name"))?,
}),
#[cfg(feature = "apiv2-no-prefix")]
Some("api-v2-no-prefix" | "apiv2-no-prefix") => Ok(Keyspace::ApiV2NoPrefix),
#[cfg(not(feature = "apiv2-no-prefix"))]
Some("api-v2-no-prefix" | "apiv2-no-prefix") => Err(de::Error::custom(
"keyspace mode \"api-v2-no-prefix\" requires feature \"apiv2-no-prefix\"",
)),
Some(other) => Err(de::Error::unknown_variant(
other,
&["disable", "enable", "api-v2-no-prefix"],
)),
},
}
}
}

/// The configuration for either a [`RawClient`](crate::RawClient) or a
/// [`TransactionClient`](crate::TransactionClient).
///
Expand All @@ -19,7 +148,8 @@ pub struct Config {
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pub timeout: Duration,
pub keyspace: Option<String>,
#[serde(default, skip_serializing_if = "Keyspace::is_disable")]
pub keyspace: Keyspace,
}

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
Expand All @@ -31,7 +161,7 @@ impl Default for Config {
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
keyspace: None,
keyspace: Keyspace::default(),
}
}
}
Expand Down Expand Up @@ -99,7 +229,114 @@ impl Config {
/// Server should enable `storage.api-version = 2` to use this feature.
#[must_use]
pub fn with_keyspace(mut self, keyspace: &str) -> Self {
self.keyspace = Some(keyspace.to_owned());
self.keyspace = Keyspace::Enable {
name: keyspace.to_owned(),
};
self
}

/// Use API V2 without adding or removing the API V2 keyspace/key-mode prefix.
///
/// This is intended for **server-side embedding** use cases (e.g. embedding this client in
/// `tikv-server`) where keys are already in API V2 "logical key bytes" form and must be passed
/// through unchanged.
#[cfg(feature = "apiv2-no-prefix")]
#[must_use]
pub fn with_api_v2_no_prefix(mut self) -> Self {
self.keyspace = Keyspace::ApiV2NoPrefix;
self
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_config_toml_deserialize_cases() {
// Parse TOML into `Config` (covers `Keyspace` deserialization via the field).
let cases: Vec<(&str, Keyspace)> = vec![
("", Keyspace::Disable),
(
"keyspace = \"DEFAULT\"\n",
Keyspace::Enable {
name: "DEFAULT".to_owned(),
},
),
("keyspace = { mode = \"disable\" }\n", Keyspace::Disable),
(
"keyspace = { mode = \"enable\", name = \"DEFAULT\" }\n",
Keyspace::Enable {
name: "DEFAULT".to_owned(),
},
),
// Backward/forward compatibility: allow a structured keyspace table without `mode`.
(
"keyspace = { name = \"DEFAULT\" }\n",
Keyspace::Enable {
name: "DEFAULT".to_owned(),
},
),
// Empty table behaves like disabled.
("keyspace = {}\n", Keyspace::Disable),
];

#[cfg(feature = "apiv2-no-prefix")]
let cases = {
let mut cases = cases;
cases.push((
"keyspace = { mode = \"api-v2-no-prefix\" }\n",
Keyspace::ApiV2NoPrefix,
));
cases
};

for (input, expected_keyspace) in cases {
let cfg: Config = toml::from_str(input).unwrap();
assert_eq!(cfg.keyspace, expected_keyspace);
}

// Invalid structured forms.
let err = toml::from_str::<Config>("keyspace = { mode = \"enable\" }\n")
.unwrap_err()
.to_string();
assert!(
err.contains("missing field") || err.contains("missing_field"),
"unexpected error: {err}"
);

#[cfg(not(feature = "apiv2-no-prefix"))]
{
let err = toml::from_str::<Config>("keyspace = { mode = \"api-v2-no-prefix\" }\n")
.unwrap_err()
.to_string();
assert!(err.contains("requires feature"), "unexpected error: {err}");
}
}

#[test]
fn test_config_toml_serialize_cases() {
let cases: Vec<(Config, &str)> = vec![
(Config::default(), "[timeout]\nsecs = 2\nnanos = 0\n"),
(
Config::default().with_keyspace("DEFAULT"),
"keyspace = \"DEFAULT\"\n\n[timeout]\nsecs = 2\nnanos = 0\n",
),
];

#[cfg(feature = "apiv2-no-prefix")]
let cases = {
let mut cases = cases;
cases.push((
Config::default().with_api_v2_no_prefix(),
"[timeout]\nsecs = 2\nnanos = 0\n\n[keyspace]\nmode = \"api-v2-no-prefix\"\n",
));
cases
};

for (cfg, expected) in cases {
let out = toml::to_string(&cfg).unwrap();
assert_eq!(out, expected);
}
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ pub use common::Error;
pub use common::Result;
#[doc(inline)]
pub use config::Config;
#[doc(inline)]
pub use config::Keyspace;

#[doc(inline)]
pub use crate::backoff::Backoff;
Expand Down
16 changes: 12 additions & 4 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::time::sleep;
use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::common::Error;
use crate::config::Config;
use crate::config::Keyspace as ConfigKeyspace;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
Expand Down Expand Up @@ -107,18 +108,25 @@ impl Client<PdRpcClient> {
pd_endpoints: Vec<S>,
config: Config,
) -> Result<Self> {
let enable_codec = config.keyspace.is_some();
let enable_codec = match &config.keyspace {
ConfigKeyspace::Disable => false,
ConfigKeyspace::Enable { .. } => true,
#[cfg(feature = "apiv2-no-prefix")]
ConfigKeyspace::ApiV2NoPrefix => true,
};
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc =
Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?);
let keyspace = match config.keyspace {
Some(keyspace) => {
let keyspace = rpc.load_keyspace(&keyspace).await?;
ConfigKeyspace::Disable => Keyspace::Disable,
ConfigKeyspace::Enable { name } => {
let keyspace = rpc.load_keyspace(&name).await?;
Keyspace::Enable {
keyspace_id: keyspace.id,
}
}
None => Keyspace::Disable,
#[cfg(feature = "apiv2-no-prefix")]
ConfigKeyspace::ApiV2NoPrefix => Keyspace::ApiV2NoPrefix,
};
Ok(Client {
rpc,
Expand Down
Loading