diff --git a/Cargo.toml b/Cargo.toml index 2e224720d..004976841 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -170,15 +170,15 @@ harness = false #vss-client-ng = { path = "../vss-client" } #vss-client-ng = { git = "https://github.com/lightningdevkit/vss-client", branch = "main" } # -#[patch."https://github.com/lightningdevkit/rust-lightning"] -#lightning = { path = "../rust-lightning/lightning" } -#lightning-types = { path = "../rust-lightning/lightning-types" } -#lightning-invoice = { path = "../rust-lightning/lightning-invoice" } -#lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" } -#lightning-persister = { path = "../rust-lightning/lightning-persister" } -#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor" } -#lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" } -#lightning-block-sync = { path = "../rust-lightning/lightning-block-sync" } -#lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync" } -#lightning-liquidity = { path = "../rust-lightning/lightning-liquidity" } -#lightning-macros = { path = "../rust-lightning/lightning-macros" } +[patch."https://github.com/lightningdevkit/rust-lightning"] +lightning = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-types = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-invoice = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-net-tokio = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-persister = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-background-processor = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-rapid-gossip-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-block-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-transaction-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-liquidity = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } +lightning-macros = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-internal-deferred-writes-ldk-node-based" } diff --git a/src/builder.rs b/src/builder.rs index a2ea9aea7..84844974d 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -75,9 +75,9 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, - KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, - Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper, + GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, + PendingPaymentStore, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1318,7 +1318,7 @@ fn build_with_store_internal( let peer_storage_key = keys_manager.get_peer_storage_key(); let monitor_reader = Arc::new(AsyncPersister::new( - Arc::clone(&kv_store), + DynStoreRef(Arc::clone(&kv_store)), RuntimeSpawner::new(Arc::clone(&runtime)), Arc::clone(&logger), PERSISTER_MAX_PENDING_UPDATES, @@ -1331,7 +1331,7 @@ fn build_with_store_internal( // Read ChannelMonitors and the NetworkGraph let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (monitor_read_res, network_graph_res) = runtime.block_on(async move { + let (monitor_read_res, network_graph_res) = runtime.block_on(async { tokio::join!( monitor_reader.read_all_channel_monitors_with_updates_parallel(), read_network_graph(&*kv_store_ref, logger_ref), @@ -1351,26 +1351,21 @@ fn build_with_store_internal( }, }; - let persister = Arc::new(Persister::new( - Arc::clone(&kv_store), - Arc::clone(&logger), - PERSISTER_MAX_PENDING_UPDATES, - Arc::clone(&keys_manager), - Arc::clone(&keys_manager), - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - )); - // Initialize the ChainMonitor - let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( - Some(Arc::clone(&chain_source)), - Arc::clone(&tx_broadcaster), - Arc::clone(&logger), - Arc::clone(&fee_estimator), - Arc::clone(&persister), - Arc::clone(&keys_manager), - peer_storage_key, - )); + let chain_monitor: Arc = { + let persister = Arc::try_unwrap(monitor_reader) + .unwrap_or_else(|_| panic!("Arc should have no other references")); + Arc::new(chainmonitor::ChainMonitor::new_async_beta( + Some(Arc::clone(&chain_source)), + Arc::clone(&tx_broadcaster), + Arc::clone(&logger), + Arc::clone(&fee_estimator), + persister, + Arc::clone(&keys_manager), + peer_storage_key, + true, + )) + }; // Initialize the network graph, scorer, and router let network_graph = match network_graph_res { diff --git a/src/event.rs b/src/event.rs index a4dcc8cf3..bb48e122f 100644 --- a/src/event.rs +++ b/src/event.rs @@ -673,6 +673,26 @@ where if info.status == PaymentStatus::Succeeded || matches!(info.kind, PaymentKind::Spontaneous { .. }) { + let stored_preimage = match info.kind { + PaymentKind::Bolt11 { preimage, .. } + | PaymentKind::Bolt11Jit { preimage, .. } + | PaymentKind::Bolt12Offer { preimage, .. } + | PaymentKind::Bolt12Refund { preimage, .. } + | PaymentKind::Spontaneous { preimage, .. } => preimage, + _ => None, + }; + + if let Some(preimage) = stored_preimage { + log_info!( + self.logger, + "Re-claiming previously succeeded payment with hash {} of {}msat", + hex_utils::to_string(&payment_hash.0), + amount_msat, + ); + self.channel_manager.claim_funds(preimage); + return Ok(()); + } + log_info!( self.logger, "Refused duplicate inbound payment from payment hash {} of {}msat", diff --git a/src/types.rs b/src/types.rs index c5ff07756..afd163443 100644 --- a/src/types.rs +++ b/src/types.rs @@ -23,9 +23,7 @@ use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters}; use lightning::sign::InMemorySigner; -use lightning::util::persist::{ - KVStore, KVStoreSync, MonitorUpdatingPersister, MonitorUpdatingPersisterAsync, -}; +use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersisterAsync}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; use lightning_block_sync::gossip::GossipVerifier; @@ -135,6 +133,39 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a { pub(crate) type DynStore = dyn DynStoreTrait; +// Newtype wrapper that implements `KVStore` for `Arc`. This is needed because `KVStore` +// methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by +// returning `Pin>` instead, and this wrapper bridges the two by delegating +// `KVStore` methods to the corresponding `DynStoreTrait::*_async` methods. +#[derive(Clone)] +pub(crate) struct DynStoreRef(pub(crate) Arc); + +impl KVStore for DynStoreRef { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, bitcoin::io::Error>> + Send + 'static { + DynStoreTrait::read_async(&*self.0, primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + Send + 'static { + DynStoreTrait::write_async(&*self.0, primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + Send + 'static { + DynStoreTrait::remove_async(&*self.0, primary_namespace, secondary_namespace, key, lazy) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, bitcoin::io::Error>> + Send + 'static { + DynStoreTrait::list_async(&*self.0, primary_namespace, secondary_namespace) + } +} + pub(crate) struct DynStoreWrapper(pub(crate) T); impl DynStoreTrait for DynStoreWrapper { @@ -188,7 +219,7 @@ impl DynStoreTrait for DynStoreWrapper } pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync< - Arc, + DynStoreRef, RuntimeSpawner, Arc, Arc, @@ -197,22 +228,21 @@ pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync< Arc, >; -pub type Persister = MonitorUpdatingPersister< - Arc, - Arc, - Arc, - Arc, - Arc, - Arc, ->; - pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc, Arc, Arc, Arc, - Arc, + chainmonitor::AsyncPersister< + DynStoreRef, + RuntimeSpawner, + Arc, + Arc, + Arc, + Arc, + Arc, + >, Arc, >; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index c75a6947c..59a5677da 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1211,8 +1211,10 @@ pub(crate) async fn do_channel_full_cycle( ); println!("\nB close_channel (force: {})", force_close); + // Allow the background processor to flush deferred monitor writes so that + // the channel state no longer has monitor_update_in_progress set. + tokio::time::sleep(Duration::from_secs(1)).await; if force_close { - tokio::time::sleep(Duration::from_secs(1)).await; node_a.force_close_channel(&user_channel_id_a, node_b.node_id(), None).unwrap(); } else { node_a.close_channel(&user_channel_id_a, node_b.node_id()).unwrap();