Add an opt-in `apiv2-no-prefix` mode: API V2 without keyspace prefixing.

Cargo.toml and Makefile declare the `apiv2-no-prefix` feature, add `toml = "0.8"`,
and enable the feature for the all-features and integration-test runs.
src/config.rs replaces `Config::keyspace: Option<String>` with a `Keyspace` enum
(`Disable`, `Enable { name }`, feature-gated `ApiV2NoPrefix`) whose hand-written
serde impls keep the legacy "missing" and "string" encodings working, and adds
TOML round-trip tests.
src/request/keyspace.rs adds the request-time `Keyspace::ApiV2NoPrefix` variant,
which reports API V2 while encode/truncate leave keys untouched.
src/raw/client.rs and src/transaction/client.rs resolve the config-time keyspace
into the request-time one; src/lib.rs re-exports `config::Keyspace`.

diff --git a/Cargo.toml b/Cargo.toml
index 7e8e45f1..adca0537 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ prometheus = ["prometheus/push", "prometheus/process"]
 # Enable integration tests with a running TiKV and PD instance.
 # Use $PD_ADDRS, comma separated, to set the addresses the tests use.
 integration-tests = []
+apiv2-no-prefix = []
 
 [lib]
 name = "tikv_client"
@@ -58,6 +59,7 @@ rstest = "0.18.2"
 serde_json = "1"
 serial_test = "0.5.0"
 tempfile = "3.6"
+toml = "0.8"
 tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
 
 [[test]]
diff --git a/Makefile b/Makefile
index ef3fd397..6e22ead3 100644
--- a/Makefile
+++ b/Makefile
@@ -5,11 +5,11 @@ export RUSTFLAGS=-Dwarnings
 export PD_ADDRS ?= 127.0.0.1:2379
 export MULTI_REGION ?= 1
 
-ALL_FEATURES := integration-tests
+ALL_FEATURES := integration-tests apiv2-no-prefix
 
 NEXTEST_ARGS := --config-file $(shell pwd)/config/nextest.toml
 
-INTEGRATION_TEST_ARGS := --features "integration-tests" --test-threads 1
+INTEGRATION_TEST_ARGS := --features "integration-tests apiv2-no-prefix" --test-threads 1
 
 RUN_INTEGRATION_TEST := cargo nextest run ${NEXTEST_ARGS} --all ${INTEGRATION_TEST_ARGS}
 
diff --git a/src/config.rs b/src/config.rs
index 79dad855..7bccc3a7 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -3,9 +3,138 @@
 use std::path::PathBuf;
 use std::time::Duration;
 
+use serde::de;
+use serde::ser;
 use serde_derive::Deserialize;
 use serde_derive::Serialize;
 
+/// Config-time keyspace selection.
+///
+/// This is resolved into a request-time [`crate::request::Keyspace`] when creating a client.
+#[derive(Default, Clone, Debug, PartialEq, Eq)]
+pub enum Keyspace {
+    /// Use API V1 and do not add/remove any API V2 keyspace prefix.
+    #[default]
+    Disable,
+    /// Use API V2 and let the client add/remove API V2 keyspace/key-mode prefixes.
+    ///
+    /// The keyspace ID is resolved via PD using the provided keyspace name.
+    Enable { name: String },
+    /// Use API V2 without adding or removing the API V2 keyspace/key-mode prefix.
+    ///
+    /// This mode is intended for **server-side embedding** use cases (e.g. embedding this client in
+    /// `tikv-server`) where keys are already in API V2 "logical key bytes" form and must be passed
+    /// through unchanged.
+    #[cfg(feature = "apiv2-no-prefix")]
+    ApiV2NoPrefix,
+}
+
+impl Keyspace {
+    fn is_disable(&self) -> bool {
+        matches!(self, Keyspace::Disable)
+    }
+}
+
+// Why custom serde (but still lightweight)?
+//
+// `Config.keyspace` used to be `Option<String>`, and existing config files commonly encoded it as:
+// - missing: disable keyspace (API V1)
+// - string: keyspace name (resolved via PD; API V2)
+//
+// `Keyspace::ApiV2NoPrefix` introduces a third mode that can't be represented as a string, but we
+// must keep the legacy formats working. We implement `Serialize`/`Deserialize` using
+// `#[serde(untagged)]` helper enums so we can accept multiple shapes (string / map / unit) with
+// minimal boilerplate, while keeping serialization legacy-friendly (string for named keyspaces and
+// omitting disabled keyspace in `Config` via `skip_serializing_if`).
+
+#[derive(Deserialize)]
+#[serde(untagged)]
+enum KeyspaceDe {
+    Unit(()),
+    Name(String),
+    Map(KeyspaceDeMap),
+}
+
+#[derive(Deserialize)]
+struct KeyspaceDeMap {
+    mode: Option<String>,
+    name: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(untagged)]
+enum KeyspaceSer<'a> {
+    Unit(()),
+    Name(&'a str),
+    #[cfg(feature = "apiv2-no-prefix")]
+    Map(KeyspaceSerMap<'a>),
+}
+
+#[derive(Serialize)]
+#[cfg(feature = "apiv2-no-prefix")]
+struct KeyspaceSerMap<'a> {
+    mode: &'a str,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    name: Option<&'a str>,
+}
+
+impl ser::Serialize for Keyspace {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: ser::Serializer,
+    {
+        let repr = match self {
+            // Prefer omitting the field at the `Config` layer; if serialized standalone, this is a
+            // serde "unit" value (e.g. `null` in JSON), matching the legacy `Option::None`
+            // meaning "no keyspace configured".
+            Keyspace::Disable => KeyspaceSer::Unit(()),
+            Keyspace::Enable { name } => KeyspaceSer::Name(name),
+            #[cfg(feature = "apiv2-no-prefix")]
+            Keyspace::ApiV2NoPrefix => KeyspaceSer::Map(KeyspaceSerMap {
+                mode: "api-v2-no-prefix",
+                name: None,
+            }),
+        };
+        serde::Serialize::serialize(&repr, serializer)
+    }
+}
+
+impl<'de> de::Deserialize<'de> for Keyspace {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: de::Deserializer<'de>,
+    {
+        let repr = <KeyspaceDe as de::Deserialize>::deserialize(deserializer)?;
+        match repr {
+            KeyspaceDe::Unit(()) => Ok(Keyspace::Disable),
+            KeyspaceDe::Name(name) => Ok(Keyspace::Enable { name }),
+            KeyspaceDe::Map(KeyspaceDeMap { mode, name }) => match mode.as_deref() {
+                None => {
+                    if let Some(name) = name {
+                        Ok(Keyspace::Enable { name })
+                    } else {
+                        Ok(Keyspace::Disable)
+                    }
+                }
+                Some("disable") => Ok(Keyspace::Disable),
+                Some("enable") => Ok(Keyspace::Enable {
+                    name: name.ok_or_else(|| de::Error::missing_field("name"))?,
+                }),
+                #[cfg(feature = "apiv2-no-prefix")]
+                Some("api-v2-no-prefix" | "apiv2-no-prefix") => Ok(Keyspace::ApiV2NoPrefix),
+                #[cfg(not(feature = "apiv2-no-prefix"))]
+                Some("api-v2-no-prefix" | "apiv2-no-prefix") => Err(de::Error::custom(
+                    "keyspace mode \"api-v2-no-prefix\" requires feature \"apiv2-no-prefix\"",
+                )),
+                Some(other) => Err(de::Error::unknown_variant(
+                    other,
+                    &["disable", "enable", "api-v2-no-prefix"],
+                )),
+            },
+        }
+    }
+}
+
 /// The configuration for either a [`RawClient`](crate::RawClient) or a
 /// [`TransactionClient`](crate::TransactionClient).
 ///
@@ -19,7 +148,8 @@ pub struct Config {
     pub cert_path: Option<PathBuf>,
     pub key_path: Option<PathBuf>,
     pub timeout: Duration,
-    pub keyspace: Option<String>,
+    #[serde(default, skip_serializing_if = "Keyspace::is_disable")]
+    pub keyspace: Keyspace,
 }
 
 const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
@@ -31,7 +161,7 @@ impl Default for Config {
             cert_path: None,
             key_path: None,
             timeout: DEFAULT_REQUEST_TIMEOUT,
-            keyspace: None,
+            keyspace: Keyspace::default(),
         }
     }
 }
@@ -99,7 +229,114 @@ impl Config {
     /// Server should enable `storage.api-version = 2` to use this feature.
     #[must_use]
     pub fn with_keyspace(mut self, keyspace: &str) -> Self {
-        self.keyspace = Some(keyspace.to_owned());
+        self.keyspace = Keyspace::Enable {
+            name: keyspace.to_owned(),
+        };
         self
     }
+
+    /// Use API V2 without adding or removing the API V2 keyspace/key-mode prefix.
+    ///
+    /// This is intended for **server-side embedding** use cases (e.g. embedding this client in
+    /// `tikv-server`) where keys are already in API V2 "logical key bytes" form and must be passed
+    /// through unchanged.
+    #[cfg(feature = "apiv2-no-prefix")]
+    #[must_use]
+    pub fn with_api_v2_no_prefix(mut self) -> Self {
+        self.keyspace = Keyspace::ApiV2NoPrefix;
+        self
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_config_toml_deserialize_cases() {
+        // Parse TOML into `Config` (covers `Keyspace` deserialization via the field).
+        let cases: Vec<(&str, Keyspace)> = vec![
+            ("", Keyspace::Disable),
+            (
+                "keyspace = \"DEFAULT\"\n",
+                Keyspace::Enable {
+                    name: "DEFAULT".to_owned(),
+                },
+            ),
+            ("keyspace = { mode = \"disable\" }\n", Keyspace::Disable),
+            (
+                "keyspace = { mode = \"enable\", name = \"DEFAULT\" }\n",
+                Keyspace::Enable {
+                    name: "DEFAULT".to_owned(),
+                },
+            ),
+            // Backward/forward compatibility: allow a structured keyspace table without `mode`.
+            (
+                "keyspace = { name = \"DEFAULT\" }\n",
+                Keyspace::Enable {
+                    name: "DEFAULT".to_owned(),
+                },
+            ),
+            // Empty table behaves like disabled.
+            ("keyspace = {}\n", Keyspace::Disable),
+        ];
+
+        #[cfg(feature = "apiv2-no-prefix")]
+        let cases = {
+            let mut cases = cases;
+            cases.push((
+                "keyspace = { mode = \"api-v2-no-prefix\" }\n",
+                Keyspace::ApiV2NoPrefix,
+            ));
+            cases
+        };
+
+        for (input, expected_keyspace) in cases {
+            let cfg: Config = toml::from_str(input).unwrap();
+            assert_eq!(cfg.keyspace, expected_keyspace);
+        }
+
+        // Invalid structured forms.
+        let err = toml::from_str::<Config>("keyspace = { mode = \"enable\" }\n")
+            .unwrap_err()
+            .to_string();
+        assert!(
+            err.contains("missing field") || err.contains("missing_field"),
+            "unexpected error: {err}"
+        );
+
+        #[cfg(not(feature = "apiv2-no-prefix"))]
+        {
+            let err = toml::from_str::<Config>("keyspace = { mode = \"api-v2-no-prefix\" }\n")
+                .unwrap_err()
+                .to_string();
+            assert!(err.contains("requires feature"), "unexpected error: {err}");
+        }
+    }
+
+    #[test]
+    fn test_config_toml_serialize_cases() {
+        let cases: Vec<(Config, &str)> = vec![
+            (Config::default(), "[timeout]\nsecs = 2\nnanos = 0\n"),
+            (
+                Config::default().with_keyspace("DEFAULT"),
+                "keyspace = \"DEFAULT\"\n\n[timeout]\nsecs = 2\nnanos = 0\n",
+            ),
+        ];
+
+        #[cfg(feature = "apiv2-no-prefix")]
+        let cases = {
+            let mut cases = cases;
+            cases.push((
+                Config::default().with_api_v2_no_prefix(),
+                "[timeout]\nsecs = 2\nnanos = 0\n\n[keyspace]\nmode = \"api-v2-no-prefix\"\n",
+            ));
+            cases
+        };
+
+        for (cfg, expected) in cases {
+            let out = toml::to_string(&cfg).unwrap();
+            assert_eq!(out, expected);
+        }
+    }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 60dc2956..616975dd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -125,6 +125,8 @@ pub use common::Error;
 pub use common::Result;
 #[doc(inline)]
 pub use config::Config;
+#[doc(inline)]
+pub use config::Keyspace;
 
 #[doc(inline)]
 pub use crate::backoff::Backoff;
diff --git a/src/raw/client.rs b/src/raw/client.rs
index 620531ee..4c7c8594 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -10,6 +10,7 @@ use tokio::time::sleep;
 use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
 use crate::common::Error;
 use crate::config::Config;
+use crate::config::Keyspace as ConfigKeyspace;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
@@ -107,18 +108,25 @@ impl Client<PdRpcClient> {
         pd_endpoints: Vec<S>,
         config: Config,
     ) -> Result<Self> {
-        let enable_codec = config.keyspace.is_some();
+        let enable_codec = match &config.keyspace {
+            ConfigKeyspace::Disable => false,
+            ConfigKeyspace::Enable { .. } => true,
+            #[cfg(feature = "apiv2-no-prefix")]
+            ConfigKeyspace::ApiV2NoPrefix => true,
+        };
         let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
         let rpc =
             Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?);
         let keyspace = match config.keyspace {
-            Some(keyspace) => {
-                let keyspace = rpc.load_keyspace(&keyspace).await?;
+            ConfigKeyspace::Disable => Keyspace::Disable,
+            ConfigKeyspace::Enable { name } => {
+                let keyspace = rpc.load_keyspace(&name).await?;
                 Keyspace::Enable {
                     keyspace_id: keyspace.id,
                 }
             }
-            None => Keyspace::Disable,
+            #[cfg(feature = "apiv2-no-prefix")]
+            ConfigKeyspace::ApiV2NoPrefix => Keyspace::ApiV2NoPrefix,
         };
         Ok(Client {
             rpc,
diff --git a/src/request/keyspace.rs b/src/request/keyspace.rs
index 118e6fbf..94fd5f76 100644
--- a/src/request/keyspace.rs
+++ b/src/request/keyspace.rs
@@ -15,7 +15,16 @@ pub const KEYSPACE_PREFIX_LEN: usize = 4;
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub enum Keyspace {
     Disable,
-    Enable { keyspace_id: u32 },
+    Enable {
+        keyspace_id: u32,
+    },
+    /// Use API V2 without adding or removing the API V2 keyspace/key-mode prefix.
+    ///
+    /// This mode is intended for **server-side embedding** use cases (e.g. embedding this client in
+    /// `tikv-server`) where keys are already in API V2 "logical key bytes" form and must be passed
+    /// through unchanged.
+    #[cfg(feature = "apiv2-no-prefix")]
+    ApiV2NoPrefix,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
@@ -29,6 +38,8 @@ impl Keyspace {
         match self {
             Keyspace::Disable => kvrpcpb::ApiVersion::V1,
             Keyspace::Enable { .. } => kvrpcpb::ApiVersion::V2,
+            #[cfg(feature = "apiv2-no-prefix")]
+            Keyspace::ApiV2NoPrefix => kvrpcpb::ApiVersion::V2,
         }
     }
 }
@@ -43,12 +54,10 @@ pub trait TruncateKeyspace {
 
 impl EncodeKeyspace for Key {
     fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
-        let prefix = match keyspace {
-            Keyspace::Disable => {
-                return self;
-            }
-            Keyspace::Enable { keyspace_id } => keyspace_prefix(keyspace_id, key_mode),
+        let Keyspace::Enable { keyspace_id } = keyspace else {
+            return self;
         };
+        let prefix = keyspace_prefix(keyspace_id, key_mode);
 
         prepend_bytes(&mut self.0, &prefix);
 
@@ -68,11 +77,9 @@ impl EncodeKeyspace for BoundRange {
         self.from = match self.from {
             Bound::Included(key) => Bound::Included(key.encode_keyspace(keyspace, key_mode)),
             Bound::Excluded(key) => Bound::Excluded(key.encode_keyspace(keyspace, key_mode)),
-            Bound::Unbounded => {
-                let key = Key::from(vec![]);
-                Bound::Included(key.encode_keyspace(keyspace, key_mode))
-            }
+            Bound::Unbounded => Bound::Included(Key::EMPTY.encode_keyspace(keyspace, key_mode)),
         };
+
         self.to = match self.to {
             Bound::Included(key) if !key.is_empty() => {
                 Bound::Included(key.encode_keyspace(keyspace, key_mode))
@@ -80,16 +87,15 @@ impl EncodeKeyspace for BoundRange {
             Bound::Excluded(key) if !key.is_empty() => {
                 Bound::Excluded(key.encode_keyspace(keyspace, key_mode))
             }
-            _ => {
-                let key = Key::from(vec![]);
-                let keyspace = match keyspace {
-                    Keyspace::Disable => Keyspace::Disable,
-                    Keyspace::Enable { keyspace_id } => Keyspace::Enable {
+            _ => match keyspace {
+                Keyspace::Enable { keyspace_id } => Bound::Excluded(Key::EMPTY.encode_keyspace(
+                    Keyspace::Enable {
                         keyspace_id: keyspace_id + 1,
                     },
-                };
-                Bound::Excluded(key.encode_keyspace(keyspace, key_mode))
-            }
+                    key_mode,
+                )),
+                _ => Bound::Excluded(Key::EMPTY),
+            },
         };
         self
     }
@@ -106,7 +112,7 @@ impl EncodeKeyspace for Mutation {
 
 impl TruncateKeyspace for Key {
     fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
-        if let Keyspace::Disable = keyspace {
+        if !matches!(keyspace, Keyspace::Enable { .. }) {
             return self;
         }
 
@@ -133,6 +139,9 @@ impl TruncateKeyspace for Range<Key> {
 
 impl TruncateKeyspace for Vec<Range<Key>> {
     fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
+        if !matches!(keyspace, Keyspace::Enable { .. }) {
+            return self;
+        }
         for range in &mut self {
             take_mut::take(range, |range| range.truncate_keyspace(keyspace));
         }
@@ -142,6 +151,9 @@ impl TruncateKeyspace for Vec<Range<Key>> {
 
 impl TruncateKeyspace for Vec<KvPair> {
     fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
+        if !matches!(keyspace, Keyspace::Enable { .. }) {
+            return self;
+        }
         for pair in &mut self {
             take_mut::take(pair, |pair| pair.truncate_keyspace(keyspace));
         }
@@ -151,6 +163,9 @@ impl TruncateKeyspace for Vec<KvPair> {
 
 impl TruncateKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
     fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
+        if !matches!(keyspace, Keyspace::Enable { .. }) {
+            return self;
+        }
         for lock in &mut self {
             take_mut::take(&mut lock.key, |key| {
                 Key::from(key).truncate_keyspace(keyspace).into()
@@ -277,4 +292,66 @@ mod tests {
         let expected_key = Key::from(vec![0xBE, 0xEF]);
         assert_eq!(key.truncate_keyspace(keyspace), expected_key);
     }
+
+    #[cfg(feature = "apiv2-no-prefix")]
+    #[test]
+    fn test_apiv2_no_prefix_api_version() {
+        assert_eq!(
+            Keyspace::ApiV2NoPrefix.api_version(),
+            kvrpcpb::ApiVersion::V2
+        );
+    }
+
+    #[cfg(feature = "apiv2-no-prefix")]
+    #[test]
+    fn test_apiv2_no_prefix_encode_is_noop() {
+        let keyspace = Keyspace::ApiV2NoPrefix;
+        let key_mode = KeyMode::Txn;
+
+        let key = Key::from(vec![b'x', 0, 0, 0, b'k']);
+        assert_eq!(key.clone().encode_keyspace(keyspace, key_mode), key);
+
+        let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']);
+        assert_eq!(pair.clone().encode_keyspace(keyspace, key_mode), pair);
+
+        let bound: BoundRange =
+            (Key::from(vec![b'x', 0, 0, 0, b'a'])..Key::from(vec![b'x', 0, 0, 0, b'b'])).into();
+        assert_eq!(bound.clone().encode_keyspace(keyspace, key_mode), bound);
+
+        let mutation = Mutation::Put(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![1, 2, 3]);
+        assert_eq!(
+            mutation.clone().encode_keyspace(keyspace, key_mode),
+            mutation
+        );
+    }
+
+    #[cfg(feature = "apiv2-no-prefix")]
+    #[test]
+    fn test_apiv2_no_prefix_truncate_is_noop() {
+        let keyspace = Keyspace::ApiV2NoPrefix;
+
+        let key = Key::from(vec![b'x', 0, 0, 0, b'k']);
+        assert_eq!(key.clone().truncate_keyspace(keyspace), key);
+
+        let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']);
+        assert_eq!(pair.clone().truncate_keyspace(keyspace), pair);
+
+        let range = Range {
+            start: Key::from(vec![b'x', 0, 0, 0, b'a']),
+            end: Key::from(vec![b'x', 0, 0, 0, b'b']),
+        };
+        assert_eq!(range.clone().truncate_keyspace(keyspace), range);
+
+        let pairs = vec![pair];
+        assert_eq!(pairs.clone().truncate_keyspace(keyspace), pairs);
+
+        let lock = crate::proto::kvrpcpb::LockInfo {
+            key: vec![b'x', 0, 0, 0, b'k'],
+            primary_lock: vec![b'x', 0, 0, 0, b'p'],
+            secondaries: vec![vec![b'x', 0, 0, 0, b's']],
+            ..Default::default()
+        };
+        let locks = vec![lock];
+        assert_eq!(locks.clone().truncate_keyspace(keyspace), locks);
+    }
 }
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index a9bb2269..de88c941 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -7,6 +7,7 @@ use log::info;
 
 use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
 use crate::config::Config;
+use crate::config::Keyspace as ConfigKeyspace;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::pdpb::Timestamp;
@@ -110,13 +111,15 @@ impl Client {
         let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
         let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?);
         let keyspace = match config.keyspace {
-            Some(keyspace) => {
-                let keyspace = pd.load_keyspace(&keyspace).await?;
+            ConfigKeyspace::Disable => Keyspace::Disable,
+            ConfigKeyspace::Enable { name } => {
+                let keyspace = pd.load_keyspace(&name).await?;
                 Keyspace::Enable {
                     keyspace_id: keyspace.id,
                 }
             }
-            None => Keyspace::Disable,
+            #[cfg(feature = "apiv2-no-prefix")]
+            ConfigKeyspace::ApiV2NoPrefix => Keyspace::ApiV2NoPrefix,
         };
         Ok(Client { pd, keyspace })
     }