From 2b9635ae59862652ae1c9f735327f56dd06f44ca Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Tue, 6 Jan 2026 22:49:48 +0800
Subject: [PATCH 1/4] transaction: Add resolve_locks method

Signed-off-by: Ping Yu
---
 src/transaction/client.rs | 33 ++++++++++++++++++++++++++++++
 tests/failpoint_tests.rs  | 42 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 75 insertions(+)

diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index befde865..6d1a2bbf 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -9,6 +9,7 @@ use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
 use crate::config::Config;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
+use crate::proto::kvrpcpb;
 use crate::proto::pdpb::Timestamp;
 use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest};
 use crate::request::plan::CleanupLocksResult;
@@ -17,6 +18,7 @@ use crate::timestamp::TimestampExt;
 use crate::transaction::lock::ResolveLocksOptions;
 use crate::transaction::lowering::new_scan_lock_request;
 use crate::transaction::lowering::new_unsafe_destroy_range_request;
+use crate::transaction::resolve_locks;
 use crate::transaction::ResolveLocksContext;
 use crate::transaction::Snapshot;
 use crate::transaction::Transaction;
@@ -308,6 +310,37 @@ impl Client {
         plan.execute().await
     }

+    /// Resolves the given locks and returns any that remain live.
+    ///
+    /// This method retries until either all locks are resolved or the provided
+    /// `backoff` is exhausted. The `timestamp` is used as the caller's start
+    /// timestamp when checking transaction status.
+    pub async fn resolve_locks(
+        &self,
+        locks: Vec<kvrpcpb::LockInfo>,
+        timestamp: Timestamp,
+        mut backoff: Backoff,
+    ) -> Result<Vec<kvrpcpb::LockInfo>> {
+        let mut live_locks = locks;
+        loop {
+            live_locks = resolve_locks(live_locks, timestamp.clone(), self.pd.clone()).await?;
+            if live_locks.is_empty() {
+                return Ok(live_locks);
+            }
+
+            if backoff.is_none() {
+                return Ok(live_locks);
+            }
+
+            match backoff.next_delay_duration() {
+                None => return Ok(live_locks),
+                Some(delay_duration) => {
+                    tokio::time::sleep(delay_duration).await;
+                }
+            }
+        }
+    }
+
     /// Cleans up all keys in a range and quickly reclaim disk space.
     ///
     /// The range can span over multiple regions.
diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs
index f34dff48..28bdc3d4 100644
--- a/tests/failpoint_tests.rs
+++ b/tests/failpoint_tests.rs
@@ -261,6 +261,48 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
     Ok(())
 }

+#[tokio::test]
+#[serial]
+async fn txn_resolve_locks() -> Result<()> {
+    init().await?;
+    let scenario = FailScenario::setup();
+
+    fail::cfg("after-prewrite", "return").unwrap();
+    defer! 
{{ + fail::cfg("after-prewrite", "off").unwrap(); + }} + + let client = TransactionClient::new(pd_addrs()).await?; + let key = b"resolve-locks-key".to_vec(); + let keys = HashSet::from_iter(vec![key.clone()]); + let mut txn = client + .begin_with_options( + TransactionOptions::new_optimistic() + .heartbeat_option(HeartbeatOption::NoHeartbeat) + .drop_check(CheckLevel::Warn), + ) + .await?; + txn.put(key.clone(), b"value".to_vec()).await?; + assert!(txn.commit().await.is_err()); + + let safepoint = client.current_timestamp().await?; + let locks = client.scan_locks(&safepoint, vec![].., 1024).await?; + assert!(locks.iter().any(|lock| lock.key == key)); + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + let start_version = client.current_timestamp().await?; + let live_locks = client + .resolve_locks(locks, start_version, OPTIMISTIC_BACKOFF) + .await?; + assert!(live_locks.is_empty()); + assert_eq!(count_locks(&client).await?, 0); + must_rollbacked(&client, keys).await; + + scenario.teardown(); + Ok(()) +} + #[tokio::test] #[serial] async fn txn_cleanup_2pc_locks() -> Result<()> { From d945aa45344fe88282a774b375ad2de7304ba182 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 13 Jan 2026 12:16:53 +0800 Subject: [PATCH 2/4] public scan_locks Signed-off-by: Ping Yu --- src/transaction/client.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 6d1a2bbf..7c27d54d 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -292,9 +292,7 @@ impl Client { plan.execute().await } - // For test. // Note: `batch_size` must be >= expected number of locks. - #[cfg(feature = "integration-tests")] pub async fn scan_locks( &self, safepoint: &Timestamp, From 89e47bc1e7d68c18b82db719a1579d78c2e134d4 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 14 Jan 2026 17:02:33 +0800 Subject: [PATCH 3/4] handle region error Signed-off-by: Ping Yu --- src/request/plan.rs | 205 ++++++++++++++++++++-------------------- src/transaction/lock.rs | 74 ++++++++++----- 2 files changed, 152 insertions(+), 127 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index d3fb6ffe..679d0357 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -86,7 +86,7 @@ impl StoreRequest for Dispatch { const MULTI_REGION_CONCURRENCY: usize = 16; const MULTI_STORES_CONCURRENCY: usize = 16; -fn is_grpc_error(e: &Error) -> bool { +pub(crate) fn is_grpc_error(e: &Error) -> bool { matches!(e, Error::GrpcAPI(_) | Error::Grpc(_)) } @@ -206,7 +206,7 @@ where match backoff.next_delay_duration() { Some(duration) => { let region_error_resolved = - Self::handle_region_error(pd_client.clone(), e, region_store).await?; + handle_region_error(pd_client.clone(), e, region_store).await?; // don't sleep if we have resolved the region error if !region_error_resolved { sleep(duration).await; @@ -227,109 +227,6 @@ where } } - // Returns - // 1. Ok(true): error has been resolved, retry immediately - // 2. Ok(false): backoff, and then retry - // 3. 
Err(Error): can't be resolved, return the error to upper level - async fn handle_region_error( - pd_client: Arc, - e: errorpb::Error, - region_store: RegionStore, - ) -> Result { - let ver_id = region_store.region_with_leader.ver_id(); - let store_id = region_store.region_with_leader.get_store_id(); - if let Some(not_leader) = e.not_leader { - if let Some(leader) = not_leader.leader { - match pd_client - .update_leader(region_store.region_with_leader.ver_id(), leader) - .await - { - Ok(_) => Ok(true), - Err(e) => { - pd_client.invalidate_region_cache(ver_id).await; - Err(e) - } - } - } else { - // The peer doesn't know who is the current leader. Generally it's because - // the Raft group is in an election, but it's possible that the peer is - // isolated and removed from the Raft group. So it's necessary to reload - // the region from PD. - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } - } else if e.store_not_match.is_some() { - pd_client.invalidate_region_cache(ver_id).await; - if let Ok(store_id) = store_id { - pd_client.invalidate_store_cache(store_id).await; - } - Ok(false) - } else if e.epoch_not_match.is_some() { - Self::on_region_epoch_not_match( - pd_client.clone(), - region_store, - e.epoch_not_match.unwrap(), - ) - .await - } else if e.stale_command.is_some() || e.region_not_found.is_some() { - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } else if e.server_is_busy.is_some() - || e.raft_entry_too_large.is_some() - || e.max_timestamp_not_synced.is_some() - { - Err(Error::RegionError(Box::new(e))) - } else { - // TODO: pass the logger around - // info!("unknwon region error: {:?}", e); - pd_client.invalidate_region_cache(ver_id).await; - if let Ok(store_id) = store_id { - pd_client.invalidate_store_cache(store_id).await; - } - Ok(false) - } - } - - // Returns - // 1. Ok(true): error has been resolved, retry immediately - // 2. Ok(false): backoff, and then retry - // 3. Err(Error): can't be resolved, return the error to upper level - async fn on_region_epoch_not_match( - pd_client: Arc, - region_store: RegionStore, - error: EpochNotMatch, - ) -> Result { - let ver_id = region_store.region_with_leader.ver_id(); - if error.current_regions.is_empty() { - pd_client.invalidate_region_cache(ver_id).await; - return Ok(true); - } - - for r in error.current_regions { - if r.id == region_store.region_with_leader.id() { - let region_epoch = r.region_epoch.unwrap(); - let returned_conf_ver = region_epoch.conf_ver; - let returned_version = region_epoch.version; - let current_region_epoch = region_store - .region_with_leader - .region - .region_epoch - .clone() - .unwrap(); - let current_conf_ver = current_region_epoch.conf_ver; - let current_version = current_region_epoch.version; - - // Find whether the current region is ahead of TiKV's. If so, backoff. - if returned_conf_ver < current_conf_ver || returned_version < current_version { - return Ok(false); - } - } - } - // TODO: finer grained processing - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } - #[allow(clippy::too_many_arguments)] async fn handle_other_error( pd_client: Arc, @@ -365,6 +262,104 @@ where } } +// Returns +// 1. Ok(true): error has been resolved, retry immediately +// 2. Ok(false): backoff, and then retry +// 3. 
Err(Error): can't be resolved, return the error to the upper level
+pub(crate) async fn handle_region_error<PdC: PdClient>(
+    pd_client: Arc<PdC>,
+    e: errorpb::Error,
+    region_store: RegionStore,
+) -> Result<bool> {
+    let ver_id = region_store.region_with_leader.ver_id();
+    let store_id = region_store.region_with_leader.get_store_id();
+    if let Some(not_leader) = e.not_leader {
+        if let Some(leader) = not_leader.leader {
+            match pd_client
+                .update_leader(region_store.region_with_leader.ver_id(), leader)
+                .await
+            {
+                Ok(_) => Ok(true),
+                Err(e) => {
+                    pd_client.invalidate_region_cache(ver_id).await;
+                    Err(e)
+                }
+            }
+        } else {
+            // The peer doesn't know who is the current leader. Generally it's because
+            // the Raft group is in an election, but it's possible that the peer is
+            // isolated and removed from the Raft group. So it's necessary to reload
+            // the region from PD.
+            pd_client.invalidate_region_cache(ver_id).await;
+            Ok(false)
+        }
+    } else if e.store_not_match.is_some() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        if let Ok(store_id) = store_id {
+            pd_client.invalidate_store_cache(store_id).await;
+        }
+        Ok(false)
+    } else if e.epoch_not_match.is_some() {
+        on_region_epoch_not_match(pd_client.clone(), region_store, e.epoch_not_match.unwrap()).await
+    } else if e.stale_command.is_some() || e.region_not_found.is_some() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    } else if e.server_is_busy.is_some()
+        || e.raft_entry_too_large.is_some()
+        || e.max_timestamp_not_synced.is_some()
+    {
+        Err(Error::RegionError(Box::new(e)))
+    } else {
+        // TODO: pass the logger around
+        // info!("unknown region error: {:?}", e);
+        pd_client.invalidate_region_cache(ver_id).await;
+        if let Ok(store_id) = store_id {
+            pd_client.invalidate_store_cache(store_id).await;
+        }
+        Ok(false)
+    }
+}
+
+// Returns
+// 1. Ok(true): error has been resolved, retry immediately
+// 2. Ok(false): backoff, and then retry
+// 3. Err(Error): can't be resolved, return the error to the upper level
+async fn on_region_epoch_not_match<PdC: PdClient>(
+    pd_client: Arc<PdC>,
+    region_store: RegionStore,
+    error: EpochNotMatch,
+) -> Result<bool> {
+    let ver_id = region_store.region_with_leader.ver_id();
+    if error.current_regions.is_empty() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        return Ok(true);
+    }
+
+    for r in error.current_regions {
+        if r.id == region_store.region_with_leader.id() {
+            let region_epoch = r.region_epoch.unwrap();
+            let returned_conf_ver = region_epoch.conf_ver;
+            let returned_version = region_epoch.version;
+            let current_region_epoch = region_store
+                .region_with_leader
+                .region
+                .region_epoch
+                .clone()
+                .unwrap();
+            let current_conf_ver = current_region_epoch.conf_ver;
+            let current_version = current_region_epoch.version;
+
+            // Find whether the current region is ahead of TiKV's. If so, backoff.
+ if returned_conf_ver < current_conf_ver || returned_version < current_version { + return Ok(false); + } + } + } + // TODO: finer grained processing + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) +} + impl Clone for RetryableMultiRegion { fn clone(&self) -> Self { RetryableMultiRegion { diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 21f3d9f6..dfd74cf4 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -20,6 +20,8 @@ use crate::proto::kvrpcpb::TxnInfo; use crate::proto::pdpb::Timestamp; use crate::region::RegionVerId; use crate::request::codec::EncodedRequest; +use crate::request::plan::handle_region_error; +use crate::request::plan::is_grpc_error; use crate::request::Collect; use crate::request::CollectSingle; use crate::request::Plan; @@ -34,8 +36,6 @@ use crate::transaction::requests::TransactionStatusKind; use crate::Error; use crate::Result; -const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; - /// _Resolves_ the given locks. Returns locks still live. When there is no live locks, all the given locks are resolved. /// /// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock, @@ -107,6 +107,7 @@ pub async fn resolve_locks( commit_version, lock.is_txn_file, pd_client.clone(), + OPTIMISTIC_BACKOFF, ) .await?; clean_regions @@ -128,24 +129,36 @@ async fn resolve_lock_with_retry( commit_version: u64, is_txn_file: bool, pd_client: Arc, + mut backoff: Backoff, ) -> Result { debug!("resolving locks with retry"); - // FIXME: Add backoff - let timestamp = Timestamp::from_version(start_version); - let mut error = None; - for i in 0..RESOLVE_LOCK_RETRY_LIMIT { - debug!("resolving locks: attempt {}", (i + 1)); + let mut attempt = 0; + loop { + attempt += 1; + debug!("resolving locks: attempt {}", attempt); let store = pd_client.clone().store_for_key(key.into()).await?; let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version, is_txn_file); let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) - .single_region_with_store(store) - .await? 
- .resolve_lock(timestamp.clone(), Backoff::no_backoff()) - .extract_error() - .plan(); + let plan_builder = match crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + .single_region_with_store(store.clone()) + .await + { + Ok(plan_builder) => plan_builder, + Err(Error::LeaderNotFound { region }) => { + pd_client.invalidate_region_cache(region.clone()).await; + match backoff.next_delay_duration() { + Some(duration) => { + sleep(duration).await; + continue; + } + None => return Err(Error::LeaderNotFound { region }), + } + } + Err(err) => return Err(err), + }; + let plan = plan_builder.extract_error().plan(); match plan.execute().await { Ok(_) => { return Ok(ver_id); @@ -154,18 +167,34 @@ async fn resolve_lock_with_retry( Err(Error::ExtractedErrors(mut errors)) => { // ResolveLockResponse can have at most 1 error match errors.pop() { - e @ Some(Error::RegionError(_)) => { - error = e; - continue; - } + Some(Error::RegionError(e)) => match backoff.next_delay_duration() { + Some(duration) => { + let region_error_resolved = + handle_region_error(pd_client.clone(), *e, store.clone()).await?; + if !region_error_resolved { + sleep(duration).await; + } + continue; + } + None => return Err(Error::RegionError(e)), + }, Some(e) => return Err(e), None => unreachable!(), } } + Err(e) if is_grpc_error(&e) => match backoff.next_delay_duration() { + Some(duration) => { + if let Ok(store_id) = store.region_with_leader.get_store_id() { + pd_client.invalidate_store_cache(store_id).await; + } + sleep(duration).await; + continue; + } + None => return Err(e), + }, Err(e) => return Err(e), } } - Err(error.expect("no error is impossible")) } #[derive(Default, Clone)] @@ -578,15 +607,16 @@ mod tests { let key = vec![1]; let region1 = MockPdClient::region1(); - let resolved_region = resolve_lock_with_retry(&key, 1, 2, false, client.clone()) - .await - .unwrap(); + let resolved_region = + resolve_lock_with_retry(&key, 1, 2, false, client.clone(), OPTIMISTIC_BACKOFF) + .await + .unwrap(); assert_eq!(region1.ver_id(), resolved_region); // Test resolve lock over retry limit fail::cfg("region-error", "10*return").unwrap(); let key = vec![100]; - resolve_lock_with_retry(&key, 3, 4, false, client) + resolve_lock_with_retry(&key, 3, 4, false, client, OPTIMISTIC_BACKOFF) .await .expect_err("should return error"); } From f8e9f6f2cdcf3cf22959e79d980c55f9bd4882ea Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 14 Jan 2026 17:16:42 +0800 Subject: [PATCH 4/4] fix ut Signed-off-by: Ping Yu --- src/transaction/lock.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index dfd74cf4..1faabf81 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -582,6 +582,8 @@ pub fn lock_until_expired_ms(lock_version: u64, ttl: u64, current: Timestamp) -> mod tests { use std::any::Any; + use fail::FailScenario; + use super::*; use crate::mock::MockKvClient; use crate::mock::MockPdClient; @@ -589,8 +591,17 @@ mod tests { #[tokio::test] async fn test_resolve_lock_with_retry() { + let _scenario = FailScenario::setup(); + + const MAX_REGION_ERROR_RETRIES: u32 = 10; + let backoff = Backoff::no_jitter_backoff(0, 0, MAX_REGION_ERROR_RETRIES); + // Test resolve lock within retry limit - fail::cfg("region-error", "9*return").unwrap(); + fail::cfg( + "region-error", + &format!("{}*return", MAX_REGION_ERROR_RETRIES), + ) + .unwrap(); let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |_: &dyn Any| { @@ -608,15 
+619,19 @@ mod tests { let key = vec![1]; let region1 = MockPdClient::region1(); let resolved_region = - resolve_lock_with_retry(&key, 1, 2, false, client.clone(), OPTIMISTIC_BACKOFF) + resolve_lock_with_retry(&key, 1, 2, false, client.clone(), backoff.clone()) .await .unwrap(); assert_eq!(region1.ver_id(), resolved_region); // Test resolve lock over retry limit - fail::cfg("region-error", "10*return").unwrap(); + fail::cfg( + "region-error", + &format!("{}*return", MAX_REGION_ERROR_RETRIES + 1), + ) + .unwrap(); let key = vec![100]; - resolve_lock_with_retry(&key, 3, 4, false, client, OPTIMISTIC_BACKOFF) + resolve_lock_with_retry(&key, 3, 4, false, client, backoff) .await .expect_err("should return error"); }
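
Usage sketch (illustrative only, not part of this series): the helper below shows how a caller might combine the newly public `scan_locks` with the new `resolve_locks` once these patches land. It only relies on calls that appear in the patches above; the `clean_stale_locks` name, the 1024 batch size, the backoff parameters, and the `tikv_client::` re-export paths are assumptions.

    use tikv_client::{Backoff, TransactionClient};

    /// Returns the number of locks that could not be resolved within the backoff.
    async fn clean_stale_locks(client: &TransactionClient) -> tikv_client::Result<usize> {
        // Scan up to 1024 locks below the current timestamp over the whole key space.
        let safepoint = client.current_timestamp().await?;
        let locks = client.scan_locks(&safepoint, vec![].., 1024).await?;
        if locks.is_empty() {
            return Ok(0);
        }

        // resolve_locks retries internally until the backoff is exhausted and
        // returns whichever locks are still live at that point; their owning
        // transactions may still be in flight, so the caller can retry later.
        let start_version = client.current_timestamp().await?;
        let backoff = Backoff::no_jitter_backoff(100, 3000, 10);
        let live_locks = client.resolve_locks(locks, start_version, backoff).await?;
        Ok(live_locks.len())
    }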