From 1ebc56df9bb39de55b7ee5f778860118a6f21fdd Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Thu, 25 Jul 2024 14:31:12 +0200 Subject: [PATCH 01/13] Peer storage --- .../main/scala/fr/acinq/eclair/Features.scala | 6 +++ .../scala/fr/acinq/eclair/NodeParams.scala | 4 +- .../fr/acinq/eclair/db/DualDatabases.scala | 11 +++++ .../scala/fr/acinq/eclair/db/PeersDb.scala | 5 ++ .../fr/acinq/eclair/db/pg/PgPeersDb.scala | 47 +++++++++++++++++-- .../eclair/db/sqlite/SqlitePeersDb.scala | 41 ++++++++++++++-- .../main/scala/fr/acinq/eclair/io/Peer.scala | 47 +++++++++++++++---- .../protocol/LightningMessageCodecs.scala | 15 +++++- .../wire/protocol/LightningMessageTypes.scala | 4 ++ .../eclair/wire/protocol/PeerStorageTlv.scala | 32 +++++++++++++ .../scala/fr/acinq/eclair/TestConstants.scala | 3 ++ .../fr/acinq/eclair/db/PeersDbSpec.scala | 21 +++++++++ .../scala/fr/acinq/eclair/io/PeerSpec.scala | 30 ++++++++++-- .../eclair/io/ReconnectionTaskSpec.scala | 8 ++-- 14 files changed, 247 insertions(+), 27 deletions(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/PeerStorageTlv.scala diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Features.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Features.scala index c9886b031e..b668ff62b7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Features.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Features.scala @@ -275,6 +275,11 @@ object Features { val mandatory = 38 } + case object ProvideStorage extends Feature with InitFeature with NodeFeature { + val rfcName = "option_provide_storage" + val mandatory = 42 + } + case object ChannelType extends Feature with InitFeature with NodeFeature { val rfcName = "option_channel_type" val mandatory = 44 @@ -358,6 +363,7 @@ object Features { DualFunding, Quiescence, OnionMessages, + ProvideStorage, ChannelType, ScidAlias, PaymentMetadata, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala 
b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 09b4fc9261..6f4278aa96 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -92,7 +92,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config, willFundRates_opt: Option[LiquidityAds.WillFundRates], peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig, - onTheFlyFundingConfig: OnTheFlyFunding.Config) { + onTheFlyFundingConfig: OnTheFlyFunding.Config, + peerStorageWriteDelayMax: FiniteDuration) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey val nodeId: PublicKey = nodeKeyManager.nodeId @@ -680,6 +681,7 @@ object NodeParams extends Logging { onTheFlyFundingConfig = OnTheFlyFunding.Config( proposalTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.proposal-timeout").getSeconds, TimeUnit.SECONDS), ), + peerStorageWriteDelayMax = 1 minute, ) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index d1ff3487ea..05219d7eec 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -15,6 +15,7 @@ import fr.acinq.eclair.router.Router import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement} import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli} import grizzled.slf4j.Logging +import scodec.bits.ByteVector import java.io.File import java.util.UUID @@ -292,6 +293,16 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb { runAsync(secondary.getRelayFees(nodeId)) primary.getRelayFees(nodeId) } + + override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = { + runAsync(secondary.updateStorage(nodeId, 
data)) + primary.updateStorage(nodeId, data) + } + + override def getStorage(nodeId: PublicKey): Option[ByteVector] = { + runAsync(secondary.getStorage(nodeId)) + primary.getStorage(nodeId) + } } case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends PaymentsDb { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala index ea10f348e8..b1824071d0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala @@ -19,6 +19,7 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair.wire.protocol.NodeAddress +import scodec.bits.ByteVector trait PeersDb { @@ -34,4 +35,8 @@ trait PeersDb { def getRelayFees(nodeId: PublicKey): Option[RelayFees] + def updateStorage(nodeId: PublicKey, data: ByteVector): Unit + + def getStorage(nodeId: PublicKey): Option[ByteVector] + } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala index 45094dd4a9..76102f9482 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala @@ -26,14 +26,14 @@ import fr.acinq.eclair.db.pg.PgUtils.PgLock import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair.wire.protocol._ import grizzled.slf4j.Logging -import scodec.bits.BitVector +import scodec.bits.{BitVector, ByteVector} import java.sql.Statement import javax.sql.DataSource object PgPeersDb { val DB_NAME = "peers" - val CURRENT_VERSION = 3 + val CURRENT_VERSION = 4 } class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logging { @@ -54,13 +54,18 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg 
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)") } + def migration34(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)") + } + using(pg.createStatement()) { statement => getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") - statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)") + statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, storage BYTEA)") statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)") - case Some(v@(1 | 2)) => + statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)") + case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { migration12(statement) @@ -68,6 +73,9 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg if (v < 3) { migration23(statement) } + if (v < 4) { + migration34(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -98,6 +106,10 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg statement.setString(1, nodeId.value.toHex) statement.executeUpdate() } + using(pg.prepareStatement("DELETE FROM local.peer_storage WHERE node_id = ?")) { statement => + statement.setString(1, nodeId.value.toHex) + statement.executeUpdate() + } } } @@ -155,4 +167,31 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) 
extends PeersDb with Logg } } } + + override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = withMetrics("peers/update-storage", DbBackends.Postgres) { + withLock { pg => + using(pg.prepareStatement( + """ + INSERT INTO local.peer_storage (node_id, data) + VALUES (?, ?) + ON CONFLICT (node_id) + DO UPDATE SET data = EXCLUDED.data + """)) { statement => + statement.setString(1, nodeId.value.toHex) + statement.setBytes(2, data.toArray) + statement.executeUpdate() + } + } + } + + override def getStorage(nodeId: PublicKey): Option[ByteVector] = withMetrics("peers/get-storage", DbBackends.Postgres) { + withLock { pg => + using(pg.prepareStatement("SELECT data FROM local.peer_storage WHERE node_id = ?")) { statement => + statement.setString(1, nodeId.value.toHex) + statement.executeQuery() + .headOption + .map(rs => ByteVector(rs.getBytes("data"))) + } + } + } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala index 610bb07909..ba99c46ad1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala @@ -26,13 +26,13 @@ import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, setVersion, using} import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair.wire.protocol._ import grizzled.slf4j.Logging -import scodec.bits.BitVector +import scodec.bits.{BitVector, ByteVector} import java.sql.{Connection, Statement} object SqlitePeersDb { val DB_NAME = "peers" - val CURRENT_VERSION = 2 + val CURRENT_VERSION = 3 } class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { @@ -46,13 +46,23 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT 
NULL)") } + def migration23(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)") statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)") - case Some(v@1) => + statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL)") + case Some(v@(1 | 2)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") - migration12(statement) + if (v < 2) { + migration12(statement) + } + if (v < 3) { + migration23(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -128,4 +138,27 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { ) } } + + override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = withMetrics("peers/update-storage", DbBackends.Sqlite) { + using(sqlite.prepareStatement("UPDATE peer_storage SET data = ? 
WHERE node_id = ?")) { update => + update.setBytes(1, data.toArray) + update.setBytes(2, nodeId.value.toArray) + if (update.executeUpdate() == 0) { + using(sqlite.prepareStatement("INSERT INTO peer_storage VALUES (?, ?)")) { statement => + statement.setBytes(1, nodeId.value.toArray) + statement.setBytes(2, data.toArray) + statement.executeUpdate() + } + } + } + } + + override def getStorage(nodeId: PublicKey): Option[ByteVector] = withMetrics("peers/get-storage", DbBackends.Sqlite) { + using(sqlite.prepareStatement("SELECT data FROM peer_storage WHERE node_id = ?")) { statement => + statement.setBytes(1, nodeId.value.toArray) + statement.executeQuery() + .headOption + .map(rs => ByteVector(rs.getBytes("data"))) + } + } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index ef7b075a4f..aa6a433db6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -45,7 +45,8 @@ import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes import fr.acinq.eclair.router.Router import fr.acinq.eclair.wire.protocol import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure -import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, FailureReason, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, RecommendedFeerates, RoutingMessage, SpliceInit, TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc} +import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, FailureReason, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, PeerStorageRetrieval, PeerStorageStore, RecommendedFeerates, RoutingMessage, SpliceInit, 
TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc} +import scodec.bits.ByteVector /** * This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time. @@ -85,7 +86,7 @@ class Peer(val nodeParams: NodeParams, FinalChannelId(state.channelId) -> channel }.toMap context.system.eventStream.publish(PeerCreated(self, remoteNodeId)) - goto(DISCONNECTED) using DisconnectedData(channels) // when we restart, we will attempt to reconnect right away, but then we'll wait + goto(DISCONNECTED) using DisconnectedData(channels, PeerStorage(nodeParams.db.peers.getStorage(remoteNodeId), written = true, TimestampMilli.min)) // when we restart, we will attempt to reconnect right away, but then we'll wait } when(DISCONNECTED) { @@ -94,7 +95,7 @@ class Peer(val nodeParams: NodeParams, stay() case Event(connectionReady: PeerConnection.ConnectionReady, d: DisconnectedData) => - gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) }) + gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) }, d.peerStorage) case Event(Terminated(actor), d: DisconnectedData) if d.channels.values.toSet.contains(actor) => // we have at most 2 ids: a TemporaryChannelId and a FinalChannelId @@ -466,7 +467,7 @@ class Peer(val nodeParams: NodeParams, stopPeer() } else { d.channels.values.toSet[ActorRef].foreach(_ ! 
INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id) - goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }) + goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.peerStorage) } case Event(Terminated(actor), d: ConnectedData) if d.channels.values.toSet.contains(actor) => @@ -485,7 +486,7 @@ class Peer(val nodeParams: NodeParams, log.debug(s"got new connection, killing current one and switching") d.peerConnection ! PeerConnection.Kill(KillReason.ConnectionReplaced) d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id) - gotoConnected(connectionReady, d.channels) + gotoConnected(connectionReady, d.channels, d.peerStorage) case Event(msg: OnionMessage, _: ConnectedData) => OnionMessages.process(nodeParams.privateKey, msg) match { @@ -518,6 +519,21 @@ class Peer(val nodeParams: NodeParams, d.peerConnection forward unknownMsg stay() + case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.channels.nonEmpty => + val timeSinceLastWrite = TimestampMilli.now() - d.peerStorage.lastWrite + val peerStorage = if (timeSinceLastWrite >= nodeParams.peerStorageWriteDelayMax) { + nodeParams.db.peers.updateStorage(remoteNodeId, store.blob) + PeerStorage(Some(store.blob), written = true, TimestampMilli.now()) + } else { + startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageWriteDelayMax - timeSinceLastWrite) + PeerStorage(Some(store.blob), written = false, d.peerStorage.lastWrite) + } + stay() using d.copy(peerStorage = peerStorage) + + case Event(WritePeerStorage, d: ConnectedData) => + d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _)) + stay() using d.copy(peerStorage = 
PeerStorage(d.peerStorage.data, written = true, TimestampMilli.now())) + case Event(unhandledMsg: LightningMessage, _) => log.warning("ignoring message {}", unhandledMsg) stay() @@ -749,7 +765,7 @@ class Peer(val nodeParams: NodeParams, context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId)) } - private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef]): State = { + private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], peerStorage: PeerStorage): State = { require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}") log.debug("got authenticated connection to address {}", connectionReady.address) @@ -759,6 +775,9 @@ class Peer(val nodeParams: NodeParams, nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, connectionReady.address) } + // If we have some data stored from our peer, we send it to them before doing anything else. + peerStorage.data.foreach(connectionReady.peerConnection ! PeerStorageRetrieval(_)) + // let's bring existing/requested channels online channels.values.toSet[ActorRef].foreach(_ ! INPUT_RECONNECTED(connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit)) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id) @@ -776,7 +795,7 @@ class Peer(val nodeParams: NodeParams, connectionReady.peerConnection ! 
CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat)) } - goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, feerates, None) + goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, feerates, None, peerStorage) } /** @@ -911,12 +930,18 @@ object Peer { case class TemporaryChannelId(id: ByteVector32) extends ChannelId case class FinalChannelId(id: ByteVector32) extends ChannelId + case class PeerStorage(data: Option[ByteVector], written: Boolean, lastWrite: TimestampMilli) + sealed trait Data { def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef] + def peerStorage: PeerStorage } - case object Nothing extends Data { override def channels = Map.empty } - case class DisconnectedData(channels: Map[FinalChannelId, ActorRef]) extends Data - case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates]) extends Data { + case object Nothing extends Data { + override def channels = Map.empty + override def peerStorage: PeerStorage = PeerStorage(None, written = true, TimestampMilli.min) + } + case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], peerStorage: PeerStorage) extends Data + case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data { val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit) def 
localFeatures: Features[InitFeature] = localInit.features def remoteFeatures: Features[InitFeature] = remoteInit.features @@ -1029,5 +1054,7 @@ object Peer { case class RelayOnionMessage(messageId: ByteVector32, msg: OnionMessage, replyTo_opt: Option[typed.ActorRef[Status]]) case class RelayUnknownMessage(unknownMessage: UnknownMessage) + + case object WritePeerStorage // @formatter:on } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala index 9e6128d0a6..beb8f8d3b3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala @@ -22,7 +22,7 @@ import fr.acinq.eclair.wire.protocol.CommonCodecs._ import fr.acinq.eclair.{Features, InitFeature, KamonExt} import scodec.bits.{BinStringSyntax, BitVector, ByteVector} import scodec.codecs._ -import scodec.{Attempt, Codec} +import scodec.{Attempt, Codec, Err} /** * Created by PM on 15/11/2016. 
@@ -389,6 +389,17 @@ object LightningMessageCodecs { ("onionPacket" | MessageOnionCodecs.messageOnionPacketCodec) :: ("tlvStream" | OnionMessageTlv.onionMessageTlvCodec)).as[OnionMessage] + private def isAcceptableBlobLength(length: Int) = + if (length <= 65531) Attempt.Successful(length) else Attempt.failure(Err(s"length $length is larger than 65531")) + + val peerStorageStore: Codec[PeerStorageStore] = ( + ("blob" | variableSizeBytes(uint16.exmap(isAcceptableBlobLength, isAcceptableBlobLength), bytes)) :: + ("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageStore] + + val peerStorageRetrieval: Codec[PeerStorageRetrieval] = ( + ("blob" | variableSizeBytes(uint16.exmap(isAcceptableBlobLength, isAcceptableBlobLength), bytes)) :: + ("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageRetrieval] + // NB: blank lines to minimize merge conflicts // @@ -476,6 +487,8 @@ object LightningMessageCodecs { val lightningMessageCodec = discriminated[LightningMessage].by(uint16) .typecase(1, warningCodec) .typecase(2, stfuCodec) + .typecase(7, peerStorageStore) + .typecase(9, peerStorageRetrieval) .typecase(16, initCodec) .typecase(17, errorCodec) .typecase(18, pingCodec) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala index 40cc0633c1..de26d328b7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala @@ -602,6 +602,10 @@ case class GossipTimestampFilter(chainHash: BlockHash, firstTimestamp: Timestamp case class OnionMessage(pathKey: PublicKey, onionRoutingPacket: OnionRoutingPacket, tlvStream: TlvStream[OnionMessageTlv] = TlvStream.empty) extends LightningMessage +case class PeerStorageStore(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends 
LightningMessage + +case class PeerStorageRetrieval(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends LightningMessage + // NB: blank lines to minimize merge conflicts // diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/PeerStorageTlv.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/PeerStorageTlv.scala new file mode 100644 index 0000000000..4ebb3fec39 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/PeerStorageTlv.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2021 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.wire.protocol + +import fr.acinq.eclair.wire.protocol.CommonCodecs.varint +import fr.acinq.eclair.wire.protocol.TlvCodecs.tlvStream +import scodec.Codec +import scodec.codecs.discriminated + +/** + * Created by thomash on July 2024. 
+ */ + +sealed trait PeerStorageTlv extends Tlv + +object PeerStorageTlv { + val peerStorageTlvCodec: Codec[TlvStream[PeerStorageTlv]] = tlvStream(discriminated[PeerStorageTlv].by(varint)) +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index e32afd303d..ab8691a34e 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -108,6 +108,7 @@ object TestConstants { Features.StaticRemoteKey -> FeatureSupport.Mandatory, Features.Quiescence -> FeatureSupport.Optional, Features.SplicePrototype -> FeatureSupport.Optional, + Features.ProvideStorage -> FeatureSupport.Optional, ), unknown = Set(UnknownFeature(TestFeature.optional)) ), @@ -240,6 +241,7 @@ object TestConstants { willFundRates_opt = Some(defaultLiquidityRates), peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), + peerStorageWriteDelayMax = 5 seconds, ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( @@ -416,6 +418,7 @@ object TestConstants { willFundRates_opt = Some(defaultLiquidityRates), peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), + peerStorageWriteDelayMax = 5 seconds, ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala index 848b946f04..76f4719014 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala @@ -24,6 +24,7 @@ import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair._ import 
fr.acinq.eclair.wire.protocol.{NodeAddress, Tor2, Tor3} import org.scalatest.funsuite.AnyFunSuite +import scodec.bits.HexStringSyntax import java.util.concurrent.Executors import scala.concurrent.duration._ @@ -107,4 +108,24 @@ class PeersDbSpec extends AnyFunSuite { } } + test("peer storage") { + forAllDbs { dbs => + val db = dbs.peers + + val a = randomKey().publicKey + val b = randomKey().publicKey + + assert(db.getStorage(a) == None) + assert(db.getStorage(b) == None) + db.updateStorage(a, hex"012345") + assert(db.getStorage(a) == Some(hex"012345")) + assert(db.getStorage(b) == None) + db.updateStorage(a, hex"6789") + assert(db.getStorage(a) == Some(hex"6789")) + assert(db.getStorage(b) == None) + db.updateStorage(b, hex"abcd") + assert(db.getStorage(a) == Some(hex"6789")) + assert(db.getStorage(b) == Some(hex"abcd")) + } + } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 55e5981e0d..097dd26dbe 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -38,7 +38,7 @@ import fr.acinq.eclair.wire.protocol import fr.acinq.eclair.wire.protocol._ import org.scalatest.Inside.inside import org.scalatest.{Tag, TestData} -import scodec.bits.ByteVector +import scodec.bits.{ByteVector, HexStringSyntax} import java.net.InetSocketAddress import java.nio.channels.ServerSocketChannel @@ -108,11 +108,14 @@ class PeerSpec extends FixtureSpec { def cleanupFixture(fixture: FixtureParam): Unit = fixture.cleanup() - def connect(remoteNodeId: PublicKey, peer: TestFSMRef[Peer.State, Peer.Data, Peer], peerConnection: TestProbe, switchboard: TestProbe, channels: Set[PersistentChannelData] = Set.empty, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()))(implicit system: ActorSystem): Unit = { + def connect(remoteNodeId: PublicKey, peer: TestFSMRef[Peer.State, Peer.Data, Peer], 
peerConnection: TestProbe, switchboard: TestProbe, channels: Set[PersistentChannelData] = Set.empty, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()), sendInit: Boolean = true, peerStorage: Option[ByteVector] = None)(implicit system: ActorSystem): Unit = { // let's simulate a connection - switchboard.send(peer, Peer.Init(channels, Map.empty)) + if (sendInit) { + switchboard.send(peer, Peer.Init(channels, Map.empty)) + } val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures()) switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = true, localInit, remoteInit)) + peerStorage.foreach(data => peerConnection.expectMsg(PeerStorageRetrieval(data))) peerConnection.expectMsgType[RecommendedFeerates] val probe = TestProbe() probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped))) @@ -752,6 +755,27 @@ class PeerSpec extends FixtureSpec { channel.expectMsg(open) } + test("peer storage") { f => + import f._ + + val peerConnection1 = peerConnection + val peerConnection2 = TestProbe() + val peerConnection3 = TestProbe() + + nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef") + connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(ChannelCodecsSpec.normal), peerStorage = Some(hex"abcdef")) + peerConnection1.send(peer, PeerStorageStore(hex"c0ffee")) + peerConnection1.send(peer, PeerStorageStore(hex"0123456789")) + Thread.sleep(1000) + peer ! 
Peer.Disconnect(f.remoteNodeId) + connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), sendInit = false, peerStorage = Some(hex"0123456789")) + peerConnection2.send(peer, PeerStorageStore(hex"1111")) + connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), sendInit = false, peerStorage = Some(hex"1111")) + assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"c0ffee")) // Only the first update was written because of the rate limit. + Thread.sleep(5_000) + assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) + } + } object PeerSpec { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala index dd9854767f..3a2498cebe 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala @@ -20,7 +20,7 @@ import akka.testkit.{TestFSMRef, TestProbe} import fr.acinq.bitcoin.scalacompat.Block import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair._ -import fr.acinq.eclair.io.Peer.ChannelId +import fr.acinq.eclair.io.Peer.{ChannelId, PeerStorage} import fr.acinq.eclair.io.ReconnectionTask.WaitingData import fr.acinq.eclair.tor.Socks5ProxyParams import fr.acinq.eclair.wire.protocol.{Color, NodeAddress, NodeAnnouncement, RecommendedFeerates} @@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw) private val PeerNothingData = Peer.Nothing - private val PeerDisconnectedData = Peer.DisconnectedData(channels) - private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) 
=> (k, v) }, recommendedFeerates, None) + private val PeerDisconnectedData = Peer.DisconnectedData(channels, PeerStorage(None, written = true, TimestampMilli.min)) + private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, recommendedFeerates, None, PeerStorage(None, written = true, TimestampMilli.min)) case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe) @@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike import f._ val peer = TestProbe() - peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty))) + peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, PeerStorage(None, written = true, TimestampMilli.min)))) monitor.expectNoMessage() } From e97253aeb206af0b7ee869ae449334cfc96b4f88 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Thu, 21 Nov 2024 16:04:27 +0100 Subject: [PATCH 02/13] comments --- docs/release-notes/eclair-vnext.md | 5 ++++ eclair-core/src/main/resources/reference.conf | 4 +++ .../scala/fr/acinq/eclair/NodeParams.scala | 4 +-- .../scala/fr/acinq/eclair/db/PeersDb.scala | 2 ++ .../fr/acinq/eclair/db/pg/PgPeersDb.scala | 13 ++++---- .../eclair/db/sqlite/SqlitePeersDb.scala | 14 +++++---- .../main/scala/fr/acinq/eclair/io/Peer.scala | 30 ++++++++++--------- .../protocol/LightningMessageCodecs.scala | 7 ++--- .../scala/fr/acinq/eclair/TestConstants.scala | 4 +-- .../scala/fr/acinq/eclair/io/PeerSpec.scala | 7 ++--- .../eclair/io/ReconnectionTaskSpec.scala | 6 ++-- .../protocol/LightningMessageCodecsSpec.scala | 13 ++++++++ 12 files changed, 67 insertions(+), 42 deletions(-) diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index 5562da1032..9597960bed 100644 --- 
a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -6,6 +6,11 @@ +### Peer storage + +When `option_provide_storage` is enabled, eclair will store a small amount of data for our peers. +This is mostly intended for LSPs that serve mobile wallets to allow the users to restore their channels when they switch phones. + ### API changes diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 77dd6f058c..9df2190077 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -73,6 +73,8 @@ eclair { option_dual_fund = optional option_quiesce = optional option_onion_messages = optional + // Enable this if you serve mobile wallets. + option_provide_storage = disabled option_channel_type = optional option_scid_alias = optional option_payment_metadata = optional @@ -596,6 +598,8 @@ eclair { enabled = true // enable automatic purges of expired invoices from the database interval = 24 hours // interval between expired invoice purges } + + peer-storage-write-delay = 1 minute } akka { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 6f4278aa96..bc55100b9f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -93,7 +93,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, willFundRates_opt: Option[LiquidityAds.WillFundRates], peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig, onTheFlyFundingConfig: OnTheFlyFunding.Config, - peerStorageWriteDelayMax: FiniteDuration) { + peerStorageWriteDelay: FiniteDuration) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey val nodeId: PublicKey = nodeKeyManager.nodeId @@ -681,7 +681,7 @@ object NodeParams extends Logging { onTheFlyFundingConfig = OnTheFlyFunding.Config( proposalTimeout = 
FiniteDuration(config.getDuration("on-the-fly-funding.proposal-timeout").getSeconds, TimeUnit.SECONDS), ), - peerStorageWriteDelayMax = 1 minute, + peerStorageWriteDelay = FiniteDuration(config.getDuration("peer-storage-write-delay").getSeconds, TimeUnit.SECONDS), ) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala index b1824071d0..b33079e0f5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala @@ -35,8 +35,10 @@ trait PeersDb { def getRelayFees(nodeId: PublicKey): Option[RelayFees] + // Used only when option_provide_storage is enabled. def updateStorage(nodeId: PublicKey, data: ByteVector): Unit + // Used only when option_provide_storage is enabled. def getStorage(nodeId: PublicKey): Option[ByteVector] } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala index 76102f9482..a166143e16 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.db.pg import fr.acinq.bitcoin.scalacompat.Crypto import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.MilliSatoshi +import fr.acinq.eclair.{MilliSatoshi, TimestampSecond} import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.PeersDb @@ -55,16 +55,16 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg } def migration34(statement: Statement): Unit = { - statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)") + statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at 
TIMESTAMP WITH TIME ZONE NOT NULL)") } using(pg.createStatement()) { statement => getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") - statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, storage BYTEA)") + statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)") statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)") - statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)") + statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL)") case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { @@ -172,13 +172,14 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg withLock { pg => using(pg.prepareStatement( """ - INSERT INTO local.peer_storage (node_id, data) - VALUES (?, ?) + INSERT INTO local.peer_storage (node_id, data, last_updated_at) + VALUES (?, ?, ?) 
ON CONFLICT (node_id) DO UPDATE SET data = EXCLUDED.data """)) { statement => statement.setString(1, nodeId.value.toHex) statement.setBytes(2, data.toArray) + statement.setTimestamp(3, TimestampSecond.now().toSqlTimestamp) statement.executeUpdate() } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala index ba99c46ad1..c41a0710c1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.db.sqlite import fr.acinq.bitcoin.scalacompat.Crypto import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.MilliSatoshi +import fr.acinq.eclair.{MilliSatoshi, TimestampSecond} import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.PeersDb @@ -47,14 +47,14 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { } def migration23(statement: Statement): Unit = { - statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL)") + statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL)") } getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)") statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)") - statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL)") + statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL)") case Some(v@(1 | 2)) => logger.warn(s"migrating 
db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { @@ -140,13 +140,15 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { } override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = withMetrics("peers/update-storage", DbBackends.Sqlite) { - using(sqlite.prepareStatement("UPDATE peer_storage SET data = ? WHERE node_id = ?")) { update => + using(sqlite.prepareStatement("UPDATE peer_storage SET data = ?, last_updated_at = ? WHERE node_id = ?")) { update => update.setBytes(1, data.toArray) - update.setBytes(2, nodeId.value.toArray) + update.setLong(2, TimestampSecond.now().toLong) + update.setBytes(3, nodeId.value.toArray) if (update.executeUpdate() == 0) { - using(sqlite.prepareStatement("INSERT INTO peer_storage VALUES (?, ?)")) { statement => + using(sqlite.prepareStatement("INSERT INTO peer_storage VALUES (?, ?, ?)")) { statement => statement.setBytes(1, nodeId.value.toArray) statement.setBytes(2, data.toArray) + statement.setLong(3, TimestampSecond.now().toLong) statement.executeUpdate() } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index aa6a433db6..0f5dd22020 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -86,7 +86,12 @@ class Peer(val nodeParams: NodeParams, FinalChannelId(state.channelId) -> channel }.toMap context.system.eventStream.publish(PeerCreated(self, remoteNodeId)) - goto(DISCONNECTED) using DisconnectedData(channels, PeerStorage(nodeParams.db.peers.getStorage(remoteNodeId), written = true, TimestampMilli.min)) // when we restart, we will attempt to reconnect right away, but then we'll wait + val peerStorageData = if (nodeParams.features.hasFeature(Features.ProvideStorage)) { + nodeParams.db.peers.getStorage(remoteNodeId) + } else { + None + } + goto(DISCONNECTED) using DisconnectedData(channels, 
PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait } when(DISCONNECTED) { @@ -135,6 +140,10 @@ class Peer(val nodeParams: NodeParams, case Event(_: SpawnChannelNonInitiator, _) => stay() // we got disconnected before creating the channel actor case Event(_: LightningMessage, _) => stay() // we probably just got disconnected and that's the last messages we received + + case Event(WritePeerStorage, d: DisconnectedData) => + d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _)) + stay() using d.copy(peerStorage = d.peerStorage.copy(written = true)) } when(CONNECTED) { @@ -520,19 +529,12 @@ class Peer(val nodeParams: NodeParams, stay() case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.channels.nonEmpty => - val timeSinceLastWrite = TimestampMilli.now() - d.peerStorage.lastWrite - val peerStorage = if (timeSinceLastWrite >= nodeParams.peerStorageWriteDelayMax) { - nodeParams.db.peers.updateStorage(remoteNodeId, store.blob) - PeerStorage(Some(store.blob), written = true, TimestampMilli.now()) - } else { - startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageWriteDelayMax - timeSinceLastWrite) - PeerStorage(Some(store.blob), written = false, d.peerStorage.lastWrite) - } - stay() using d.copy(peerStorage = peerStorage) + startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageWriteDelay) + stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false)) case Event(WritePeerStorage, d: ConnectedData) => d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _)) - stay() using d.copy(peerStorage = PeerStorage(d.peerStorage.data, written = true, TimestampMilli.now())) + stay() using d.copy(peerStorage = d.peerStorage.copy(written = true)) case Event(unhandledMsg: LightningMessage, _) => log.warning("ignoring message {}", 
unhandledMsg) @@ -788,7 +790,7 @@ class Peer(val nodeParams: NodeParams, if (Features.canUseFeature(connectionReady.localInit.features, connectionReady.remoteInit.features, Features.FundingFeeCredit)) { if (feeCredit.isEmpty) { // We read the fee credit from the database on the first connection attempt. - // We keep track of the latest credit afterwards and don't need to read it from the DB at every reconnection. + // We keep track of the latest credit afterwards and don't need to read it from the DB at every reconnection. feeCredit = Some(nodeParams.db.liquidity.getFeeCredit(remoteNodeId)) } log.info("reconnecting with fee credit = {}", feeCredit) @@ -930,7 +932,7 @@ object Peer { case class TemporaryChannelId(id: ByteVector32) extends ChannelId case class FinalChannelId(id: ByteVector32) extends ChannelId - case class PeerStorage(data: Option[ByteVector], written: Boolean, lastWrite: TimestampMilli) + case class PeerStorage(data: Option[ByteVector], written: Boolean) sealed trait Data { def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef] @@ -938,7 +940,7 @@ object Peer { } case object Nothing extends Data { override def channels = Map.empty - override def peerStorage: PeerStorage = PeerStorage(None, written = true, TimestampMilli.min) + override def peerStorage: PeerStorage = PeerStorage(None, written = true) } case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], peerStorage: PeerStorage) extends Data case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala 
index beb8f8d3b3..77e2d4dcec 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala @@ -389,15 +389,12 @@ object LightningMessageCodecs { ("onionPacket" | MessageOnionCodecs.messageOnionPacketCodec) :: ("tlvStream" | OnionMessageTlv.onionMessageTlvCodec)).as[OnionMessage] - private def isAcceptableBlobLength(length: Int) = - if (length <= 65531) Attempt.Successful(length) else Attempt.failure(Err(s"length $length is larger than 65531")) - val peerStorageStore: Codec[PeerStorageStore] = ( - ("blob" | variableSizeBytes(uint16.exmap(isAcceptableBlobLength, isAcceptableBlobLength), bytes)) :: + ("blob" | variableSizeBytes(uint16, bytes)) :: ("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageStore] val peerStorageRetrieval: Codec[PeerStorageRetrieval] = ( - ("blob" | variableSizeBytes(uint16.exmap(isAcceptableBlobLength, isAcceptableBlobLength), bytes)) :: + ("blob" | variableSizeBytes(uint16, bytes)) :: ("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageRetrieval] // NB: blank lines to minimize merge conflicts diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index ab8691a34e..8fdcb67b95 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -241,7 +241,7 @@ object TestConstants { willFundRates_opt = Some(defaultLiquidityRates), peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), - peerStorageWriteDelayMax = 5 seconds, + peerStorageWriteDelay = 5 seconds, ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( @@ -418,7 +418,7 @@ object TestConstants { willFundRates_opt = 
Some(defaultLiquidityRates), peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), - peerStorageWriteDelayMax = 5 seconds, + peerStorageWriteDelay = 5 seconds, ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 097dd26dbe..5709386a64 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -764,16 +764,15 @@ class PeerSpec extends FixtureSpec { nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef") connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(ChannelCodecsSpec.normal), peerStorage = Some(hex"abcdef")) - peerConnection1.send(peer, PeerStorageStore(hex"c0ffee")) peerConnection1.send(peer, PeerStorageStore(hex"0123456789")) Thread.sleep(1000) peer ! Peer.Disconnect(f.remoteNodeId) connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), sendInit = false, peerStorage = Some(hex"0123456789")) peerConnection2.send(peer, PeerStorageStore(hex"1111")) connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), sendInit = false, peerStorage = Some(hex"1111")) - assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"c0ffee")) // Only the first update was written because of the rate limit. - Thread.sleep(5_000) - assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) + assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"abcdef")) // Because of the delayed writes, the original value hasn't been updated yet. + Thread.sleep(5000) + assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) // Now it is updated. 
} } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala index 3a2498cebe..3ec60d9c7a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala @@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw) private val PeerNothingData = Peer.Nothing - private val PeerDisconnectedData = Peer.DisconnectedData(channels, PeerStorage(None, written = true, TimestampMilli.min)) - private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, recommendedFeerates, None, PeerStorage(None, written = true, TimestampMilli.min)) + private val PeerDisconnectedData = Peer.DisconnectedData(channels, PeerStorage(None, written = true)) + private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, recommendedFeerates, None, PeerStorage(None, written = true)) case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe) @@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike import f._ val peer = TestProbe() - peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, PeerStorage(None, written = true, TimestampMilli.min)))) + peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, PeerStorage(None, written = true)))) monitor.expectNoMessage() } diff --git 
a/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala index 5f7dbc939d..02cdcbe347 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala @@ -777,4 +777,17 @@ class LightningMessageCodecsSpec extends AnyFunSuite { assert(updateAddHtlcCodec.encode(decoded.value).require.bytes == bin) } } + + test("encode/decode peer storage messages") { + val testCases = Seq( + hex"0007 0003 012345" -> PeerStorageStore(hex"012345"), + hex"0009 0002 abcd" -> PeerStorageRetrieval(hex"abcd"), + ) + for ((bin, ref) <- testCases) { + val decoded = lightningMessageCodec.decode(bin.bits).require + assert(decoded.value == ref) + assert(decoded.remainder.isEmpty) + assert(lightningMessageCodec.encode(ref).require.bytes == bin) + } + } } From 5cc8494ab14d9091f01739c6ba081a4655b61858 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Fri, 22 Nov 2024 13:18:04 +0100 Subject: [PATCH 03/13] keep peer storage for a while after channels are closed --- eclair-core/src/main/resources/reference.conf | 7 +++- .../scala/fr/acinq/eclair/NodeParams.scala | 6 ++- .../main/scala/fr/acinq/eclair/Setup.scala | 5 ++- .../fr/acinq/eclair/db/DualDatabases.scala | 7 +++- .../acinq/eclair/db/PeerStorageCleaner.scala | 40 +++++++++++++++++++ .../scala/fr/acinq/eclair/db/PeersDb.scala | 3 ++ .../fr/acinq/eclair/db/pg/PgPeersDb.scala | 27 +++++++++---- .../eclair/db/sqlite/SqlitePeersDb.scala | 23 +++++++++-- .../main/scala/fr/acinq/eclair/io/Peer.scala | 13 +++--- .../scala/fr/acinq/eclair/TestConstants.scala | 2 + 10 files changed, 112 insertions(+), 21 deletions(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala diff --git a/eclair-core/src/main/resources/reference.conf 
b/eclair-core/src/main/resources/reference.conf index 9df2190077..5fcb359fbc 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -599,7 +599,12 @@ eclair { interval = 24 hours // interval between expired invoice purges } - peer-storage-write-delay = 1 minute + peer-storage { + // Peer storage is persisted only after this delay to reduce the number of writes when updating it multiple times in a row. + write-delay = 1 minute + // Peer storage is kept this long after the last channel has been closed. + removal-delay = 2 months + } } akka { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index bc55100b9f..28e7165b19 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -93,7 +93,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, willFundRates_opt: Option[LiquidityAds.WillFundRates], peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig, onTheFlyFundingConfig: OnTheFlyFunding.Config, - peerStorageWriteDelay: FiniteDuration) { + peerStorageWriteDelay: FiniteDuration, + peerStorageRemovalDelay: FiniteDuration) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey val nodeId: PublicKey = nodeKeyManager.nodeId @@ -681,7 +682,8 @@ object NodeParams extends Logging { onTheFlyFundingConfig = OnTheFlyFunding.Config( proposalTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.proposal-timeout").getSeconds, TimeUnit.SECONDS), ), - peerStorageWriteDelay = FiniteDuration(config.getDuration("peer-storage-write-delay").getSeconds, TimeUnit.SECONDS), + peerStorageWriteDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS), + peerStorageRemovalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS), ) } } diff --git 
a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 73b2c0f358..03c605c8e7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -37,7 +37,7 @@ import fr.acinq.eclair.crypto.WeakEntropyPool import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager, LocalOnChainKeyManager} import fr.acinq.eclair.db.Databases.FileBackup import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams -import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler} +import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler, PeerStorageCleaner} import fr.acinq.eclair.io._ import fr.acinq.eclair.message.Postman import fr.acinq.eclair.payment.offer.OfferManager @@ -356,6 +356,9 @@ class Setup(val datadir: File, logger.warn("database backup is disabled") system.deadLetters } + _ = if (nodeParams.features.hasFeature(Features.ProvideStorage)) { + system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageRemovalDelay)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner") + } dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume)) register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume)) offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index 05219d7eec..7fb7e56eba 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -13,7 +13,7 @@ import 
fr.acinq.eclair.payment.relay.OnTheFlyFunding import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair.router.Router import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement} -import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli} +import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond} import grizzled.slf4j.Logging import scodec.bits.ByteVector @@ -303,6 +303,11 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb { runAsync(secondary.getStorage(nodeId)) primary.getStorage(nodeId) } + + override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = { + runAsync(secondary.removePeerStorage(peerRemovedBefore)) + primary.removePeerStorage(peerRemovedBefore) + } } case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends PaymentsDb { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala new file mode 100644 index 0000000000..bc2df32285 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2024 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package fr.acinq.eclair.db + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import fr.acinq.eclair.TimestampSecond + +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +object PeerStorageCleaner { + sealed trait Command + private case object CleanPeerStorage extends Command + + def apply(db: PeersDb, removalDelay: FiniteDuration): Behavior[Command] = { + Behaviors.withTimers { timers => + timers.startTimerWithFixedDelay(CleanPeerStorage, 1 day) + Behaviors.receiveMessage { + case CleanPeerStorage => + db.removePeerStorage(TimestampSecond.now() - removalDelay) + Behaviors.same + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala index b33079e0f5..82ba3933a3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala @@ -17,6 +17,7 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.eclair.TimestampSecond import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair.wire.protocol.NodeAddress import scodec.bits.ByteVector @@ -41,4 +42,6 @@ trait PeersDb { // Used only when option_provide_storage is enabled. def getStorage(nodeId: PublicKey): Option[ByteVector] + // Reclaim storage from peers that have had no active channel with us for a while. 
+ def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala index a166143e16..95a1748c9c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala @@ -55,7 +55,8 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg } def migration34(statement: Statement): Unit = { - statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL)") + statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)") + statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)") } using(pg.createStatement()) { statement => @@ -64,7 +65,9 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)") statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)") - statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL)") + statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)") + + statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)") case 
Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { @@ -106,8 +109,18 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg statement.setString(1, nodeId.value.toHex) statement.executeUpdate() } - using(pg.prepareStatement("DELETE FROM local.peer_storage WHERE node_id = ?")) { statement => - statement.setString(1, nodeId.value.toHex) + using(pg.prepareStatement("UPDATE local.peer_storage SET removed_peer_at = ? WHERE node_id = ?")) { statement => + statement.setTimestamp(1, TimestampSecond.now().toSqlTimestamp) + statement.setString(2, nodeId.value.toHex) + statement.executeUpdate() + } + } + } + + override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = withMetrics("peers/remove-storage", DbBackends.Postgres) { + withLock { pg => + using(pg.prepareStatement("DELETE FROM local.peer_storage WHERE removed_peer_at < ?")) { statement => + statement.setTimestamp(1, peerRemovedBefore.toSqlTimestamp) statement.executeUpdate() } } @@ -172,10 +185,10 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg withLock { pg => using(pg.prepareStatement( """ - INSERT INTO local.peer_storage (node_id, data, last_updated_at) - VALUES (?, ?, ?) 
+ INSERT INTO local.peer_storage (node_id, data, last_updated_at, removed_peer_at) + VALUES (?, ?, ?, NULL) ON CONFLICT (node_id) - DO UPDATE SET data = EXCLUDED.data + DO UPDATE SET data = EXCLUDED.data, last_updated_at = EXCLUDED.last_updated_at, removed_peer_at = NULL """)) { statement => statement.setString(1, nodeId.value.toHex) statement.setBytes(2, data.toArray) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala index c41a0710c1..4573f4d379 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala @@ -47,14 +47,17 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { } def migration23(statement: Statement): Unit = { - statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)") + statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)") } getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)") statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)") - statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)") + + statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)") case 
Some(v@(1 | 2)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { @@ -89,6 +92,18 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { statement.setBytes(1, nodeId.value.toArray) statement.executeUpdate() } + using(sqlite.prepareStatement("UPDATE peer_storage SET removed_peer_at = ? WHERE node_id = ?")) { statement => + statement.setLong(1, TimestampSecond.now().toLong) + statement.setBytes(2, nodeId.value.toArray) + statement.executeUpdate() + } + } + + override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = withMetrics("peers/remove-storage", DbBackends.Sqlite) { + using(sqlite.prepareStatement("DELETE FROM peer_storage WHERE removed_peer_at < ?")) { statement => + statement.setLong(1, peerRemovedBefore.toLong) + statement.executeUpdate() + } + } override def getPeer(nodeId: PublicKey): Option[NodeAddress] = withMetrics("peers/get", DbBackends.Sqlite) { @@ -140,12 +155,12 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { } override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = withMetrics("peers/update-storage", DbBackends.Sqlite) { - using(sqlite.prepareStatement("UPDATE peer_storage SET data = ?, last_updated_at = ? 
WHERE node_id = ?")) { update => + using(sqlite.prepareStatement("UPDATE peer_storage SET data = ?, last_updated_at = ?, removed_peer_at = NULL WHERE node_id = ?")) { update => update.setBytes(1, data.toArray) update.setLong(2, TimestampSecond.now().toLong) update.setBytes(3, nodeId.value.toArray) if (update.executeUpdate() == 0) { - using(sqlite.prepareStatement("INSERT INTO peer_storage VALUES (?, ?, ?)")) { statement => + using(sqlite.prepareStatement("INSERT INTO peer_storage VALUES (?, ?, ?, NULL)")) { statement => statement.setBytes(1, nodeId.value.toArray) statement.setBytes(2, data.toArray) statement.setLong(3, TimestampSecond.now().toLong) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index 0f5dd22020..cf7b1048d8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -111,7 +111,7 @@ class Peer(val nodeParams: NodeParams, log.info("that was the last open channel") context.system.eventStream.publish(LastChannelClosed(self, remoteNodeId)) // We have no existing channels or pending signed transaction, we can forget about this peer. - stopPeer() + stopPeer(d.peerStorage) } else { stay() using d.copy(channels = channels1) } @@ -122,7 +122,7 @@ class Peer(val nodeParams: NodeParams, } if (d.channels.isEmpty && canForgetPendingOnTheFlyFunding()) { // We have no existing channels or pending signed transaction, we can forget about this peer. - stopPeer() + stopPeer(d.peerStorage) } else { stay() } @@ -473,7 +473,7 @@ class Peer(val nodeParams: NodeParams, } if (d.channels.isEmpty && canForgetPendingOnTheFlyFunding()) { // We have no existing channels or pending signed transaction, we can forget about this peer. - stopPeer() + stopPeer(d.peerStorage) } else { d.channels.values.toSet[ActorRef].foreach(_ ! 
INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id) goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.peerStorage) @@ -616,7 +616,7 @@ class Peer(val nodeParams: NodeParams, } pendingOnTheFlyFunding = pendingOnTheFlyFunding.removedAll(expired.keys) d match { - case d: DisconnectedData if d.channels.isEmpty && pendingOnTheFlyFunding.isEmpty => stopPeer() + case d: DisconnectedData if d.channels.isEmpty && pendingOnTheFlyFunding.isEmpty => stopPeer(d.peerStorage) case _ => stay() } @@ -870,7 +870,10 @@ class Peer(val nodeParams: NodeParams, // resume the openChannelInterceptor in case of failure, we always want the open channel request to succeed or fail private val openChannelInterceptor = context.spawnAnonymous(Behaviors.supervise(OpenChannelInterceptor(context.self.toTyped, nodeParams, remoteNodeId, wallet, pendingChannelsRateLimiter)).onFailure(typed.SupervisorStrategy.resume)) - private def stopPeer(): State = { + private def stopPeer(peerStorage: PeerStorage): State = { + if (!peerStorage.written) { + peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _)) + } log.info("removing peer from db") cancelUnsignedOnTheFlyFunding() nodeParams.db.peers.removePeer(remoteNodeId) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 8fdcb67b95..bb7de459d2 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -242,6 +242,7 @@ object TestConstants { peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), peerStorageWriteDelay = 5 seconds, + peerStorageRemovalDelay = 10 seconds, ) def channelParams: LocalParams = 
OpenChannelInterceptor.makeChannelParams( @@ -419,6 +420,7 @@ object TestConstants { peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), peerStorageWriteDelay = 5 seconds, + peerStorageRemovalDelay = 10 seconds, ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( From 2de184e6943b46450b5d97a9d6907accde453731 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Fri, 22 Nov 2024 13:34:03 +0100 Subject: [PATCH 04/13] days --- eclair-core/src/main/resources/reference.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 5fcb359fbc..79ae0a35ae 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -603,7 +603,7 @@ eclair { // Peer storage is persisted only after this delay to reduce the number of writes when updating it multiple times in a row. write-delay = 1 minute // Peer storage is kept this long after the last channel has been closed. 
- removal-delay = 2 months + removal-delay = 30 days } } From 15159c3c05ba0fb09d203d34d2651ac31368c875 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Fri, 22 Nov 2024 14:04:36 +0100 Subject: [PATCH 05/13] flaky test --- .../src/test/scala/fr/acinq/eclair/io/PeerSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 5709386a64..5083244588 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -765,14 +765,14 @@ class PeerSpec extends FixtureSpec { nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef") connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(ChannelCodecsSpec.normal), peerStorage = Some(hex"abcdef")) peerConnection1.send(peer, PeerStorageStore(hex"0123456789")) - Thread.sleep(1000) peer ! Peer.Disconnect(f.remoteNodeId) connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), sendInit = false, peerStorage = Some(hex"0123456789")) peerConnection2.send(peer, PeerStorageStore(hex"1111")) connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), sendInit = false, peerStorage = Some(hex"1111")) assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"abcdef")) // Because of the delayed writes, the original value hasn't been updated yet. - Thread.sleep(5000) - assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) // Now it is updated. + eventually { + assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) // Now it is updated. 
+ } } From 63d5fc4ab42a6d9f24c380a7c4678572a1b2a552 Mon Sep 17 00:00:00 2001 From: t-bast Date: Fri, 22 Nov 2024 17:04:30 +0100 Subject: [PATCH 06/13] nits --- docs/release-notes/eclair-vnext.md | 6 ++++-- eclair-core/src/main/resources/reference.conf | 9 +++++++-- .../main/scala/fr/acinq/eclair/NodeParams.scala | 15 +++++++++++---- .../src/main/scala/fr/acinq/eclair/Setup.scala | 2 +- .../main/scala/fr/acinq/eclair/db/PeersDb.scala | 7 ++++--- .../src/main/scala/fr/acinq/eclair/io/Peer.scala | 8 +++++++- .../wire/protocol/LightningMessageTypes.scala | 4 ++-- .../eclair/wire/protocol/PeerStorageTlv.scala | 2 +- .../scala/fr/acinq/eclair/TestConstants.scala | 6 ++---- .../test/scala/fr/acinq/eclair/io/PeerSpec.scala | 13 +++++++------ 10 files changed, 46 insertions(+), 26 deletions(-) diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index 9597960bed..f4236fe517 100644 --- a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -8,8 +8,10 @@ ### Peer storage -When `option_provide_storage` is enabled, eclair will store a small for our peers. -This is mostly intended for LSPs that serve mobile wallets to allow the users to restore their channel when they switch phones. +With this release, eclair supports the `option_provide_storage` feature introduced in [lightning/bolts#1110](https://github.com/lightning/bolts/pull/1110). +When `option_provide_storage` is enabled, eclair will store a small encrypted backup for peers that request it. +This backup is limited to 65kB and node operators should customize the `eclair.peer-storage` configuration section to match their desired SLAs. +This is mostly intended for LSPs that serve mobile wallets to allow users to restore their channels when they switch phones. 
### API changes diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 79ae0a35ae..6825a82c97 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -73,7 +73,8 @@ eclair { option_dual_fund = optional option_quiesce = optional option_onion_messages = optional - // Enable this if you serve mobile wallets. + // This feature should only be enabled when acting as an LSP for mobile wallets. + // When activating this feature, the peer-storage section should be customized to match desired SLAs. option_provide_storage = disabled option_channel_type = optional option_scid_alias = optional @@ -601,8 +602,12 @@ eclair { peer-storage { // Peer storage is persisted only after this delay to reduce the number of writes when updating it multiple times in a row. + // A small delay may result in a lot of IO write operations, which can have a negative performance impact on the node. + // But using a large delay increases the risk of not storing the latest peer data if you restart your node while writes are pending. write-delay = 1 minute - // Peer storage is kept this long after the last channel has been closed. + // Peer storage is kept this long after the last channel with that peer has been closed. + // A long delay here guarantees that peers who are offline while their channels are closed will be able to get their funds + // back if they restore from seed on a different device after the channels have been closed. 
removal-delay = 30 days } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 28e7165b19..a71457ad64 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -93,8 +93,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, willFundRates_opt: Option[LiquidityAds.WillFundRates], peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig, onTheFlyFundingConfig: OnTheFlyFunding.Config, - peerStorageWriteDelay: FiniteDuration, - peerStorageRemovalDelay: FiniteDuration) { + peerStorageConfig: PeerStorageConfig) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey val nodeId: PublicKey = nodeKeyManager.nodeId @@ -158,6 +157,12 @@ case class PaymentFinalExpiryConf(min: CltvExpiryDelta, max: CltvExpiryDelta) { } } +/** + * @param writeDelay delay before writing the peer's data to disk, which avoids doing multiple writes during bursts of storage updates. + * @param removalDelay we keep our peer's data in our DB even after closing all of our channels with them, up to this duration. 
+ */ +case class PeerStorageConfig(writeDelay: FiniteDuration, removalDelay: FiniteDuration) + object NodeParams extends Logging { /** @@ -682,8 +687,10 @@ object NodeParams extends Logging { onTheFlyFundingConfig = OnTheFlyFunding.Config( proposalTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.proposal-timeout").getSeconds, TimeUnit.SECONDS), ), - peerStorageWriteDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS), - peerStorageRemovalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS), + peerStorageConfig = PeerStorageConfig( + writeDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS), + removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS), + ) ) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 03c605c8e7..362635309c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -357,7 +357,7 @@ class Setup(val datadir: File, system.deadLetters } _ = if (nodeParams.features.hasFeature(Features.ProvideStorage)) { - system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageRemovalDelay)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner") + system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageConfig.removalDelay)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner") } dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume)) register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume)) diff --git 
a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala index 82ba3933a3..2515accf1e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala @@ -36,12 +36,13 @@ trait PeersDb { def getRelayFees(nodeId: PublicKey): Option[RelayFees] - // Used only when option_provide_storage is enabled. + /** Update our peer's blob data when [[fr.acinq.eclair.Features.ProvideStorage]] is enabled. */ def updateStorage(nodeId: PublicKey, data: ByteVector): Unit - // Used only when option_provide_storage is enabled. + /** Get the last blob of data we stored for that peer, if [[fr.acinq.eclair.Features.ProvideStorage]] is enabled. */ def getStorage(nodeId: PublicKey): Option[ByteVector] - // Reclaim storage from peers that have had no active channel with us for a while. + /** Remove storage from peers that have had no active channel with us for a while. */ def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit + } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index cf7b1048d8..da11c32226 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -529,7 +529,13 @@ class Peer(val nodeParams: NodeParams, stay() case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.channels.nonEmpty => - startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageWriteDelay) + // If we don't have any pending write operations, we write the updated peer storage to disk after a delay. + // This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations. 
+ // If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay + // writing to the DB and may never store our peer's backup. + if (d.peerStorage.written) { + startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) + } stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false)) case Event(WritePeerStorage, d: ConnectedData) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala index de26d328b7..0bb66c36d1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala @@ -602,9 +602,9 @@ case class GossipTimestampFilter(chainHash: BlockHash, firstTimestamp: Timestamp case class OnionMessage(pathKey: PublicKey, onionRoutingPacket: OnionRoutingPacket, tlvStream: TlvStream[OnionMessageTlv] = TlvStream.empty) extends LightningMessage -case class PeerStorageStore(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends LightningMessage +case class PeerStorageStore(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends SetupMessage -case class PeerStorageRetrieval(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends LightningMessage +case class PeerStorageRetrieval(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends SetupMessage // NB: blank lines to minimize merge conflicts diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/PeerStorageTlv.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/PeerStorageTlv.scala index 4ebb3fec39..e4701c7868 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/PeerStorageTlv.scala +++ 
b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/PeerStorageTlv.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 ACINQ SAS + * Copyright 2024 ACINQ SAS * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index bb7de459d2..7baff3b2aa 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -241,8 +241,7 @@ object TestConstants { willFundRates_opt = Some(defaultLiquidityRates), peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), - peerStorageWriteDelay = 5 seconds, - peerStorageRemovalDelay = 10 seconds, + peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( @@ -419,8 +418,7 @@ object TestConstants { willFundRates_opt = Some(defaultLiquidityRates), peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), - peerStorageWriteDelay = 5 seconds, - peerStorageRemovalDelay = 10 seconds, + peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 5083244588..57214da1b7 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -108,9 +108,9 @@ class PeerSpec extends FixtureSpec { def 
cleanupFixture(fixture: FixtureParam): Unit = fixture.cleanup() - def connect(remoteNodeId: PublicKey, peer: TestFSMRef[Peer.State, Peer.Data, Peer], peerConnection: TestProbe, switchboard: TestProbe, channels: Set[PersistentChannelData] = Set.empty, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()), sendInit: Boolean = true, peerStorage: Option[ByteVector] = None)(implicit system: ActorSystem): Unit = { + def connect(remoteNodeId: PublicKey, peer: TestFSMRef[Peer.State, Peer.Data, Peer], peerConnection: TestProbe, switchboard: TestProbe, channels: Set[PersistentChannelData] = Set.empty, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()), initializePeer: Boolean = true, peerStorage: Option[ByteVector] = None)(implicit system: ActorSystem): Unit = { // let's simulate a connection - if (sendInit) { + if (initializePeer) { switchboard.send(peer, Peer.Init(channels, Map.empty)) } val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures()) @@ -764,14 +764,15 @@ class PeerSpec extends FixtureSpec { nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef") connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(ChannelCodecsSpec.normal), peerStorage = Some(hex"abcdef")) + peerConnection1.send(peer, PeerStorageStore(hex"deadbeef")) peerConnection1.send(peer, PeerStorageStore(hex"0123456789")) peer ! 
Peer.Disconnect(f.remoteNodeId) - connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), sendInit = false, peerStorage = Some(hex"0123456789")) + connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"0123456789")) peerConnection2.send(peer, PeerStorageStore(hex"1111")) - connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), sendInit = false, peerStorage = Some(hex"1111")) - assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"abcdef")) // Because of the delayed writes, the original value hasn't been updated yet. + connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"1111")) + // Because of the delayed writes, we may not have stored the latest value immediately, but we will eventually store it. eventually { - assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) // Now it is updated. 
+ assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) } } From 2335100f8de7388feb47669377ecbfeef382b6c4 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Mon, 25 Nov 2024 11:14:19 +0100 Subject: [PATCH 07/13] Count active channels --- .../fr/acinq/eclair/channel/fsm/Channel.scala | 7 ++++ .../main/scala/fr/acinq/eclair/io/Peer.scala | 36 ++++++++++++++----- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index 06a9d83ea9..1e8d8ce981 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -2588,6 +2588,13 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with case _: TransientChannelData => None } context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt)) + + if (state == NORMAL) { + peer ! Peer.ChannelDeactivated + } + if (nextState == NORMAL) { + peer ! 
Peer.ChannelActivated + } } if (nextState == CLOSED) { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index da11c32226..304509fae0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -91,7 +91,7 @@ class Peer(val nodeParams: NodeParams, } else { None } - goto(DISCONNECTED) using DisconnectedData(channels, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait + goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = 0, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait } when(DISCONNECTED) { @@ -100,7 +100,7 @@ class Peer(val nodeParams: NodeParams, stay() case Event(connectionReady: PeerConnection.ConnectionReady, d: DisconnectedData) => - gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) }, d.peerStorage) + gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) }, d.activeChannels, d.peerStorage) case Event(Terminated(actor), d: DisconnectedData) if d.channels.values.toSet.contains(actor) => // we have at most 2 ids: a TemporaryChannelId and a FinalChannelId @@ -144,6 +144,12 @@ class Peer(val nodeParams: NodeParams, case Event(WritePeerStorage, d: DisconnectedData) => d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _)) stay() using d.copy(peerStorage = d.peerStorage.copy(written = true)) + + case Event(ChannelActivated, d: DisconnectedData) => + stay() using d.copy(activeChannels = d.activeChannels + 1) + + case Event(ChannelDeactivated, d: DisconnectedData) => + stay() using d.copy(activeChannels = d.activeChannels - 1) } when(CONNECTED) { @@ -476,7 +482,7 @@ class Peer(val nodeParams: NodeParams, stopPeer(d.peerStorage) } else { 
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id) - goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.peerStorage) + goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.activeChannels, d.peerStorage) } case Event(Terminated(actor), d: ConnectedData) if d.channels.values.toSet.contains(actor) => @@ -495,7 +501,7 @@ class Peer(val nodeParams: NodeParams, log.debug(s"got new connection, killing current one and switching") d.peerConnection ! PeerConnection.Kill(KillReason.ConnectionReplaced) d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id) - gotoConnected(connectionReady, d.channels, d.peerStorage) + gotoConnected(connectionReady, d.channels, d.activeChannels, d.peerStorage) case Event(msg: OnionMessage, _: ConnectedData) => OnionMessages.process(nodeParams.privateKey, msg) match { @@ -528,7 +534,7 @@ class Peer(val nodeParams: NodeParams, d.peerConnection forward unknownMsg stay() - case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.channels.nonEmpty => + case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels > 0 => // If we don't have any pending write operations, we write the updated peer storage to disk after a delay. // This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations. 
// If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay @@ -542,6 +548,12 @@ class Peer(val nodeParams: NodeParams, d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _)) stay() using d.copy(peerStorage = d.peerStorage.copy(written = true)) + case Event(ChannelActivated, d: ConnectedData) => + stay() using d.copy(activeChannels = d.activeChannels + 1) + + case Event(ChannelDeactivated, d: ConnectedData) => + stay() using d.copy(activeChannels = d.activeChannels - 1) + case Event(unhandledMsg: LightningMessage, _) => log.warning("ignoring message {}", unhandledMsg) stay() @@ -773,7 +785,7 @@ class Peer(val nodeParams: NodeParams, context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId)) } - private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], peerStorage: PeerStorage): State = { + private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage): State = { require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}") log.debug("got authenticated connection to address {}", connectionReady.address) @@ -803,7 +815,7 @@ class Peer(val nodeParams: NodeParams, connectionReady.peerConnection ! 
CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat)) } - goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, feerates, None, peerStorage) + goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage) } /** @@ -945,14 +957,16 @@ object Peer { sealed trait Data { def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef] + def activeChannels: Int def peerStorage: PeerStorage } case object Nothing extends Data { override def channels = Map.empty + override def activeChannels: Int = 0 override def peerStorage: PeerStorage = PeerStorage(None, written = true) } - case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], peerStorage: PeerStorage) extends Data - case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data { + case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage) extends Data + case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Int, currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data { val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit) def localFeatures: Features[InitFeature] = localInit.features def remoteFeatures: Features[InitFeature] = remoteInit.features @@ -1067,5 +1081,9 @@ object 
Peer { case class RelayUnknownMessage(unknownMessage: UnknownMessage) case object WritePeerStorage + + case object ChannelActivated + + case object ChannelDeactivated // @formatter:on } From 3f16d4a37289f2f02f30c477161658bf08a15bd9 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Mon, 25 Nov 2024 11:37:04 +0100 Subject: [PATCH 08/13] Fix tests --- .../src/test/scala/fr/acinq/eclair/io/PeerSpec.scala | 1 + .../scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 57214da1b7..6d72d44360 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -112,6 +112,7 @@ class PeerSpec extends FixtureSpec { // let's simulate a connection if (initializePeer) { switchboard.send(peer, Peer.Init(channels, Map.empty)) + channels.foreach(c => if (c.isInstanceOf[DATA_NORMAL]) peer ! 
ChannelActivated) } val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures()) switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = true, localInit, remoteInit)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala index 3ec60d9c7a..e29ea45964 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala @@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw) private val PeerNothingData = Peer.Nothing - private val PeerDisconnectedData = Peer.DisconnectedData(channels, PeerStorage(None, written = true)) - private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, recommendedFeerates, None, PeerStorage(None, written = true)) + private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = 0, PeerStorage(None, written = true)) + private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = 0, recommendedFeerates, None, PeerStorage(None, written = true)) case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe) @@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike import f._ val peer = TestProbe() - peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, 
PeerStorage(None, written = true)))) + peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = 0, PeerStorage(None, written = true)))) monitor.expectNoMessage() } From 797ec8e1cc2253261d0ad8263ccbccade362ddb1 Mon Sep 17 00:00:00 2001 From: t-bast Date: Mon, 25 Nov 2024 16:24:07 +0100 Subject: [PATCH 09/13] Fix SqlitePeersDb storage removal The confusion between SQL timestamps and long resulted in always removing storage regardless of the `peerRemovedBefore` parameter. --- .../eclair/db/sqlite/SqlitePeersDb.scala | 6 +++--- .../fr/acinq/eclair/db/PeersDbSpec.scala | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala index 4573f4d379..26971273c5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala @@ -47,7 +47,7 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { } def migration23(statement: Statement): Unit = { - statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)") + statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)") statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)") } @@ -55,7 +55,7 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { case None => statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)") statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT 
NULL)") - statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)") + statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)") statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)") case Some(v@(1 | 2)) => @@ -101,7 +101,7 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging { override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = withMetrics("peers/remove-storage", DbBackends.Sqlite) { using(sqlite.prepareStatement("DELETE FROM peer_storage WHERE removed_peer_at < ?")) { statement => - statement.setTimestamp(1, peerRemovedBefore.toSqlTimestamp) + statement.setLong(1, peerRemovedBefore.toLong) statement.executeUpdate() } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala index 76f4719014..cf687634e9 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala @@ -18,10 +18,10 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases} +import fr.acinq.eclair._ import fr.acinq.eclair.db.pg.PgPeersDb import fr.acinq.eclair.db.sqlite.SqlitePeersDb import fr.acinq.eclair.payment.relay.Relayer.RelayFees -import fr.acinq.eclair._ import fr.acinq.eclair.wire.protocol.{NodeAddress, Tor2, Tor3} import org.scalatest.funsuite.AnyFunSuite import scodec.bits.HexStringSyntax @@ -108,7 +108,7 @@ class PeersDbSpec extends AnyFunSuite { } } - test("peer storage") { + test("add/update/remove peer storage") { forAllDbs { dbs => val db = dbs.peers @@ -126,6 +126,21 @@ class PeersDbSpec extends AnyFunSuite { 
db.updateStorage(b, hex"abcd") assert(db.getStorage(a) == Some(hex"6789")) assert(db.getStorage(b) == Some(hex"abcd")) + + // Actively used storage shouldn't be removed. + db.removePeerStorage(TimestampSecond.now() + 1.hour) + assert(db.getStorage(a) == Some(hex"6789")) + assert(db.getStorage(b) == Some(hex"abcd")) + + // After removing the peer, peer storage can be removed. + db.removePeer(a) + assert(db.getStorage(a) == Some(hex"6789")) + db.removePeerStorage(TimestampSecond.now() - 1.hour) + assert(db.getStorage(a) == Some(hex"6789")) + db.removePeerStorage(TimestampSecond.now() + 1.hour) + assert(db.getStorage(a) == None) + assert(db.getStorage(b) == Some(hex"abcd")) } } + } From f708e10b1f8e511db67a5dee5126e7e6a74ad472 Mon Sep 17 00:00:00 2001 From: t-bast Date: Mon, 25 Nov 2024 16:33:36 +0100 Subject: [PATCH 10/13] Make frequency of storage DB clean-up configurable Added to the peer storage configuration section of `eclair.conf`. --- eclair-core/src/main/resources/reference.conf | 2 ++ .../scala/fr/acinq/eclair/NodeParams.scala | 8 +++--- .../main/scala/fr/acinq/eclair/Setup.scala | 2 +- .../acinq/eclair/db/PeerStorageCleaner.scala | 26 +++++++++++-------- .../scala/fr/acinq/eclair/TestConstants.scala | 4 +-- 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 6825a82c97..bb912da344 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -609,6 +609,8 @@ eclair { // A long delay here guarantees that peers who are offline while their channels are closed will be able to get their funds // back if they restore from seed on a different device after the channels have been closed. removal-delay = 30 days + // Frequency at which we clean our DB to remove peer storage from nodes with whom we don't have channels anymore. 
+ cleanup-frequency = 1 day } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index a71457ad64..9a0fbaf034 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -158,10 +158,11 @@ case class PaymentFinalExpiryConf(min: CltvExpiryDelta, max: CltvExpiryDelta) { } /** - * @param writeDelay delay before writing the peer's data to disk, which avoids doing multiple writes during bursts of storage updates. - * @param removalDelay we keep our peer's data in our DB even after closing all of our channels with them, up to this duration. + * @param writeDelay delay before writing the peer's data to disk, which avoids doing multiple writes during bursts of storage updates. + * @param removalDelay we keep our peer's data in our DB even after closing all of our channels with them, up to this duration. + * @param cleanUpFrequency frequency at which we go through the DB to remove unused storage. 
*/ -case class PeerStorageConfig(writeDelay: FiniteDuration, removalDelay: FiniteDuration) +case class PeerStorageConfig(writeDelay: FiniteDuration, removalDelay: FiniteDuration, cleanUpFrequency: FiniteDuration) object NodeParams extends Logging { @@ -690,6 +691,7 @@ object NodeParams extends Logging { peerStorageConfig = PeerStorageConfig( writeDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS), removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS), + cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS), ) ) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 362635309c..e27a8472dd 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -357,7 +357,7 @@ class Setup(val datadir: File, system.deadLetters } _ = if (nodeParams.features.hasFeature(Features.ProvideStorage)) { - system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageConfig.removalDelay)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner") + system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageConfig)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner") } dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume)) register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume)) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala index bc2df32285..7153fd5cad 100644 --- 
a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala @@ -18,23 +18,27 @@ package fr.acinq.eclair.db import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors -import fr.acinq.eclair.TimestampSecond - -import scala.concurrent.duration.{DurationInt, FiniteDuration} +import fr.acinq.eclair.{PeerStorageConfig, TimestampSecond} +/** + * This actor frequently deletes from our DB peer storage from nodes with whom we don't have channels anymore, after a + * grace period. + */ object PeerStorageCleaner { + // @formatter:off sealed trait Command private case object CleanPeerStorage extends Command + // @formatter:on - def apply(db: PeersDb, removalDelay: FiniteDuration): Behavior[Command] = { - Behaviors.withTimers { timers => - timers.startTimerWithFixedDelay(CleanPeerStorage, 1 day) - Behaviors.receiveMessage { - case CleanPeerStorage => - db.removePeerStorage(TimestampSecond.now() - removalDelay) - Behaviors.same - } + def apply(db: PeersDb, config: PeerStorageConfig): Behavior[Command] = { + Behaviors.withTimers { timers => + timers.startTimerWithFixedDelay(CleanPeerStorage, config.cleanUpFrequency) + Behaviors.receiveMessage { + case CleanPeerStorage => + db.removePeerStorage(TimestampSecond.now() - config.removalDelay) + Behaviors.same } } + } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 7baff3b2aa..31cf3e6ebf 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -241,7 +241,7 @@ object TestConstants { willFundRates_opt = Some(defaultLiquidityRates), peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), - peerStorageConfig = 
PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds) + peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( @@ -418,7 +418,7 @@ object TestConstants { willFundRates_opt = Some(defaultLiquidityRates), peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), - peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds) + peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( From ca2c7f242bac99df6610454d9263fdfa34923f73 Mon Sep 17 00:00:00 2001 From: t-bast Date: Mon, 25 Nov 2024 17:23:12 +0100 Subject: [PATCH 11/13] Use existing channel events Instead of defining new events. We also keep a set of active channels to ensure that duplicate events don't mess up our state (even though this shouldn't happen, it feels safer). 
--- .../fr/acinq/eclair/channel/fsm/Channel.scala | 7 --- .../main/scala/fr/acinq/eclair/io/Peer.scala | 59 +++++++++---------- .../scala/fr/acinq/eclair/io/PeerSpec.scala | 21 +++++-- .../eclair/io/ReconnectionTaskSpec.scala | 6 +- 4 files changed, 48 insertions(+), 45 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index 1e8d8ce981..06a9d83ea9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -2588,13 +2588,6 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with case _: TransientChannelData => None } context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt)) - - if (state == NORMAL) { - peer ! Peer.ChannelDeactivated - } - if (nextState == NORMAL) { - peer ! 
Peer.ChannelActivated - } } if (nextState == CLOSED) { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index 304509fae0..e3c4a427bd 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -74,6 +74,7 @@ class Peer(val nodeParams: NodeParams, context.system.eventStream.subscribe(self, classOf[CurrentFeerates]) context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight]) + context.system.eventStream.subscribe(self, classOf[LocalChannelDown]) startWith(INSTANTIATING, Nothing) @@ -91,7 +92,7 @@ class Peer(val nodeParams: NodeParams, } else { None } - goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = 0, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait + goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait } when(DISCONNECTED) { @@ -145,11 +146,11 @@ class Peer(val nodeParams: NodeParams, d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _)) stay() using d.copy(peerStorage = d.peerStorage.copy(written = true)) - case Event(ChannelActivated, d: DisconnectedData) => - stay() using d.copy(activeChannels = d.activeChannels + 1) + case Event(e: ChannelReadyForPayments, d: DisconnectedData) => + stay() using d.copy(activeChannels = d.activeChannels + e.channelId) - case Event(ChannelDeactivated, d: DisconnectedData) => - stay() using d.copy(activeChannels = d.activeChannels - 1) + case Event(e: LocalChannelDown, d: DisconnectedData) => + stay() using d.copy(activeChannels = d.activeChannels - e.channelId) } when(CONNECTED) { @@ -424,7 +425,7 @@ class Peer(val nodeParams: NodeParams, } stay() - case Event(e: ChannelReadyForPayments, _: 
ConnectedData) => + case Event(e: ChannelReadyForPayments, d: ConnectedData) => pendingOnTheFlyFunding.foreach { case (paymentHash, pending) => pending.status match { @@ -440,7 +441,10 @@ class Peer(val nodeParams: NodeParams, } } } - stay() + stay() using d.copy(activeChannels = d.activeChannels + e.channelId) + + case Event(e: LocalChannelDown, d: ConnectedData) => + stay() using d.copy(activeChannels = d.activeChannels - e.channelId) case Event(msg: HasChannelId, d: ConnectedData) => d.channels.get(FinalChannelId(msg.channelId)) match { @@ -534,26 +538,25 @@ class Peer(val nodeParams: NodeParams, d.peerConnection forward unknownMsg stay() - case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels > 0 => - // If we don't have any pending write operations, we write the updated peer storage to disk after a delay. - // This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations. - // If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay - // writing to the DB and may never store our peer's backup. - if (d.peerStorage.written) { - startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) + case Event(store: PeerStorageStore, d: ConnectedData) => + if (nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels.nonEmpty) { + // If we don't have any pending write operations, we write the updated peer storage to disk after a delay. + // This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations. + // If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay + // writing to the DB and may never store our peer's backup. 
+ if (d.peerStorage.written) { + startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) + } + stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false)) + } else { + log.debug("ignoring peer storage (feature={}, channels={})", nodeParams.features.hasFeature(Features.ProvideStorage), d.activeChannels.mkString(",")) + stay() } - stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false)) case Event(WritePeerStorage, d: ConnectedData) => d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _)) stay() using d.copy(peerStorage = d.peerStorage.copy(written = true)) - case Event(ChannelActivated, d: ConnectedData) => - stay() using d.copy(activeChannels = d.activeChannels + 1) - - case Event(ChannelDeactivated, d: ConnectedData) => - stay() using d.copy(activeChannels = d.activeChannels - 1) - case Event(unhandledMsg: LightningMessage, _) => log.warning("ignoring message {}", unhandledMsg) stay() @@ -785,7 +788,7 @@ class Peer(val nodeParams: NodeParams, context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId)) } - private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage): State = { + private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage): State = { require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}") log.debug("got authenticated connection to address {}", connectionReady.address) @@ -957,16 +960,16 @@ object Peer { sealed trait Data { def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef] - def activeChannels: Int + def activeChannels: Set[ByteVector32] // channels that are available to process 
payments def peerStorage: PeerStorage } case object Nothing extends Data { override def channels = Map.empty - override def activeChannels: Int = 0 + override def activeChannels: Set[ByteVector32] = Set.empty override def peerStorage: PeerStorage = PeerStorage(None, written = true) } - case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage) extends Data - case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Int, currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data { + case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage) extends Data + case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data { val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit) def localFeatures: Features[InitFeature] = localInit.features def remoteFeatures: Features[InitFeature] = remoteInit.features @@ -1081,9 +1084,5 @@ object Peer { case class RelayUnknownMessage(unknownMessage: UnknownMessage) case object WritePeerStorage - - case object ChannelActivated - - case object ChannelDeactivated // @formatter:on } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 6d72d44360..c839341d36 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -112,7 +112,6 @@ class PeerSpec extends 
FixtureSpec { // let's simulate a connection if (initializePeer) { switchboard.send(peer, Peer.Init(channels, Map.empty)) - channels.foreach(c => if (c.isInstanceOf[DATA_NORMAL]) peer ! ChannelActivated) } val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures()) switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = true, localInit, remoteInit)) @@ -759,22 +758,34 @@ class PeerSpec extends FixtureSpec { test("peer storage") { f => import f._ + // We connect with a previous backup. + val channel = ChannelCodecsSpec.normal val peerConnection1 = peerConnection - val peerConnection2 = TestProbe() - val peerConnection3 = TestProbe() - nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef") - connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(ChannelCodecsSpec.normal), peerStorage = Some(hex"abcdef")) + connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(channel), peerStorage = Some(hex"abcdef")) + peer ! ChannelReadyForPayments(ActorRef.noSender, channel.remoteNodeId, channel.channelId, channel.commitments.latest.fundingTxIndex) peerConnection1.send(peer, PeerStorageStore(hex"deadbeef")) peerConnection1.send(peer, PeerStorageStore(hex"0123456789")) + + // We disconnect and reconnect, sending the last backup we received. peer ! Peer.Disconnect(f.remoteNodeId) + val peerConnection2 = TestProbe() connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"0123456789")) peerConnection2.send(peer, PeerStorageStore(hex"1111")) + + // We reconnect again. 
+ val peerConnection3 = TestProbe() connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"1111")) // Because of the delayed writes, we may not have stored the latest value immediately, but we will eventually store it. eventually { assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) } + + // Our channel closes, so we stop storing backups for that peer. + peer ! LocalChannelDown(ActorRef.noSender, channel.channelId, channel.shortIds, channel.remoteNodeId) + peerConnection3.send(peer, PeerStorageStore(hex"2222")) + peer ! WritePeerStorage + assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala index e29ea45964..dba103b8ba 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala @@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw) private val PeerNothingData = Peer.Nothing - private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = 0, PeerStorage(None, written = true)) - private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = 0, recommendedFeerates, None, PeerStorage(None, written = true)) + private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(None, written = true)) + private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: 
ChannelId, v) => (k, v) }, activeChannels = Set.empty, recommendedFeerates, None, PeerStorage(None, written = true)) case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe) @@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike import f._ val peer = TestProbe() - peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = 0, PeerStorage(None, written = true)))) + peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = Set.empty, PeerStorage(None, written = true)))) monitor.expectNoMessage() } From df24044f97ebefa59c194b3601e92333433b0c94 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Wed, 11 Dec 2024 12:00:33 +0100 Subject: [PATCH 12/13] Update storage even for peers without active channel but only persist it to DB when there is an active channel. 
--- .../src/main/scala/fr/acinq/eclair/io/Peer.scala | 14 +++++++++++--- .../test/scala/fr/acinq/eclair/io/PeerSpec.scala | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index e3c4a427bd..a08709ba2f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -147,6 +147,9 @@ class Peer(val nodeParams: NodeParams, stay() using d.copy(peerStorage = d.peerStorage.copy(written = true)) case Event(e: ChannelReadyForPayments, d: DisconnectedData) => + if (d.peerStorage.written == false && !isTimerActive("peer-storage-write")) { + startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) + } stay() using d.copy(activeChannels = d.activeChannels + e.channelId) case Event(e: LocalChannelDown, d: DisconnectedData) => @@ -441,6 +444,9 @@ class Peer(val nodeParams: NodeParams, } } } + if (d.peerStorage.written == false && !isTimerActive("peer-storage-write")) { + startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) + } stay() using d.copy(activeChannels = d.activeChannels + e.channelId) case Event(e: LocalChannelDown, d: ConnectedData) => @@ -539,17 +545,19 @@ class Peer(val nodeParams: NodeParams, stay() case Event(store: PeerStorageStore, d: ConnectedData) => - if (nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels.nonEmpty) { + if (nodeParams.features.hasFeature(Features.ProvideStorage)) { // If we don't have any pending write operations, we write the updated peer storage to disk after a delay. // This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations. 
// If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay // writing to the DB and may never store our peer's backup. - if (d.peerStorage.written) { + if (d.activeChannels.isEmpty) { + log.debug("received peer storage from peer with no active channel") + } else if (!isTimerActive("peer-storage-write")) { startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) } stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false)) } else { - log.debug("ignoring peer storage (feature={}, channels={})", nodeParams.features.hasFeature(Features.ProvideStorage), d.activeChannels.mkString(",")) + log.debug("ignoring peer storage, feature disabled") stay() } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index c839341d36..ac64eb146a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -784,7 +784,7 @@ class PeerSpec extends FixtureSpec { // Our channel closes, so we stop storing backups for that peer. peer ! LocalChannelDown(ActorRef.noSender, channel.channelId, channel.shortIds, channel.remoteNodeId) peerConnection3.send(peer, PeerStorageStore(hex"2222")) - peer ! 
WritePeerStorage + assert(!peer.isTimerActive("peer-storage-write")) assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111")) } From 81cb2a179a6991b96e459063246d310979b42674 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Wed, 11 Dec 2024 14:59:57 +0100 Subject: [PATCH 13/13] lint --- .../src/main/scala/fr/acinq/eclair/io/Peer.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index a08709ba2f..947da39cce 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -147,8 +147,8 @@ class Peer(val nodeParams: NodeParams, stay() using d.copy(peerStorage = d.peerStorage.copy(written = true)) case Event(e: ChannelReadyForPayments, d: DisconnectedData) => - if (d.peerStorage.written == false && !isTimerActive("peer-storage-write")) { - startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) + if (!d.peerStorage.written && !isTimerActive(WritePeerStorageTimerKey)) { + startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) } stay() using d.copy(activeChannels = d.activeChannels + e.channelId) @@ -444,8 +444,8 @@ class Peer(val nodeParams: NodeParams, } } } - if (d.peerStorage.written == false && !isTimerActive("peer-storage-write")) { - startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) + if (!d.peerStorage.written && !isTimerActive(WritePeerStorageTimerKey)) { + startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) } stay() using d.copy(activeChannels = d.activeChannels + e.channelId) @@ -552,8 +552,8 @@ class Peer(val nodeParams: NodeParams, // writing to the DB and may never store our peer's backup. 
if (d.activeChannels.isEmpty) { log.debug("received peer storage from peer with no active channel") - } else if (!isTimerActive("peer-storage-write")) { - startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) + } else if (!isTimerActive(WritePeerStorageTimerKey)) { + startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay) } stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false)) } else { @@ -937,6 +937,7 @@ class Peer(val nodeParams: NodeParams, Logs.mdc(LogCategory(currentMessage), Some(remoteNodeId), Logs.channelId(currentMessage), nodeAlias_opt = Some(nodeParams.alias)) } + private val WritePeerStorageTimerKey = "peer-storage-write" } object Peer {