diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 2e456cfb5d..92f90d3948 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -656,6 +656,53 @@ eclair { cleanup-frequency = 1 day } + // We keep track of peer statistics to find the most profitable peers and fund channels with them to optimize our + // payment volume and fees earned. + peer-scoring { + // Set this to true if you want to start collecting data to score peers. + enabled = false + // Frequency at which we run our peer scoring algorithm. + frequency = 1 hour + // Maximum number of peers to select as candidates for liquidity and relay fee updates. + top-peers-count = 10 + // A list of node_ids with whom we will try to maintain liquidity. + top-peers-whitelist = [] + // We can automatically allocate liquidity to our top peers when necessary. + liquidity { + // If true, we will automatically fund channels. + auto-fund = false + // If true, we will automatically close unused channels to reclaim liquidity. + auto-close = false + // We only fund channels if at least this amount is necessary. + min-funding-amount-satoshis = 1000000 // 0.01 btc + // We never fund channels with more than this amount. + max-funding-amount-satoshis = 50000000 // 0.5 btc + // We stop funding channels if our on-chain balance is below this amount. + min-on-chain-balance-satoshis = 50000000 // 0.5 btc + // We stop funding channels if the on-chain feerate is above this value. + max-feerate-sat-per-byte = 5 + // Rate-limit the number of funding transactions we make per day (on average). + max-funding-tx-per-day = 6 + // Minimum time between funding the same peer, to evaluate whether the previous funding was effective. + funding-cooldown = 72 hours + } + // We can automatically update our relay fees to our top peers when necessary. + relay-fees { + // If true, we will automatically update our relay fees based on variations in outgoing payment volume. + auto-update = false + // We will not lower our fees below these values. + min-fee-base-msat = 1 + min-fee-proportional-millionths = 500 + // We will not increase our fees above these values. + max-fee-base-msat = 10000 + max-fee-proportional-millionths = 5000 + // We only increase fees if the daily outgoing payment volume exceeds this threshold or daily-payment-volume-threshold-percent. + daily-payment-volume-threshold-satoshis = 10000000 // 0.1 btc + // We only increase fees if the daily outgoing payment volume exceeds this percentage of our peer capacity or daily-payment-volume-threshold. + daily-payment-volume-threshold-percent = 0.05 + } + } + offers { // Minimum length of an offer blinded path when hiding our real node id message-path-min-length = 2 diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala b/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala index 100829ce83..b82198759c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala @@ -35,6 +35,7 @@ case class MilliSatoshi(private val underlying: Long) extends Ordered[MilliSatos def *(m: Long) = MilliSatoshi(underlying * m) def *(m: Double) = MilliSatoshi((underlying * m).toLong) def /(d: Long) = MilliSatoshi(underlying / d) + def /(d: Double) = MilliSatoshi((underlying / d).toLong) def unary_- = MilliSatoshi(-underlying) override def compare(other: MilliSatoshi): Int = underlying.compareTo(other.underlying) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index be25030aec..6f49a6b77f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -33,6 +33,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.offer.OffersConfig import fr.acinq.eclair.payment.relay.OnTheFlyFunding import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} +import fr.acinq.eclair.profit.PeerScorer import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Announcements.AddressException import fr.acinq.eclair.router.Graph.HeuristicsConstants @@ -95,6 +96,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig, onTheFlyFundingConfig: OnTheFlyFunding.Config, peerStorageConfig: PeerStorageConfig, + peerScoringConfig: PeerScorer.Config, offersConfig: OffersConfig) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey @@ -714,6 +716,35 @@ object NodeParams extends Logging { removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS), cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS), ), + peerScoringConfig = PeerScorer.Config( + enabled = config.getBoolean("peer-scoring.enabled"), + scoringFrequency = FiniteDuration(config.getDuration("peer-scoring.frequency").getSeconds, TimeUnit.SECONDS), + topPeersCount = config.getInt("peer-scoring.top-peers-count"), + topPeersWhitelist = config.getStringList("peer-scoring.top-peers-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet, + liquidity = PeerScorer.LiquidityConfig( + autoFund = config.getBoolean("peer-scoring.liquidity.auto-fund"), + autoClose = config.getBoolean("peer-scoring.liquidity.auto-close"), + minFundingAmount = config.getLong("peer-scoring.liquidity.min-funding-amount-satoshis").sat, + maxFundingAmount = config.getLong("peer-scoring.liquidity.max-funding-amount-satoshis").sat, + maxFundingTxPerDay = config.getInt("peer-scoring.liquidity.max-funding-tx-per-day"), + minOnChainBalance = config.getLong("peer-scoring.liquidity.min-on-chain-balance-satoshis").sat, + maxFeerate = FeeratePerByte(config.getLong("peer-scoring.liquidity.max-feerate-sat-per-byte").sat).perKw, + fundingCooldown = FiniteDuration(config.getDuration("peer-scoring.liquidity.funding-cooldown").getSeconds, TimeUnit.SECONDS), + ), + relayFees = PeerScorer.RelayFeesConfig( + autoUpdate = config.getBoolean("peer-scoring.relay-fees.auto-update"), + minRelayFees = RelayFees( + feeBase = config.getLong("peer-scoring.relay-fees.min-fee-base-msat").msat, + feeProportionalMillionths = config.getLong("peer-scoring.relay-fees.min-fee-proportional-millionths"), + ), + maxRelayFees = RelayFees( + feeBase = config.getLong("peer-scoring.relay-fees.max-fee-base-msat").msat, + feeProportionalMillionths = config.getLong("peer-scoring.relay-fees.max-fee-proportional-millionths"), + ), + dailyPaymentVolumeThreshold = config.getLong("peer-scoring.relay-fees.daily-payment-volume-threshold-satoshis").sat, + dailyPaymentVolumeThresholdPercent = config.getDouble("peer-scoring.relay-fees.daily-payment-volume-threshold-percent"), + ) + ), offersConfig = OffersConfig( messagePathMinLength = config.getInt("offers.message-path-min-length"), paymentPathCount = config.getInt("offers.payment-path-count"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 6d9bc2de5e..e3729e7768 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -44,6 +44,7 @@ import fr.acinq.eclair.payment.offer.{DefaultOfferHandler, OfferManager} import fr.acinq.eclair.payment.receive.PaymentHandler import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer} import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator} +import fr.acinq.eclair.profit.{PeerScorer, PeerStatsTracker} import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router._ import fr.acinq.eclair.tor.{Controller, TorProtocolHandler} @@ -400,8 +401,11 @@ class Setup(val datadir: File, _ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart)) balanceActor = system.spawn(BalanceActor(bitcoinClient, nodeParams.channelConf.minDepth, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor") - postman = system.spawn(Behaviors.supervise(Postman(nodeParams, switchboard, router.toTyped, register, offerManager)).onFailure(typed.SupervisorStrategy.restart), name = "postman") + _ = if (nodeParams.peerScoringConfig.enabled) { + val statsTracker = system.spawn(Behaviors.supervise(PeerStatsTracker(nodeParams.db.audit, channels)).onFailure(typed.SupervisorStrategy.restart), name = "peer-stats-tracker") + system.spawn(Behaviors.supervise(PeerScorer(nodeParams, bitcoinClient, statsTracker, register)).onFailure(typed.SupervisorStrategy.restart), name = "peer-scorer") + } kit = Kit( nodeParams = nodeParams, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala index 1d6a1b4478..9629fa09bd 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala @@ -46,6 +46,7 @@ case class TimestampMilli(private val underlying: Long) extends Ordered[Timestam require(underlying >= 0 && underlying <= 253402300799L * 1000, "invalid timestamp value") // @formatter:off def toLong: Long = underlying + def toTimestampSecond: TimestampSecond = TimestampSecond(underlying / 1000) def toSqlTimestamp: sql.Timestamp = sql.Timestamp.from(Instant.ofEpochMilli(underlying)) override def toString: String = s"$underlying unixms" override def compare(that: TimestampMilli): Int = underlying.compareTo(that.underlying) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala new file mode 100644 index 0000000000..350b33c895 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala @@ -0,0 +1,471 @@ +/* + * Copyright 2026 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.profit + +import akka.actor.typed.scaladsl.{ActorContext, Behaviors} +import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.{ActorRef => UntypedActorRef} +import akka.util.Timeout +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong} +import fr.acinq.eclair.blockchain.OnChainBalanceChecker +import fr.acinq.eclair.blockchain.fee.FeeratePerKw +import fr.acinq.eclair.channel.{CMD_CLOSE, CMD_UPDATE_RELAY_FEE, ChannelFlags, Register} +import fr.acinq.eclair.io.Peer.OpenChannel +import fr.acinq.eclair.payment.relay.Relayer.RelayFees +import fr.acinq.eclair.profit.PeerStatsTracker._ +import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, NodeParams, TimestampMilli} + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.util.{Failure, Random, Success} + +/** + * Created by t-bast on 30/01/2026. + */ + +object PeerScorer { + + // @formatter:off + sealed trait Command + case class ScorePeers(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]]) extends Command + private case class WrappedLatestStats(peers: Seq[PeerInfo]) extends Command + private case class OnChainBalance(confirmed: Satoshi, unconfirmed: Satoshi) extends Command + private case class WalletError(e: Throwable) extends Command + // @formatter:on + + /** + * @param enabled the [[PeerStatsTracker]] actor should only be created when [[enabled]] is true. + * @param scoringFrequency frequency at which we run our peer scoring algorithm. + * @param topPeersCount maximum number of peers to select as candidates for liquidity and relay fee updates. + * @param topPeersWhitelist a whitelist of peers that the node operator wants to prioritize. + */ + case class Config(enabled: Boolean, scoringFrequency: FiniteDuration, topPeersCount: Int, topPeersWhitelist: Set[PublicKey], liquidity: LiquidityConfig, relayFees: RelayFeesConfig) + + /** + * @param autoFund if true, we will automatically fund channels. + * @param autoClose if true, we will automatically close unused channels to reclaim liquidity. + * @param minFundingAmount we always fund channels with at least this amount. + * @param maxFundingAmount we never fund channels with more than this amount. + * @param maxFundingTxPerDay we rate-limit the number of transactions we make per day (on average). + * @param minOnChainBalance we stop funding channels if our on-chain balance is below this amount. + * @param maxFeerate we stop funding channels if the on-chain feerate is above this value. + * @param fundingCooldown minimum time between funding the same peer, to evaluate effectiveness. + */ + case class LiquidityConfig(autoFund: Boolean, autoClose: Boolean, minFundingAmount: Satoshi, maxFundingAmount: Satoshi, maxFundingTxPerDay: Int, minOnChainBalance: Satoshi, maxFeerate: FeeratePerKw, fundingCooldown: FiniteDuration) + + /** + * @param autoUpdate if true, we will automatically update our relay fees. + * @param minRelayFees we will not lower our relay fees below this value. + * @param maxRelayFees we will not raise our relay fees above this value. + * @param dailyPaymentVolumeThreshold we only increase fees if the daily outgoing payment volume exceeds this threshold or [[dailyPaymentVolumeThresholdPercent]]. + * @param dailyPaymentVolumeThresholdPercent we only increase fees if the daily outgoing payment volume exceeds this percentage of our peer capacity or [[dailyPaymentVolumeThreshold]]. + */ + case class RelayFeesConfig(autoUpdate: Boolean, minRelayFees: RelayFees, maxRelayFees: RelayFees, dailyPaymentVolumeThreshold: Satoshi, dailyPaymentVolumeThresholdPercent: Double) + + private case class FundingProposal(peer: PeerInfo, fundingAmount: Satoshi) { + val remoteNodeId: PublicKey = peer.remoteNodeId + } + + private case class FundingDecision(fundingAmount: Satoshi, dailyVolumeOutAtFunding: MilliSatoshi, timestamp: TimestampMilli) + + // @formatter:off + private sealed trait FeeDirection + private case object FeeIncrease extends FeeDirection { override def toString: String = "increase" } + private case object FeeDecrease extends FeeDirection { override def toString: String = "decrease" } + // @formatter:on + + private case class FeeChangeDecision(direction: FeeDirection, previousFee: RelayFees, newFee: RelayFees, dailyVolumeOutAtChange: MilliSatoshi) + + private case class DecisionHistory(funding: Map[PublicKey, FundingDecision], feeChanges: Map[PublicKey, FeeChangeDecision]) { + // @formatter:off + def addFunding(nodeId: PublicKey, record: FundingDecision): DecisionHistory = copy(funding = funding.updated(nodeId, record)) + def addFeeChanges(records: Map[PublicKey, FeeChangeDecision]): DecisionHistory = copy(feeChanges = feeChanges.concat(records)) + def revertFeeChanges(remoteNodeIds: Set[PublicKey]): DecisionHistory = copy(feeChanges = feeChanges -- remoteNodeIds) + def cleanup(now: TimestampMilli, maxAge: FiniteDuration): DecisionHistory = copy(funding = funding.filter { case (_, r) => now - r.timestamp < maxAge }) + // @formatter:on + } + + private def rollingDailyStats(p: PeerInfo): Seq[Seq[PeerStats]] = { + (0 until (p.stats.size - Bucket.bucketsPerDay)).map(i => p.stats.slice(i, i + Bucket.bucketsPerDay)) + } + + private def bestDailyVolumeOut(p: PeerInfo): MilliSatoshi = rollingDailyStats(p).map(s => s.map(_.totalAmountOut).sum).max + + private def bestDailyProfit(p: PeerInfo): MilliSatoshi = rollingDailyStats(p).map(s => s.map(_.profit).sum).max + + private def bestDailyOutgoingFlow(p: PeerInfo): MilliSatoshi = rollingDailyStats(p).map(s => s.map(_.outgoingFlow).sum).max + + def apply(nodeParams: NodeParams, wallet: OnChainBalanceChecker, statsTracker: ActorRef[PeerStatsTracker.GetLatestStats], register: UntypedActorRef): Behavior[Command] = { + Behaviors.setup { context => + Behaviors.withTimers { timers => + timers.startTimerWithFixedDelay(ScorePeers(None), nodeParams.peerScoringConfig.scoringFrequency) + new PeerScorer(nodeParams, wallet, statsTracker, register, context).run(DecisionHistory(Map.empty, Map.empty)) + } + } + } + +} + +private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, statsTracker: ActorRef[PeerStatsTracker.GetLatestStats], register: UntypedActorRef, context: ActorContext[PeerScorer.Command]) { + + import PeerScorer._ + + implicit val ec: ExecutionContext = context.system.executionContext + private val log = context.log + private val config = nodeParams.peerScoringConfig + + private def run(history: DecisionHistory): Behavior[Command] = { + Behaviors.receiveMessagePartial { + case ScorePeers(replyTo_opt) => + statsTracker ! PeerStatsTracker.GetLatestStats(context.messageAdapter[PeerStatsTracker.LatestStats](e => WrappedLatestStats(e.peers))) + waitForStats(replyTo_opt, history) + } + } + + private def waitForStats(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]], history: DecisionHistory): Behavior[Command] = { + Behaviors.receiveMessagePartial { + case WrappedLatestStats(peers) => scorePeers(replyTo_opt, peers, history) + } + } + + private def scorePeers(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]], peers: Seq[PeerInfo], history: DecisionHistory): Behavior[Command] = { + log.info("scoring {} peers", peers.size) + val dailyProfit = peers.map(_.stats.take(Bucket.bucketsPerDay).map(_.profit).sum).sum.truncateToSatoshi.toMilliBtc + val weeklyProfit = peers.map(_.stats.map(_.profit).sum).sum.truncateToSatoshi.toMilliBtc + log.info("rolling daily profit = {} and weekly profit = {}", dailyProfit, weeklyProfit) + + // We select peers that have the largest outgoing payment volume of the past day. + // This biases towards nodes that already have a large capacity and have liquidity available. + val bestPeersByVolume = peers + .map(p => (p, p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum)) + .filter(_._2 > 0.msat) + .sortBy(_._2)(Ordering[MilliSatoshi].reverse) + .take(config.topPeersCount) + .map(_._1) + log.debug("top {} peers:", bestPeersByVolume.size) + printDailyStats(bestPeersByVolume) + + // We select peers that have a low outgoing balance but were previously good peers, by looking at the largest daily + // outgoing payment volume of the past week, using a rolling daily window. This also biases towards nodes that + // already have a large capacity, but lets us identify good peers that ran out of liquidity. + val bestPeersWithoutLiquidityByVolume = peers + .filter(p => p.canSend <= p.capacity * 0.1) + .map(p => (p, bestDailyVolumeOut(p))) + .sortBy(_._2)(Ordering[MilliSatoshi].reverse) + .take(config.topPeersCount) + .map(_._1) + + // We identify peers that need additional liquidity. + val bestPeersThatNeedLiquidity = bestPeersByVolume + // We select peers that have a large outgoing flow, which means they will run out of liquidity in the next days. + .filter(p => p.stats.take(Bucket.bucketsPerDay).map(_.outgoingFlow).sum >= p.canSend * 0.2) + // And peers that have already ran out of liquidity. + .appendedAll(bestPeersWithoutLiquidityByVolume) + .distinctBy(_.remoteNodeId) + // We order them by their best daily profit of the past week. + .map(p => (p, bestDailyProfit(p))) + .sortBy(_._2)(Ordering[MilliSatoshi].reverse) + .take(config.topPeersCount) + // We fund a multiple of our daily outgoing flow, to ensure we have enough liquidity for a few days. + .map { case (p, _) => FundingProposal(p, (bestDailyOutgoingFlow(p) * 4).truncateToSatoshi) } + .filter(_.fundingAmount > 0.sat) + if (bestPeersThatNeedLiquidity.nonEmpty) { + log.debug("top {} peers that need liquidity:", bestPeersThatNeedLiquidity.size) + printDailyStats(bestPeersThatNeedLiquidity.map(_.peer)) + } + + // We select peers that are performing well relative to their capacity, which lets us identify good peers that may + // have a smaller capacity and may deserve a capacity increase. + val goodSmallPeers = peers + .map(p => (p, bestDailyVolumeOut(p).truncateToSatoshi.toLong.toDouble / p.capacity.toLong)) + .filter(_._2 > 0) + .sortBy(_._2)(Ordering[Double].reverse) + .take(config.topPeersCount) + // We only keep peers that may run out of liquidity in the next days. + .filter { case (p, _) => p.stats.take(Bucket.bucketsPerDay).map(_.outgoingFlow).sum >= p.canSend * 0.2 } + .map(_._1) + // We'd like to increase their capacity by 50%. + .map(p => FundingProposal(p, p.capacity * 0.5)) + if (goodSmallPeers.nonEmpty) { + log.debug("we've identified {} smaller peers that perform well relative to their capacity", goodSmallPeers.size) + printDailyStats(goodSmallPeers.map(_.peer)) + } + + // We try to identify peers that may have ran out of liquidity a long time ago but are interesting for us. + val peersToRevive = peers + // We prioritize peers that the node operator whitelisted. + .filter(p => config.topPeersWhitelist.contains(p.remoteNodeId)) + .sortBy(_.capacity)(Ordering[Satoshi].reverse) + // And we add peers with a large capacity that have a low balance. + .appendedAll(peers + .sortBy(_.capacity)(Ordering[Satoshi].reverse) + .take(config.topPeersCount) + .filter(p => p.canSend <= p.capacity * 0.1) + ) + .distinctBy(_.remoteNodeId) + // We'd like to increase their capacity by 25%. + .map(p => FundingProposal(p, p.capacity * 0.25)) + + // Since we're not yet reading past events from the DB, we need to wait until we have collected enough data before + // taking some actions such as opening or closing channels or updating relay fees. + // TODO: remove this once we start reading past data from the AuditDb on restart. + val hasPastData = bestPeersByVolume.exists(_.stats.drop(Bucket.bucketsPerDay).exists(_ != PeerStats.empty)) + if (hasPastData && replyTo_opt.isEmpty) { + closeChannelsIfNeeded(peers) + val history1 = updateRelayFeesIfNeeded(peers, history) + fundChannelsIfNeeded(bestPeersThatNeedLiquidity, goodSmallPeers, peersToRevive, history1) + } else { + replyTo_opt.foreach(_ ! bestPeersThatNeedLiquidity.map(_.peer)) + run(history) + } + } + + private def printDailyStats(peers: Seq[PeerInfo]): Unit = { + log.debug("| rank | node_id | daily_volume | daily_profit | can_send | can_receive |") + log.debug("|------|--------------------------------------------------------------------|---------------|---------------|---------------|---------------|") + peers.zipWithIndex.foreach { case (p, i) => + val dailyStats = p.stats.take(Bucket.bucketsPerDay) + val dailyVolume = dailyStats.map(_.totalAmountOut).sum.truncateToSatoshi.toMilliBtc + val dailyProfit = dailyStats.map(_.profit).sum.truncateToSatoshi.toMilliBtc + val canSend = p.canSend.truncateToSatoshi.toMilliBtc + val canReceive = p.canReceive.truncateToSatoshi.toMilliBtc + log.debug(f"| ${i + 1}%4d | ${p.remoteNodeId} | ${dailyVolume.toDouble}%8.2f mbtc | ${dailyProfit.toDouble}%8.2f mbtc | ${canSend.toDouble}%8.2f mbtc | ${canReceive.toDouble}%8.2f mbtc |") + } + log.debug("|------|--------------------------------------------------------------------|---------------|---------------|---------------|---------------|") + } + + private def closeChannelsIfNeeded(peers: Seq[PeerInfo]): Unit = { + peers + // We only close channels when we have more than one. + .filter(_.channels.size > 1) + // We only close channels for which most of the liquidity is idle on our side. + .filter(p => p.canSend >= p.capacity * 0.8 && p.stats.map(_.totalAmountOut).sum <= p.capacity * 0.05) + .foreach(p => { + val channels = p.channels.sortWith { + // We want to keep a public channel over a private channel. + case (c1, c2) if c1.isPublic != c2.isPublic => c1.isPublic + // Otherwise, we keep the channel with the largest capacity. + case (c1, c2) if c1.capacity != c2.capacity => c1.capacity >= c2.capacity + // Otherwise, we keep the channel with the smallest balance (and thus highest inbound liquidity). + case (c1, c2) => c1.canSend <= c2.canSend + } + // We keep the best channel and close the others. + channels.tail.foreach { c => + log.debug("we should close channel_id={} with remote_node_id={} (local={}, remote={})", c.channelId, p.remoteNodeId, c.canSend.truncateToSatoshi.toMilliBtc, c.canReceive.truncateToSatoshi.toMilliBtc) + if (config.liquidity.autoClose && nodeParams.currentFeeratesForFundingClosing.medium <= config.liquidity.maxFeerate) { + log.info("closing channel_id={} with remote_node_id={} (local={}, remote={})", c.channelId, p.remoteNodeId, c.canSend.truncateToSatoshi.toMilliBtc, c.canReceive.truncateToSatoshi.toMilliBtc) + val cmd = CMD_CLOSE(UntypedActorRef.noSender, None, None) + register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd) + } + } + }) + } + + private def updateRelayFeesIfNeeded(peers: Seq[PeerInfo], history: DecisionHistory): DecisionHistory = { + // We configure *daily* absolute and proportional payment volume targets. We look at events from the current period + // and the previous period, so we need to get the right ratio to convert those daily amounts. + val now = TimestampMilli.now() + val lastTwoBucketsRatio = 1.0 + Bucket.consumed(now) + val lastTwoBucketsDailyRatio = (Bucket.duration * lastTwoBucketsRatio).toSeconds.toDouble / (24 * 3600) + // We increase fees of channels that are performing better than we expected. + val feeIncreases = peers + // We select peers that have exceeded our payment volume target in the past two periods. + .filter(p => p.stats.take(2).map(_.totalAmountOut).sum >= Seq(config.relayFees.dailyPaymentVolumeThreshold * lastTwoBucketsDailyRatio, p.capacity * config.relayFees.dailyPaymentVolumeThresholdPercent * lastTwoBucketsDailyRatio).min) + // And that have an increasing payment volume compared to the period before that. + .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum > 0.msat) + .filter(p => (p.stats.take(2).map(_.totalAmountOut).sum / lastTwoBucketsRatio) > p.stats.slice(2, 3).map(_.totalAmountOut).sum * 1.1) + .flatMap(p => { + p.latestUpdate_opt match { + // And for which we haven't updated our relay fees recently already. + case Some(u) if u.timestamp <= now.toTimestampSecond - (Bucket.duration * 1.5).toSeconds => + val next = u.relayFees.copy(feeProportionalMillionths = u.feeProportionalMillionths + 500) + if (next.feeBase <= config.relayFees.maxRelayFees.feeBase && next.feeProportionalMillionths <= config.relayFees.maxRelayFees.feeProportionalMillionths) { + val dailyVolume = p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + Some(p.remoteNodeId -> FeeChangeDecision(FeeIncrease, u.relayFees, next, dailyVolume)) + } else { + None + } + case _ => None + } + }).toMap + // We decrease fees of channels that aren't performing well. + val feeDecreases = peers + // We select peers that have a recent 15% decrease in outgoing payment volume. + .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum > 0.msat) + .filter(p => p.stats.take(2).map(_.totalAmountOut).sum <= p.stats.slice(2, 3).map(_.totalAmountOut).sum * 0.85) + // For which the volume was previously already stable or decreasing. + .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum <= p.stats.slice(3, 4).map(_.totalAmountOut).sum) + // And that have enough liquidity to relay outgoing payments. + .filter(p => p.canSend >= Seq(config.relayFees.dailyPaymentVolumeThreshold, p.capacity * config.relayFees.dailyPaymentVolumeThresholdPercent).min) + .flatMap(p => { + p.latestUpdate_opt match { + // And for which we haven't updated our relay fees recently already. + case Some(u) if u.timestamp <= now.toTimestampSecond - (Bucket.duration * 1.5).toSeconds => + val next = u.relayFees.copy(feeProportionalMillionths = u.feeProportionalMillionths - 500) + if (next.feeBase >= config.relayFees.minRelayFees.feeBase && next.feeProportionalMillionths >= config.relayFees.minRelayFees.feeProportionalMillionths) { + val dailyVolume = p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + Some(p.remoteNodeId -> FeeChangeDecision(FeeDecrease, u.relayFees, next, dailyVolume)) + } else { + None + } + case _ => None + } + }).toMap + // We revert fee changes that had a negative impact on volume. + val feeReverts = peers + .filterNot(p => feeIncreases.contains(p.remoteNodeId) || feeDecreases.contains(p.remoteNodeId)) + .flatMap { p => + (history.feeChanges.get(p.remoteNodeId), p.latestUpdate_opt) match { + case (Some(record), Some(u)) => + // Confirm our change is still in effect. + val updateMatchesRecord = u.feeProportionalMillionths == record.newFee.feeProportionalMillionths + // We expect around 6h-24h after the update to really see its effect on path-finding: after 24h, too many + // other parameters may interfere with our evaluation. + val withinEvaluationWindow = u.timestamp >= now.toTimestampSecond - (24 hours).toSeconds + // We don't want to update fees too often. + val notUpdatedRecently = u.timestamp <= now.toTimestampSecond - (Bucket.duration * 2).toSeconds + // We revert both fee increases and fee decreases: if volume didn't increase after a fee decrease, we'd + // rather collect our previous (bigger) relay fee. + val currentDailyVolume = p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + val shouldRevert = record.direction match { + case FeeIncrease => currentDailyVolume < record.dailyVolumeOutAtChange * 0.8 + case FeeDecrease => currentDailyVolume < record.dailyVolumeOutAtChange * 0.9 + } + if (updateMatchesRecord && withinEvaluationWindow && notUpdatedRecently && shouldRevert) { + val decision = record.direction match { + case PeerScorer.FeeIncrease => FeeChangeDecision(FeeDecrease, u.relayFees, u.relayFees.copy(feeProportionalMillionths = u.relayFees.feeProportionalMillionths - 500), currentDailyVolume) + case PeerScorer.FeeDecrease => FeeChangeDecision(FeeIncrease, u.relayFees, u.relayFees.copy(feeProportionalMillionths = u.relayFees.feeProportionalMillionths + 500), currentDailyVolume) + } + Some(p.remoteNodeId -> decision) + } else { + None + } + case _ => None + } + }.toMap + // We print the results to help debugging. + if (feeIncreases.nonEmpty || feeDecreases.nonEmpty || feeReverts.nonEmpty) { + log.debug("we should update our relay fees with the following peers:") + log.debug("| node_id | volume_variation | decision | current_fee | next_fee |") + log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|") + (feeIncreases.toSeq ++ feeDecreases.toSeq ++ feeReverts.toSeq).foreach { case (remoteNodeId, decision) => + val volumeVariation = peers.find(_.remoteNodeId == remoteNodeId) match { + case Some(p) if p.stats.slice(2, 3).map(_.totalAmountOut).sum != 0.msat => p.stats.take(2).map(_.totalAmountOut).sum.toLong.toDouble / (p.stats.slice(2, 3).map(_.totalAmountOut).sum.toLong * lastTwoBucketsRatio) + case _ => 0.0 + } + log.debug(f"| $remoteNodeId | $volumeVariation%.2f | ${decision.direction} | ${decision.previousFee.feeProportionalMillionths}%11d | ${decision.newFee.feeProportionalMillionths}%8d |") + } + log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|") + } + if (config.relayFees.autoUpdate) { + (feeIncreases.toSeq ++ feeDecreases.toSeq ++ feeReverts.toSeq).foreach { case (remoteNodeId, decision) => + val cmd = CMD_UPDATE_RELAY_FEE(UntypedActorRef.noSender, decision.newFee.feeBase, decision.newFee.feeProportionalMillionths) + peers.find(_.remoteNodeId == remoteNodeId).map(_.channels).getOrElse(Nil).foreach(c => register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd)) + } + } + // Note that in order to avoid oscillating between reverts (reverting a revert), we remove the previous records when + // reverting a change: this way, the normal algorithm resumes during the next run. + history.addFeeChanges(feeIncreases ++ feeDecreases).revertFeeChanges(feeReverts.keySet) + } + + private def fundChannelsIfNeeded(bestPeers: Seq[FundingProposal], smallPeers: Seq[FundingProposal], toRevive: Seq[FundingProposal], history: DecisionHistory): Behavior[Command] = { + // We don't want to fund peers every time our scoring algorithm runs, otherwise we may create too many on-chain + // transactions. We draw from a random distribution to ensure that on average: + // - we follow our rate-limit for funding best peers and fund at most 3 at a time + // - we fund a small peer once every 3 days, chosen at random between our top 3 + // - we revive an older peer once every day, chosen at random between our top 3 + val toFund = { + val scoringPerDay = (1 day).toSeconds / config.scoringFrequency.toSeconds + val bestPeersToFund = bestPeers.headOption match { + case Some(_) if Random.nextDouble() <= config.liquidity.maxFundingTxPerDay.toDouble / (scoringPerDay * bestPeers.size.min(3)) => bestPeers.take(3) + case _ => Nil + } + val smallPeerToFund_opt = smallPeers.headOption match { + case Some(_) if Random.nextDouble() <= (1.0 / (scoringPerDay * 3)) => Random.shuffle(smallPeers.take(3)).headOption + case _ => None + } + val toReviveNotAlreadySelected = toRevive.filterNot(p => bestPeers.exists(_.remoteNodeId == p.remoteNodeId) || smallPeerToFund_opt.exists(_.remoteNodeId == p.remoteNodeId)) + val toRevive_opt = toReviveNotAlreadySelected.headOption match { + case Some(_) if Random.nextDouble() <= (1.0 / scoringPerDay) => Random.shuffle(toReviveNotAlreadySelected.take(3)).headOption + case _ => None + } + (bestPeersToFund ++ toRevive_opt ++ smallPeerToFund_opt).distinctBy(_.remoteNodeId) + } + if (bestPeers.isEmpty && smallPeers.isEmpty && toRevive.isEmpty) { + log.info("we haven't identified peers that require liquidity yet") + run(history) + } else if (toFund.isEmpty) { + log.info("we skip funding peers because of per-day rate-limits: increase eclair.peer-scoring.liquidity.max-funding-tx-per-day to fund more often") + run(history) + } else if (config.liquidity.maxFeerate < nodeParams.currentFeeratesForFundingClosing.medium) { + log.info("we skip funding peers because current feerate is too high ({} < {}): increase eclair.peer-scoring.liquidity.max-feerate-sat-per-byte to start funding again", config.liquidity.maxFeerate, nodeParams.currentFeeratesForFundingClosing.medium) + run(history) + } else { + context.pipeToSelf(wallet.onChainBalance()) { + case Success(b) => OnChainBalance(b.confirmed, b.unconfirmed) + case Failure(e) => WalletError(e) + } + Behaviors.receiveMessagePartial { + case WalletError(e) => + log.warn("cannot get on-chain balance: {}", e.getMessage) + run(history) + case OnChainBalance(confirmed, unconfirmed) => + val now = TimestampMilli.now() + val history1 = if (confirmed <= config.liquidity.minOnChainBalance) { + log.info("we don't have enough on-chain balance to fund new channels (confirmed={}, unconfirmed={})", confirmed.toMilliBtc, unconfirmed.toMilliBtc) + history + } else { + toFund + // We don't fund peers that are already being funded. + .filterNot(p => p.peer.hasPendingChannel) + // We don't fund peers that were recently funded, unless volume improved since the last funding. + .filter { f => + history.funding.get(f.remoteNodeId) match { + case Some(record) if now - record.timestamp < config.liquidity.fundingCooldown => + val currentDailyVolume = f.peer.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + if (currentDailyVolume <= record.dailyVolumeOutAtFunding * 1.1) { + log.info("skipping funding for remote_node_id={}: last funded {} ago, volume has not improved (was={}, now={})", f.remoteNodeId, now - record.timestamp, record.dailyVolumeOutAtFunding.truncateToSatoshi.toMilliBtc, currentDailyVolume.truncateToSatoshi.toMilliBtc) + false + } else { + log.info("re-funding remote_node_id={}: volume improved since last funding (was={}, now={})", f.remoteNodeId, record.dailyVolumeOutAtFunding.truncateToSatoshi.toMilliBtc, currentDailyVolume.truncateToSatoshi.toMilliBtc) + true + } + case _ => true + } + } + // And we apply our configured funding limits to the liquidity suggestions. + .map(f => f.copy(fundingAmount = f.fundingAmount.max(config.liquidity.minFundingAmount).min(config.liquidity.maxFundingAmount))) + .foldLeft((confirmed - config.liquidity.minOnChainBalance, history)) { + case ((available, history), f) if available < f.fundingAmount * 0.5 => (available, history) + case ((available, history), f) => + val fundingAmount = f.fundingAmount.min(available) + log.info("funding channel with remote_node_id={} (funding_amount={})", f.remoteNodeId, fundingAmount.toMilliBtc) + // TODO: when do we want to create a private channel? Maybe if we already have a bigger public channel? + val channelFlags = ChannelFlags(announceChannel = true) + val cmd = OpenChannel(f.remoteNodeId, fundingAmount, None, None, None, None, None, Some(channelFlags), Some(Timeout(60 seconds))) + register ! Register.ForwardNodeId(context.system.ignoreRef, f.remoteNodeId, cmd) + val dailyVolume = f.peer.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + (available - fundingAmount, history.addFunding(f.remoteNodeId, FundingDecision(fundingAmount, dailyVolume, now))) + }._2 + } + run(history1.cleanup(now, 7 days)) + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala new file mode 100644 index 0000000000..e44a3c0959 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala @@ -0,0 +1,380 @@ +/* + * Copyright 2026 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.profit + +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} +import akka.actor.typed.{ActorRef, Behavior} +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong} +import fr.acinq.eclair.channel._ +import fr.acinq.eclair.db.AuditDb +import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent} +import fr.acinq.eclair.wire.protocol.ChannelUpdate +import fr.acinq.eclair.{Features, MilliSatoshi, MilliSatoshiLong, TimestampMilli, ToMilliSatoshiConversion} + +import java.time.{Instant, ZoneId, ZonedDateTime} +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +/** + * Created by t-bast on 30/01/2026. + */ + +object PeerStatsTracker { + // @formatter:off + sealed trait Command + case class GetLatestStats(replyTo: ActorRef[LatestStats]) extends Command + private[profit] case object RemoveOldBuckets extends Command + private[profit] case class WrappedPaymentSent(e: PaymentSent) extends Command + private[profit] case class WrappedPaymentRelayed(e: PaymentRelayed) extends Command + private[profit] case class WrappedPaymentReceived(e: PaymentReceived) extends Command + private[profit] case class ChannelCreationInProgress(remoteNodeId: PublicKey, channelId: ByteVector32) extends Command + private[profit] case class ChannelCreationAborted(remoteNodeId: PublicKey, channelId: ByteVector32) extends Command + private[profit] case class WrappedLocalChannelUpdate(e: LocalChannelUpdate) extends Command + private[profit] case class WrappedLocalChannelDown(e: LocalChannelDown) extends Command + private[profit] case class WrappedAvailableBalanceChanged(e: AvailableBalanceChanged) extends Command + // @formatter:on + + def apply(db: AuditDb, channels: Seq[PersistentChannelData]): Behavior[Command] = { + Behaviors.setup { context => + Behaviors.withTimers { timers => + new PeerStatsTracker(db, timers, context).start(channels) + } + } + } + + /** Returns the latest statistics for all of our public peers. */ + case class LatestStats(peers: Seq[PeerInfo]) + + /** NB: stats are ordered, with the most recent events first. */ + case class PeerInfo(remoteNodeId: PublicKey, stats: Seq[PeerStats], channels: Seq[ChannelInfo], latestUpdate_opt: Option[ChannelUpdate], hasPendingChannel: Boolean) { + val capacity: Satoshi = channels.map(_.capacity).sum + val canSend: MilliSatoshi = channels.map(_.canSend).sum + val canReceive: MilliSatoshi = channels.map(_.canReceive).sum + } + + /** We compute peer statistics per buckets spanning a few hours. */ + case class Bucket(private val day: Int, private val month: Int, private val year: Int, private val slot: Int) extends Ordered[Bucket] { + override def compare(that: Bucket): Int = that match { + case Bucket(_, _, y, _) if y != year => year - y + case Bucket(_, m, _, _) if m != month => month - m + case Bucket(d, _, _, _) if d != day => day - d + case Bucket(_, _, _, s) => slot - s + } + + override def toString: String = f"$year-$month%02d-$day%02d-$slot" + } + + object Bucket { + val duration: FiniteDuration = 3 hours + val bucketsPerDay: Int = 8 + + def from(ts: TimestampMilli): Bucket = { + val date = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts.toLong), ZoneId.of("UTC")) + Bucket(date.getDayOfMonth, date.getMonthValue, date.getYear, date.getHour * bucketsPerDay / 24) + } + + /** Returns the percentage (between 0.00 and 1.00) of the bucket consumed at the given timestamp. */ + def consumed(ts: TimestampMilli): Double = { + val bucket = from(ts) + val start = ZonedDateTime.of(bucket.year, bucket.month, bucket.day, bucket.slot * 24 / bucketsPerDay, 0, 0, 0, ZoneId.of("UTC")).toEpochSecond + (ts.toTimestampSecond.toLong - start).toDouble / duration.toSeconds + } + } + + case class PeerStats(totalAmountIn: MilliSatoshi, totalAmountOut: MilliSatoshi, relayFeeEarned: MilliSatoshi, onChainFeePaid: Satoshi, liquidityFeeEarned: Satoshi, liquidityFeePaid: Satoshi) { + val outgoingFlow: MilliSatoshi = totalAmountOut - totalAmountIn + val profit: MilliSatoshi = relayFeeEarned + liquidityFeeEarned.toMilliSatoshi - onChainFeePaid.toMilliSatoshi - liquidityFeePaid.toMilliSatoshi + } + + object PeerStats { + def empty: PeerStats = PeerStats(0 msat, 0 msat, 0 msat, 0 sat, 0 sat, 0 sat) + } + + /** + * We aggregate events into buckets to avoid storing too much data per peer, while providing enough granularity to + * detect variations in volume and flows. Note that we store an entry for every peer with whom we have a channel, + * even for peers that don't have any activity (which lets us detect those peers and potentially reclaim liquidity). + */ + case class BucketedPeerStats(private val stats: Map[PublicKey, Map[Bucket, PeerStats]]) { + def peers: Iterable[PublicKey] = stats.keys + + /** Returns stats for the given peer in all buckets (most recent bucket first). */ + def getPeerStats(remoteNodeId: PublicKey, now: TimestampMilli): Seq[PeerStats] = { + stats.get(remoteNodeId) match { + case Some(_) => (0 until BucketedPeerStats.bucketsCount).map(b => getPeerStatsForBucket(remoteNodeId, Bucket.from(now - b * Bucket.duration))) + case None => Seq.fill(BucketedPeerStats.bucketsCount)(PeerStats.empty) + } + } + + /** Returns stats for the given bucket. */ + private def getPeerStatsForBucket(remoteNodeId: PublicKey, bucket: Bucket): PeerStats = { + stats.get(remoteNodeId).flatMap(_.get(bucket)).getOrElse(PeerStats.empty) + } + + /** + * When our first channel with a peer is created, we want to add an entry for it, even though there are no payments yet. + * This lets us detect new peers that are idle, which can be useful in many scenarios. + */ + def initializePeerIfNeeded(remoteNodeId: PublicKey): BucketedPeerStats = { + stats.get(remoteNodeId) match { + case Some(_) => this // already initialized + case None => this.copy(stats = stats + (remoteNodeId -> Map.empty)) + } + } + + private def addOrUpdate(remoteNodeId: PublicKey, bucket: Bucket, peerStats: PeerStats): BucketedPeerStats = { + val buckets = stats.getOrElse(remoteNodeId, Map.empty[Bucket, PeerStats]) + copy(stats = stats + (remoteNodeId -> (buckets + (bucket -> peerStats)))) + } + + def addPaymentSent(e: PaymentSent): BucketedPeerStats = { + e.parts.foldLeft(this) { + case (current, p) => + val bucket = Bucket.from(p.settledAt) + val peerStats = current.getPeerStatsForBucket(p.remoteNodeId, bucket) + val peerStats1 = peerStats.copy(totalAmountOut = peerStats.totalAmountOut + p.amountWithFees) + current.addOrUpdate(p.remoteNodeId, bucket, peerStats1) + } + } + + def addPaymentReceived(e: PaymentReceived): BucketedPeerStats = { + e.parts.foldLeft(this) { + case (current, p) => + val bucket = Bucket.from(p.receivedAt) + val peerStats = current.getPeerStatsForBucket(p.remoteNodeId, bucket) + val peerStats1 = peerStats.copy(totalAmountIn = peerStats.totalAmountIn + p.amount) + current.addOrUpdate(p.remoteNodeId, bucket, peerStats1) + } + } + + def addPaymentRelayed(e: PaymentRelayed): BucketedPeerStats = { + val withIncoming = e.incoming.foldLeft(this) { + case (current, i) => + val bucket = Bucket.from(i.receivedAt) + val peerStats = current.getPeerStatsForBucket(i.remoteNodeId, bucket) + val peerStats1 = peerStats.copy(totalAmountIn = peerStats.totalAmountIn + i.amount) + current.addOrUpdate(i.remoteNodeId, bucket, peerStats1) + } + e.outgoing.foldLeft(withIncoming) { + case (current, o) => + val bucket = Bucket.from(o.settledAt) + val peerStats = current.getPeerStatsForBucket(o.remoteNodeId, bucket) + // When using MPP and trampoline, payments can be relayed through multiple nodes at once. + // We split the fee according to the proportional amount relayed through the requested node. + val relayFee = e.relayFee * (o.amount.toLong.toDouble / e.amountOut.toLong) + val peerStats1 = peerStats.copy(totalAmountOut = peerStats.totalAmountOut + o.amount, relayFeeEarned = peerStats.relayFeeEarned + relayFee) + current.addOrUpdate(o.remoteNodeId, bucket, peerStats1) + } + } + + /** Remove old buckets that exceed our retention window. This should be called frequently to avoid memory leaks. */ + def removeOldBuckets(now: TimestampMilli): BucketedPeerStats = { + val oldestBucket = Bucket.from(now - Bucket.duration * BucketedPeerStats.bucketsCount) + copy(stats = stats.map { + case (remoteNodeId, peerStats) => remoteNodeId -> peerStats.filter { case (bucket, _) => bucket >= oldestBucket } + }) + } + + /** Remove a peer from our list: this should only happen when we don't have channels with that peer anymore. */ + def removePeer(remoteNodeId: PublicKey): BucketedPeerStats = copy(stats = stats - remoteNodeId) + } + + object BucketedPeerStats { + // We keep 7 days of past history. + val bucketsCount: Int = 7 * Bucket.bucketsPerDay + + def empty(peers: Set[PublicKey]): BucketedPeerStats = BucketedPeerStats(peers.map(remoteNodeId => remoteNodeId -> Map.empty[Bucket, PeerStats]).toMap) + } + + /** We keep minimal information about open channels to allow running our heuristics. */ + case class ChannelInfo(channelId: ByteVector32, capacity: Satoshi, canSend: MilliSatoshi, canReceive: MilliSatoshi, isPublic: Boolean) + + object ChannelInfo { + def apply(commitments: Commitments): ChannelInfo = ChannelInfo(commitments.channelId, commitments.latest.capacity, commitments.availableBalanceForSend, commitments.availableBalanceForReceive, commitments.announceChannel) + } + + /** + * Note that we keep channel updates separately: we're only interested in the relay fees, which are the same for every channel. + * We also keep track of pending channels (channels being created), to avoid creating many channels at the same time. + */ + case class PeerChannels(private val channels: Map[PublicKey, Seq[ChannelInfo]], private val updates: Map[PublicKey, ChannelUpdate], private val pending: Map[PublicKey, Set[ByteVector32]]) { + def peers: Set[PublicKey] = channels.keySet + + def getChannels(remoteNodeId: PublicKey): Seq[ChannelInfo] = channels.getOrElse(remoteNodeId, Nil) + + def getUpdate(remoteNodeId: PublicKey): Option[ChannelUpdate] = updates.get(remoteNodeId) + + def hasChannels(remoteNodeId: PublicKey): Boolean = channels.contains(remoteNodeId) + + def hasPendingChannel(remoteNodeId: PublicKey): Boolean = pending.contains(remoteNodeId) + + def updateChannel(e: LocalChannelUpdate): PeerChannels = { + // Note that creating our channel update implicitly means that the channel isn't pending anymore. + updates.get(e.remoteNodeId) match { + case Some(u) if u.timestamp > e.channelUpdate.timestamp => updateChannel(e.commitments).removePendingChannel(e.remoteNodeId, e.channelId) + case _ => updateChannel(e.commitments).copy(updates = updates + (e.remoteNodeId -> e.channelUpdate)).removePendingChannel(e.remoteNodeId, e.channelId) + } + } + + def updateChannel(e: AvailableBalanceChanged): PeerChannels = { + updateChannel(e.commitments) + } + + private def updateChannel(commitments: Commitments): PeerChannels = { + if (!commitments.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve)) { + val peerChannels1 = channels.getOrElse(commitments.remoteNodeId, Nil).filter(_.channelId != commitments.channelId) :+ ChannelInfo(commitments) + copy(channels = channels + (commitments.remoteNodeId -> peerChannels1)) + } else { + // We filter out channels with mobile wallets. + copy(channels = channels - commitments.remoteNodeId) + } + } + + def addPendingChannel(e: ChannelCreationInProgress): PeerChannels = { + val pending1 = pending + (e.remoteNodeId -> (pending.getOrElse(e.remoteNodeId, Set.empty) + e.channelId)) + copy(pending = pending1) + } + + def removeChannel(e: LocalChannelDown): PeerChannels = { + val updated = channels.get(e.remoteNodeId) match { + case Some(peerChannels) => + val peerChannels1 = peerChannels.filter(_.channelId != e.channelId) + if (peerChannels1.isEmpty) { + copy(channels = channels - e.remoteNodeId, updates = updates - e.remoteNodeId) + } else { + copy(channels = channels + (e.remoteNodeId -> peerChannels1)) + } + case None => this + } + updated.removePendingChannel(e.remoteNodeId, e.channelId) + } + + private def removePendingChannel(remoteNodeId: PublicKey, channelId: ByteVector32): PeerChannels = { + val pending1 = pending.get(remoteNodeId) match { + case Some(channels) => + val channels1 = channels - channelId + if (channels1.isEmpty) { + pending - remoteNodeId + } else { + pending + (remoteNodeId -> channels1) + } + case None => pending + } + copy(pending = pending1) + } + + def removeChannel(e: ChannelCreationAborted): PeerChannels = { + removePendingChannel(e.remoteNodeId, e.channelId) + } + } + + private object PeerChannels { + def apply(channels: Seq[PersistentChannelData]): PeerChannels = { + channels.foldLeft(PeerChannels(Map.empty[PublicKey, Seq[ChannelInfo]], Map.empty[PublicKey, ChannelUpdate], Map.empty[PublicKey, Set[ByteVector32]])) { + case (current, channel) => channel match { + // We include pending channels. + case _: DATA_WAIT_FOR_DUAL_FUNDING_SIGNED | _: DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED | _: DATA_WAIT_FOR_DUAL_FUNDING_READY | _: DATA_WAIT_FOR_FUNDING_CONFIRMED | _: DATA_WAIT_FOR_CHANNEL_READY if !channel.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve) => + val pending1 = current.pending.getOrElse(channel.remoteNodeId, Set.empty) + channel.channelId + current.copy(pending = current.pending + (channel.remoteNodeId -> pending1)) + // We filter out channels with mobile wallets. + case d: DATA_NORMAL if !d.commitments.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve) => + val peerChannels1 = current.channels.getOrElse(d.remoteNodeId, Nil) :+ ChannelInfo(d.commitments) + val update1 = current.updates.get(d.remoteNodeId) match { + case Some(update) if update.timestamp > d.channelUpdate.timestamp => update + case _ => d.channelUpdate + } + current.copy( + channels = current.channels + (d.remoteNodeId -> peerChannels1), + updates = current.updates + (d.remoteNodeId -> update1) + ) + case _ => current + } + } + } + } + +} + +private class PeerStatsTracker(db: AuditDb, timers: TimerScheduler[PeerStatsTracker.Command], context: ActorContext[PeerStatsTracker.Command]) { + + import PeerStatsTracker._ + + private val log = context.log + + private def start(channels: Seq[PersistentChannelData]): Behavior[Command] = { + // We subscribe to channel events to update channel balances. + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelIdAssigned](e => ChannelCreationInProgress(e.remoteNodeId, e.channelId))) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelAborted](e => ChannelCreationAborted(e.remoteNodeId, e.channelId))) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedLocalChannelDown)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedLocalChannelUpdate)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedAvailableBalanceChanged)) + val peerChannels = PeerChannels(channels) + log.info("gathering statistics for {} peers", peerChannels.peers.size) + // We subscribe to payment events to update statistics. + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentSent)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentRelayed)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentReceived)) + // TODO: read events that happened before startedAt from the DB to initialize statistics from past data. + val stats = BucketedPeerStats.empty(peerChannels.peers) + timers.startTimerWithFixedDelay(RemoveOldBuckets, Bucket.duration) + listening(stats, peerChannels) + } + + private def listening(stats: BucketedPeerStats, channels: PeerChannels): Behavior[Command] = { + Behaviors.receiveMessage { + case WrappedPaymentSent(e) => + listening(stats.addPaymentSent(e), channels) + case WrappedPaymentReceived(e) => + listening(stats.addPaymentReceived(e), channels) + case WrappedPaymentRelayed(e) => + listening(stats.addPaymentRelayed(e), channels) + case e: ChannelCreationInProgress => + listening(stats, channels.addPendingChannel(e)) + case e: ChannelCreationAborted => + listening(stats, channels.removeChannel(e)) + case WrappedLocalChannelUpdate(e) => + listening(stats.initializePeerIfNeeded(e.remoteNodeId), channels.updateChannel(e)) + case WrappedAvailableBalanceChanged(e) => + listening(stats, channels.updateChannel(e)) + case WrappedLocalChannelDown(e) => + val channels1 = channels.removeChannel(e) + val stats1 = if (channels1.getChannels(e.remoteNodeId).isEmpty && !channels1.hasPendingChannel(e.remoteNodeId)) { + stats.removePeer(e.remoteNodeId) + } else { + stats + } + listening(stats1, channels1) + case RemoveOldBuckets => + listening(stats.removeOldBuckets(TimestampMilli.now()), channels) + case GetLatestStats(replyTo) => + // TODO: do a db.listConfirmed() to update on-chain stats (we cannot rely on events only because data comes from + // the TransactionPublished event, but should only be applied after TransactionConfirmed so we need permanent + // storage). We'll need the listConfirmed() function added in https://github.com/ACINQ/eclair/pull/3245. + log.debug("statistics available for {} peers", stats.peers.size) + val now = TimestampMilli.now() + val latest = stats.peers + // We only return statistics for peers with whom we have channels available for payments. + .filter(nodeId => channels.hasChannels(nodeId)) + .map(nodeId => PeerInfo(nodeId, stats.getPeerStats(nodeId, now), channels.getChannels(nodeId), channels.getUpdate(nodeId), channels.hasPendingChannel(nodeId))) + .toSeq + replyTo ! LatestStats(latest) + listening(stats, channels) + } + } + +} diff --git a/eclair-core/src/test/resources/logback-test.xml b/eclair-core/src/test/resources/logback-test.xml index 236be1d4af..0b946139b1 100644 --- a/eclair-core/src/test/resources/logback-test.xml +++ b/eclair-core/src/test/resources/logback-test.xml @@ -55,6 +55,7 @@ + diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 3a43f342cb..226f28dedc 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -29,6 +29,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.offer.OffersConfig import fr.acinq.eclair.payment.relay.OnTheFlyFunding import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} +import fr.acinq.eclair.profit.PeerScorer import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Graph.{HeuristicsConstants, MessageWeightRatios} import fr.acinq.eclair.router.Router._ @@ -261,6 +262,29 @@ object TestConstants { peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour), + peerScoringConfig = PeerScorer.Config( + enabled = true, + scoringFrequency = 1 day, + topPeersCount = 10, + topPeersWhitelist = Set.empty, + liquidity = PeerScorer.LiquidityConfig( + autoFund = true, + autoClose = true, + minFundingAmount = 1_000_000 sat, // 0.01 BTC + maxFundingAmount = 100_000_000 sat, // 1 BTC + minOnChainBalance = 5_000_000 sat, // 0.05 BTC + maxFeerate = FeeratePerByte(100 sat).perKw, + maxFundingTxPerDay = 100, + fundingCooldown = 72 hours, + ), + relayFees = PeerScorer.RelayFeesConfig( + autoUpdate = true, + minRelayFees = RelayFees(1 msat, 500), + maxRelayFees = RelayFees(10_000 msat, 5000), + dailyPaymentVolumeThreshold = 1_000_000 sat, // 0.01 BTC + dailyPaymentVolumeThresholdPercent = 0.1, + ) + ), offersConfig = OffersConfig(messagePathMinLength = 2, paymentPathCount = 2, paymentPathLength = 4, paymentPathCltvExpiryDelta = CltvExpiryDelta(500)), ) @@ -453,6 +477,29 @@ object TestConstants { peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour), + peerScoringConfig = PeerScorer.Config( + enabled = true, + scoringFrequency = 1 day, + topPeersCount = 10, + topPeersWhitelist = Set.empty, + liquidity = PeerScorer.LiquidityConfig( + autoFund = true, + autoClose = true, + minFundingAmount = 1_000_000 sat, // 0.01 BTC + maxFundingAmount = 100_000_000 sat, // 1 BTC + minOnChainBalance = 5_000_000 sat, // 0.05 BTC + maxFeerate = FeeratePerByte(100 sat).perKw, + maxFundingTxPerDay = 100, + fundingCooldown = 72 hours, + ), + relayFees = PeerScorer.RelayFeesConfig( + autoUpdate = true, + minRelayFees = RelayFees(1 msat, 500), + maxRelayFees = RelayFees(10_000 msat, 5000), + dailyPaymentVolumeThreshold = 1_000_000 sat, // 0.01 BTC + dailyPaymentVolumeThresholdPercent = 0.1, + ) + ), offersConfig = OffersConfig(messagePathMinLength = 2, paymentPathCount = 2, paymentPathLength = 4, paymentPathCltvExpiryDelta = CltvExpiryDelta(500)), ) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala index 1ee99dbb26..0df41cf417 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala @@ -250,4 +250,13 @@ object DummyOnChainWallet { MakeFundingTxResponse(fundingTx, 0, 420 sat) } +} + +class DummyBalanceChecker(var confirmedBalance: Satoshi = 0 sat, var unconfirmedBalance: Satoshi = 0 sat) extends OnChainBalanceChecker { + def setBalance(confirmed: Satoshi, unconfirmed: Satoshi): Unit = { + confirmedBalance = confirmed + unconfirmedBalance = unconfirmed + } + + override def onChainBalance()(implicit ec: ExecutionContext): Future[OnChainBalance] = Future.successful(OnChainBalance(confirmedBalance, unconfirmedBalance)) } \ No newline at end of file diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala new file mode 100644 index 0000000000..d043c9b5c6 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala @@ -0,0 +1,759 @@ +package fr.acinq.eclair.profit + +import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} +import akka.actor.typed.ActorRef +import akka.actor.typed.scaladsl.adapter.TypedActorRefOps +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Block, BtcAmount, BtcDouble, ByteVector32, ByteVector64, SatoshiLong} +import fr.acinq.eclair.blockchain.DummyBalanceChecker +import fr.acinq.eclair.blockchain.fee.FeeratePerByte +import fr.acinq.eclair.channel.{CMD_CLOSE, CMD_UPDATE_RELAY_FEE, Register} +import fr.acinq.eclair.io.Peer.OpenChannel +import fr.acinq.eclair.payment.relay.Relayer.RelayFees +import fr.acinq.eclair.profit.PeerScorer._ +import fr.acinq.eclair.profit.PeerStatsTracker._ +import fr.acinq.eclair.wire.protocol.ChannelUpdate +import fr.acinq.eclair.wire.protocol.ChannelUpdate.{ChannelFlags, MessageFlags} +import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, RealShortChannelId, TestConstants, TimestampMilli, TimestampSecond, ToMilliSatoshiConversion, randomBytes32} +import org.scalatest.Inside.inside +import org.scalatest.funsuite.AnyFunSuiteLike +import scodec.bits.HexStringSyntax + +import scala.concurrent.duration.DurationInt +import scala.util.Random + +class PeerScorerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + private val remoteNodeId1 = PublicKey(hex"024c9c77624899672c78d84b551ef1187cbb17618b2d96ef189d0ea36f307be76e") + private val remoteNodeId2 = PublicKey(hex"02271ffb6969f6dc4769438637d7d24dc3358098cdef7a772f9ccfd31251470e28") + private val remoteNodeId3 = PublicKey(hex"028f5be42aa013f9fd2e5a28a152563ac21acc095ef65cab2e835a789d2a4add96") + + private val weeklyBuckets = Bucket.bucketsPerDay * 7 + private val defaultConfig = Config( + enabled = true, + // We set this to 1 day to more easily match daily rate-limits. + scoringFrequency = 1 day, + topPeersCount = 10, + topPeersWhitelist = Set.empty, + liquidity = PeerScorer.LiquidityConfig( + autoFund = true, + autoClose = true, + minFundingAmount = 1_000_000 sat, // 0.01 BTC + maxFundingAmount = 100_000_000 sat, // 1 BTC + minOnChainBalance = 5_000_000 sat, // 0.05 BTC + maxFeerate = FeeratePerByte(100 sat).perKw, + maxFundingTxPerDay = 100, + fundingCooldown = 72 hours, + ), + relayFees = PeerScorer.RelayFeesConfig( + autoUpdate = true, + minRelayFees = RelayFees(1 msat, 500), + maxRelayFees = RelayFees(10_000 msat, 5000), + dailyPaymentVolumeThreshold = 1_000_000 sat, // 0.01 BTC + dailyPaymentVolumeThresholdPercent = 0.1, + ) + ) + + private case class Fixture(tracker: TestProbe[GetLatestStats], register: TestProbe[Any], wallet: DummyBalanceChecker, scorer: ActorRef[PeerScorer.Command]) + + private def withFixture(config: Config = defaultConfig, onChainBalance: BtcAmount = 0 sat)(testFun: Fixture => Any): Unit = { + val tracker = TestProbe[GetLatestStats]() + val register = TestProbe[Any]() + val wallet = new DummyBalanceChecker(confirmedBalance = onChainBalance.toMilliSatoshi.truncateToSatoshi) + val nodeParams = TestConstants.Alice.nodeParams.copy(peerScoringConfig = config) + val scorer = testKit.spawn(PeerScorer(nodeParams, wallet, tracker.ref, register.ref.toClassic)) + testFun(Fixture(tracker, register, wallet, scorer)) + } + + private def channelUpdate(capacity: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), timestamp: TimestampSecond = TimestampSecond.now(), announceChannel: Boolean = true): ChannelUpdate = { + val messageFlags = MessageFlags(dontForward = !announceChannel) + val channelFlags = ChannelFlags(isEnabled = true, isNode1 = true) + ChannelUpdate(ByteVector64.Zeroes, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), timestamp, messageFlags, channelFlags, CltvExpiryDelta(36), 1 msat, fees.feeBase, fees.feeProportionalMillionths, capacity.toMilliSatoshi) + } + + private def peerStats(totalAmountIn: BtcAmount = 0 sat, + totalAmountOut: BtcAmount = 0 sat, + relayFeeEarned: BtcAmount = 0 sat, + onChainFeePaid: BtcAmount = 0 sat, + liquidityFeeEarned: BtcAmount = 0 sat, + liquidityFeePaid: BtcAmount = 0 sat): PeerStats = PeerStats( + totalAmountIn = totalAmountIn.toMilliSatoshi, + totalAmountOut = totalAmountOut.toMilliSatoshi, + relayFeeEarned = relayFeeEarned.toMilliSatoshi, + onChainFeePaid = onChainFeePaid.toMilliSatoshi.truncateToSatoshi, + liquidityFeeEarned = liquidityFeeEarned.toMilliSatoshi.truncateToSatoshi, + liquidityFeePaid = liquidityFeePaid.toMilliSatoshi.truncateToSatoshi + ) + + private def channelInfo(canSend: BtcAmount, canReceive: BtcAmount, channelId: ByteVector32 = randomBytes32(), isPublic: Boolean = true): ChannelInfo = ChannelInfo( + channelId = channelId, + capacity = canSend.toMilliSatoshi.truncateToSatoshi + canReceive.toMilliSatoshi.truncateToSatoshi, + canSend = canSend.toMilliSatoshi, + canReceive = canReceive.toMilliSatoshi, + isPublic = isPublic + ) + + test("simulate large outgoing flows") { + withFixture(onChainBalance = 10 btc) { f => + import f._ + + // We have channels with 3 peers: + // - we have a very large capacity with our first peer + // - we have a smaller capacity with our second peer + // - we have medium capacity with our third peer + val c1a = channelInfo(canSend = 0.5 btc, canReceive = 2.7 btc) + val c1b = channelInfo(canSend = 0.3 btc, canReceive = 3.5 btc, isPublic = false) + val c2a = channelInfo(canSend = 0.03 btc, canReceive = 0.25 btc) + val c2b = channelInfo(canSend = 0.04 btc, canReceive = 0.17 btc) + val c3 = channelInfo(canSend = 0.3 btc, canReceive = 0.7 btc) + + // We need to scale the last bucket of test data based on the current timestamp, otherwise we will overestimate + // the outgoing payment volume of the current bucket and thus always increase relay fees. + val bucketRatio = Bucket.consumed(TimestampMilli.now()) + + // We have a stable, large outgoing flow with our first peer: we should add liquidity and keep our relay fees unchanged. + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.51.btc * bucketRatio, relayFeeEarned = 0.01.btc * bucketRatio), + peerStats(totalAmountOut = 0.52 btc, relayFeeEarned = 0.01 btc), + peerStats(totalAmountOut = 0.48 btc, relayFeeEarned = 0.01 btc), + peerStats(totalAmountOut = 0.47 btc, relayFeeEarned = 0.01 btc), + peerStats(totalAmountOut = 0.51 btc, relayFeeEarned = 0.01 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.5.btc + Random.nextInt(500_000).sat, relayFeeEarned = 0.01 btc)), + channels = Seq(c1a, c1b), + latestUpdate_opt = Some(channelUpdate(c1a.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 2)), + hasPendingChannel = false, + ) + + // We have an increasing outgoing flow with our second peer: we should add liquidity and increase our routing fees. + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.0001.btc * bucketRatio), + peerStats(totalAmountOut = 0.015 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.011 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.0010 btc, relayFeeEarned = 0.0001 btc)), + channels = Seq(c2a, c2b), + latestUpdate_opt = Some(channelUpdate(c2a.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 2)), + hasPendingChannel = false + ) + + // We have a decreasing outgoing flow with our third peer: we should decrease our routing fees and may add liquidity. + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.00007.btc * bucketRatio), + peerStats(totalAmountOut = 0.03 btc, relayFeeEarned = 0.00009 btc), + peerStats(totalAmountOut = 0.07 btc, relayFeeEarned = 0.00012 btc), + peerStats(totalAmountOut = 0.07 btc, relayFeeEarned = 0.00011 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(c3), + latestUpdate_opt = Some(channelUpdate(c3.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 2)), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + + // We increase our relay fees with our second peer, on all channels. + val feeIncreases = Seq( + register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]], + register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]], + ) + assert(feeIncreases.map(_.channelId).toSet == Set(c2a.channelId, c2b.channelId)) + assert(feeIncreases.map(_.message.feeProportionalMillionths).toSet == Set(1500)) + // We decrease our relay fees with our third peer. + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.channelId == c3.channelId) + assert(cmd.message.feeProportionalMillionths == 500) + } + // We fund channels with all of our peers. + val funding = Seq( + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + ) + assert(funding.map(_.nodeId).toSet == Set(remoteNodeId1, remoteNodeId2, remoteNodeId3)) + funding.filter(_.nodeId != remoteNodeId1).foreach(f => assert(f.message.fundingAmount < defaultConfig.liquidity.maxFundingAmount)) + funding.filter(_.nodeId == remoteNodeId1).foreach(f => assert(f.message.fundingAmount == defaultConfig.liquidity.maxFundingAmount)) + register.expectNoMessage(100 millis) + } + } + + test("simulate exhausted large outgoing flow") { + withFixture(onChainBalance = 10 btc) { f => + import f._ + + // Our first peer was very profitable, but it has exhausted its liquidity more than a week ago and doesn't have recent routing activity. + // But since it has a large capacity, we still fund a channel to try to revive it. + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(Bucket.bucketsPerDay * 7)(PeerStats.empty), + channels = Seq(channelInfo(canSend = 10_000 sat, canReceive = 3 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our second peer was very profitable as well, but has exhausted its liquidity more recently. + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq.fill(Bucket.bucketsPerDay * 3)(PeerStats.empty) ++ Seq.fill(Bucket.bucketsPerDay * 4)(peerStats(totalAmountOut = 0.3 btc, relayFeeEarned = 0.0005 btc)), + channels = Seq(channelInfo(canSend = 10_000 sat, canReceive = 3 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our third peer has balanced flows. + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq.fill(Bucket.bucketsPerDay * 7)(peerStats(totalAmountOut = 0.2 btc, totalAmountIn = 0.2 btc, relayFeeEarned = 0.001 btc)), + channels = Seq(channelInfo(canSend = 0.3 btc, canReceive = 2.5 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + val funding = Seq( + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + ) + assert(funding.map(_.nodeId).toSet == Set(remoteNodeId1, remoteNodeId2)) + // We fund a smaller channel with the first node, because we don't know how it will perform. + funding.filter(_.nodeId == remoteNodeId1).foreach(f => assert(f.message.fundingAmount < defaultConfig.liquidity.maxFundingAmount)) + funding.filter(_.nodeId == remoteNodeId2).foreach(f => assert(f.message.fundingAmount == defaultConfig.liquidity.maxFundingAmount)) + register.expectNoMessage(100 millis) + } + } + + test("simulate good small peers that need more capacity") { + // We use a configuration that selects only our best peer: we should still select a smaller peer that performs well. + // We set a large scoring frequency to remove the random rate-limit (of 1 small peer funding every 3 days). + val config = defaultConfig.copy(topPeersCount = 1, scoringFrequency = 7 days) + withFixture(onChainBalance = 10 btc, config = config) { f => + import f._ + + // Our first peer has the biggest outgoing payment volume and capacity and needs more liquidity. + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 0.001 btc, totalAmountOut = 0.002 btc, relayFeeEarned = 10_000 sat)), + channels = Seq(channelInfo(canSend = 0.3 btc, canReceive = 3.5 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our second peer, which has a smaller capacity, has a better per-capacity profit ratio. + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 0.0005 btc, totalAmountOut = 0.001 btc, relayFeeEarned = 5_000 sat)), + channels = Seq(channelInfo(canSend = 0.01 btc, canReceive = 0.07 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our third peer also has a good per-capacity profit ratio, but doesn't need any liquidity. + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 0.0005 btc, totalAmountOut = 0.001 btc, relayFeeEarned = 5_000 sat)), + channels = Seq(channelInfo(canSend = 0.04 btc, canReceive = 0.04 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + val funding = Seq( + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + ) + assert(funding.map(_.nodeId).toSet == Set(remoteNodeId1, remoteNodeId2)) + // We increase small peers' capacity by 50%. + funding.find(_.nodeId == remoteNodeId2).foreach(cmd => assert(cmd.message.fundingAmount == 0.04.btc.toSatoshi)) + register.expectNoMessage(100 millis) + } + } + + test("fund channels with whitelisted peers") { + val config = defaultConfig.copy(topPeersWhitelist = Set(remoteNodeId3)) + withFixture(onChainBalance = 10 btc, config = config) { f => + import f._ + + // Our first peer isn't very profitable and doesn't need liquidity (small outgoing flow). + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 250_000 sat, totalAmountOut = 300_000 sat, relayFeeEarned = 20_000 sat)), + channels = Seq(channelInfo(canSend = 0.4 btc, canReceive = 0.5 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our second peer isn't very profitable either and doesn't need liquidity (small incoming flow). + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 300_000 sat, totalAmountOut = 280_000 sat, relayFeeEarned = 15_000 sat)), + channels = Seq(channelInfo(canSend = 0.3 btc, canReceive = 0.2 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our third peer has exhausted its liquidity a while ago and lost most of its capacity: but it is whitelisted, + // so we should fund a new channel with them. + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq.fill(weeklyBuckets)(PeerStats.empty), + channels = Seq(channelInfo(canSend = 150_000 sat, canReceive = 1_400_000 sat)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd => + assert(cmd.nodeId == remoteNodeId3) + assert(cmd.message.fundingAmount == defaultConfig.liquidity.minFundingAmount) + } + register.expectNoMessage(100 millis) + } + } + + test("don't fund channels if channel is already being created") { + withFixture(onChainBalance = 5 btc) { f => + import f._ + + // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity. + val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc)) + val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc) + + // If there are no pending channel, we suggest adding liquidity. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd => + assert(cmd.nodeId == remoteNodeId1) + assert(cmd.message.fundingAmount > 0.01.btc) + } + + // If there is already a pending channel, we don't suggest adding liquidity. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = true))) + } + register.expectNoMessage(100 millis) + } + } + + test("don't fund channels if feerate is too high") { + val config = defaultConfig.copy(liquidity = defaultConfig.liquidity.copy(maxFeerate = FeeratePerByte(1 sat).perKw)) + withFixture(onChainBalance = 5 btc, config = config) { f => + import f._ + + // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity. + val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc)) + val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc) + + // But the feerate is too high compared to our configured threshold, so we don't do anything. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + register.expectNoMessage(100 millis) + } + } + + test("don't fund channels if on-chain balance is too low") { + val config = defaultConfig.copy(liquidity = defaultConfig.liquidity.copy(minOnChainBalance = 1.5 btc)) + withFixture(onChainBalance = 1.5 btc, config = config) { f => + import f._ + + // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity. + val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc)) + val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc) + + // But our balance is too low compared to our configured threshold, so we don't do anything. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + register.expectNoMessage(100 millis) + } + } + + test("don't update relay fees too frequently") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + val channel = channelInfo(canSend = 0.3 btc, canReceive = 0.5 btc) + val latestUpdate = channelUpdate(channel.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 3) + + // We have an increasing outgoing flow with our peer: we should increase our routing fees. + val peerInfo = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc, relayFeeEarned = 0.00015.btc), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00007 btc), + peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00005 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(latestUpdate), + hasPendingChannel = false + ) + + // We increase our routing fees. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.channelId == channel.channelId) + assert(cmd.message.feeProportionalMillionths > latestUpdate.feeProportionalMillionths) + } + register.expectNoMessage(100 millis) + + // However, if our latest update was done recently, we don't update our routing fees. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo.copy(latestUpdate_opt = Some(latestUpdate.copy(timestamp = TimestampSecond.now() - Bucket.duration))))) + } + register.expectNoMessage(100 millis) + } + } + + test("close idle channels with liquidity") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + // We have several old channels where liquidity is on our side. + val c1a = channelInfo(canSend = 0.5 btc, canReceive = 0.01 btc) + val c1b = channelInfo(canSend = 0.8 btc, canReceive = 0.1 btc, isPublic = false) + val c1c = channelInfo(canSend = 0.4 btc, canReceive = 0.05 btc) + val c2a = channelInfo(canSend = 0.3 btc, canReceive = 0.1 btc) + val c2b = channelInfo(canSend = 0.2 btc, canReceive = 0.01 btc) + val c3 = channelInfo(canSend = 0.8 btc, canReceive = 0.05 btc) + + // The outgoing volume of the corresponding peers is negligible. + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 10_000 sat, totalAmountIn = 5_000 sat, relayFeeEarned = 100 sat)), + channels = Seq(c1a, c1b, c1c), + latestUpdate_opt = None, + hasPendingChannel = false + ) + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 5_000 sat, totalAmountIn = 8_000 sat, relayFeeEarned = 80 sat)), + channels = Seq(c2a, c2b), + latestUpdate_opt = None, + hasPendingChannel = false + ) + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 5_000 sat, totalAmountIn = 5_000 sat, relayFeeEarned = 50 sat)), + channels = Seq(c3), + latestUpdate_opt = None, + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + // We want to keep one channel with each peer, and select the one that likely has the best score for path-finding algorithms. + assert(Seq( + register.expectMessageType[Register.Forward[CMD_CLOSE]], + register.expectMessageType[Register.Forward[CMD_CLOSE]], + register.expectMessageType[Register.Forward[CMD_CLOSE]] + ).map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId, c2b.channelId)) + register.expectNoMessage(100 millis) + } + } + + test("don't close idle channels if feerate is too high") { + val config = defaultConfig.copy(liquidity = defaultConfig.liquidity.copy(maxFeerate = FeeratePerByte(1 sat).perKw)) + withFixture(onChainBalance = 0 btc, config = config) { f => + import f._ + + // We have several old channels where liquidity is on our side. + val c1a = channelInfo(canSend = 0.5 btc, canReceive = 0.01 btc) + val c1b = channelInfo(canSend = 0.8 btc, canReceive = 0.1 btc) + val c1c = channelInfo(canSend = 0.4 btc, canReceive = 0.05 btc) + + // The outgoing volume of the corresponding peer is negligible. + val peerInfo = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 10_000 sat, totalAmountIn = 5_000 sat, relayFeeEarned = 100 sat)), + channels = Seq(c1a, c1b, c1c), + latestUpdate_opt = None, + hasPendingChannel = false + ) + + // But the feerate is too high compared to our configured threshold, so we don't close channels. + assert(TestConstants.Alice.nodeParams.currentFeeratesForFundingClosing.medium > config.liquidity.maxFeerate) + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo)) + } + register.expectNoMessage(100 millis) + } + } + + test("don't fund the same peer within cooldown period") { + withFixture(onChainBalance = 10 btc) { f => + import f._ + + // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity. + val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc)) + val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc) + + // First scoring cycle: we fund the peer. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd => + assert(cmd.nodeId == remoteNodeId1) + } + register.expectNoMessage(100 millis) + + // Second scoring cycle with the same volume: we don't fund again because the cooldown is active and volume hasn't improved. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + register.expectNoMessage(100 millis) + + // Third scoring cycle: volume improved by more than 10%, so we fund again despite cooldown. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + val improvedStats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1.2 btc, relayFeeEarned = 0.012 btc)) + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, improvedStats, Seq(channel), None, hasPendingChannel = false))) + } + inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd => + assert(cmd.nodeId == remoteNodeId1) + } + register.expectNoMessage(100 millis) + } + } + + test("revert fee increase when volume drops") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + val channel = channelInfo(canSend = 0.3 btc, canReceive = 0.5 btc) + val initialFee = RelayFees(250 msat, 1000) + val latestUpdate = channelUpdate(channel.capacity, fees = initialFee, timestamp = TimestampSecond.now() - Bucket.duration * 3) + + // First cycle: increasing volume triggers a fee increase. + val bucketRatio = Bucket.consumed(TimestampMilli.now()) + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.00015.btc * bucketRatio), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00007 btc), + peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00005 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(latestUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == 1500) + } + register.expectNoMessage(100 millis) + + // Second cycle: the fee increase is now in effect (channel update reflects new fee), but volume has dropped >20%. + // The update timestamp is within the evaluation window (6h-24h ago). + val newUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1500), timestamp = TimestampSecond.now() - Bucket.duration * 3) + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + // Volume has dropped significantly (>20% of what it was when we increased fees). + peerStats(totalAmountOut = 0.001.btc * bucketRatio, relayFeeEarned = 0.00001.btc * bucketRatio), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(newUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo2)) + } + // The fee should be reverted to the original value. + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == initialFee.feeProportionalMillionths) + } + register.expectNoMessage(100 millis) + } + } + + test("revert fee decrease when volume doesn't recover") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + val channel = channelInfo(canSend = 0.5 btc, canReceive = 0.5 btc) + val initialFee = RelayFees(250 msat, 2000) + val latestUpdate = channelUpdate(channel.capacity, fees = initialFee, timestamp = TimestampSecond.now() - Bucket.duration * 3) + + // First cycle: decreasing volume triggers a fee decrease. + val bucketRatio = Bucket.consumed(TimestampMilli.now()) + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.003.btc * bucketRatio, relayFeeEarned = 0.00003.btc * bucketRatio), + peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00003 btc), + peerStats(totalAmountOut = 0.01 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.00012 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.00012 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(latestUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == 1500) + } + register.expectNoMessage(100 millis) + + // Second cycle: the fee decrease is in effect but volume hasn't recovered (below 90% of what it was). + val newUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1500), timestamp = TimestampSecond.now() - Bucket.duration * 3) + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + // Volume is still very low (well below 90% of dailyVolumeOutAtChange). + peerStats(totalAmountOut = 0.001.btc * bucketRatio, relayFeeEarned = 0.00001.btc * bucketRatio), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(newUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo2)) + } + // The fee should be reverted to the original value. + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == initialFee.feeProportionalMillionths) + } + register.expectNoMessage(100 millis) + } + } + + test("fee reverts should not oscillate") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + val channel = channelInfo(canSend = 0.3 btc, canReceive = 0.5 btc) + val initialFee = RelayFees(250 msat, 1000) + val latestUpdate = channelUpdate(channel.capacity, fees = initialFee, timestamp = TimestampSecond.now() - Bucket.duration * 3) + + // Cycle 1: increasing volume triggers a fee increase (1000 -> 1500). + val bucketRatio = Bucket.consumed(TimestampMilli.now()) + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.00015.btc * bucketRatio), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00007 btc), + peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00005 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(latestUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == 1500) + } + register.expectNoMessage(100 millis) + + // Cycle 2: volume dropped >20% after the increase: we revert back to 1000. + val increasedFeeUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1500), timestamp = TimestampSecond.now() - Bucket.duration * 3) + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.001.btc * bucketRatio, relayFeeEarned = 0.00001.btc * bucketRatio), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(increasedFeeUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo2)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == 1000) // reverted + } + register.expectNoMessage(100 millis) + + // Cycle 3: the revert is now in effect, but volume continues to decline. The revert recorded a FeeDecrease in + // the history (from 1500 back to 1000). If the code treats this revert as a regular fee decrease decision, the + // revert logic would trigger again (volume < 90% of what it was at revert time) and flip the fee back to 1500, + // creating an oscillation: 1000 -> 1500 -> 1000 -> 1500 -> ... + // A revert should be final and not subject to further revert evaluation. + val revertedFeeUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1000), timestamp = TimestampSecond.now() - Bucket.duration * 3) + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.0005.btc * bucketRatio, relayFeeEarned = 0.000005.btc * bucketRatio), + peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc), + peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc), + peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(revertedFeeUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo3)) + } + // The fee should NOT change again: the revert was a correction, not a new decision to evaluate. + register.expectNoMessage(100 millis) + } + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala new file mode 100644 index 0000000000..c4dea6f884 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala @@ -0,0 +1,339 @@ +package fr.acinq.eclair.profit + +import akka.actor.ActorRef +import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} +import fr.acinq.bitcoin.scalacompat.{Block, BtcAmount, BtcDouble, ByteVector32, ByteVector64, OutPoint, SatoshiLong, Script, Transaction, TxOut} +import fr.acinq.eclair.TestUtils.randomTxId +import fr.acinq.eclair.blockchain.fee.FeeratePerKw +import fr.acinq.eclair.channel._ +import fr.acinq.eclair.payment.PaymentEvent.{IncomingPayment, OutgoingPayment} +import fr.acinq.eclair.payment.relay.Relayer.RelayFees +import fr.acinq.eclair.payment.{ChannelPaymentRelayed, TrampolinePaymentRelayed} +import fr.acinq.eclair.profit.PeerStatsTracker._ +import fr.acinq.eclair.transactions.Transactions.{ClosingTx, InputInfo} +import fr.acinq.eclair.wire.protocol.ChannelUpdate.{ChannelFlags, MessageFlags} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate} +import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiryDelta, Features, MilliSatoshiLong, RealShortChannelId, TestDatabases, TimestampMilli, TimestampSecond, ToMilliSatoshiConversion, randomBytes32, randomKey} +import org.scalatest.Inside.inside +import org.scalatest.funsuite.AnyFunSuiteLike +import scodec.bits.{ByteVector, HexStringSyntax} + +import scala.concurrent.duration.DurationInt + +class PeerStatsTrackerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + private val dummyPubKey = PrivateKey(ByteVector32.One).publicKey + private val dummyAliases = ShortIdAliases(Alias(42), None) + private val dummyChannelAnn = ChannelAnnouncement(ByteVector64.Zeroes, ByteVector64.Zeroes, ByteVector64.Zeroes, ByteVector64.Zeroes, Features.empty, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), dummyPubKey, dummyPubKey, dummyPubKey, dummyPubKey) + + private val localNodeId = PublicKey(hex"03bd04635f1465d9347f3d69edc51f17cdf9548847533e084cd9d153a4abb065cd") + private val remoteNodeId1 = PublicKey(hex"024c9c77624899672c78d84b551ef1187cbb17618b2d96ef189d0ea36f307be76e") + private val remoteNodeId2 = PublicKey(hex"02271ffb6969f6dc4769438637d7d24dc3358098cdef7a772f9ccfd31251470e28") + private val remoteNodeId3 = PublicKey(hex"028f5be42aa013f9fd2e5a28a152563ac21acc095ef65cab2e835a789d2a4add96") + + private def commitments(remoteNodeId: PublicKey, toLocal: BtcAmount, toRemote: BtcAmount, announceChannel: Boolean = true): Commitments = { + CommitmentsSpec.makeCommitments(toLocal.toMilliSatoshi, toRemote.toMilliSatoshi, localNodeId, remoteNodeId, if (announceChannel) Some(dummyChannelAnn) else None) + } + + private def updateChannelBalance(c: Commitments, toLocal: BtcAmount, toRemote: BtcAmount): Commitments = { + val c1 = commitments(c.remoteNodeId, toLocal, toRemote, c.announceChannel) + c1.copy(channelParams = c1.channelParams.copy(channelId = c.channelId)) + } + + private def channelUpdate(capacity: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), timestamp: TimestampSecond = TimestampSecond.now(), announceChannel: Boolean = true): ChannelUpdate = { + val messageFlags = MessageFlags(dontForward = !announceChannel) + val channelFlags = ChannelFlags(isEnabled = true, isNode1 = true) + ChannelUpdate(ByteVector64.Zeroes, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), timestamp, messageFlags, channelFlags, CltvExpiryDelta(36), 1 msat, fees.feeBase, fees.feeProportionalMillionths, capacity.toMilliSatoshi) + } + + private def channel(remoteNodeId: PublicKey, toLocal: BtcAmount, toRemote: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), announceChannel: Boolean = true): DATA_NORMAL = { + val c = commitments(remoteNodeId, toLocal, toRemote, announceChannel) + val ann_opt = if (announceChannel) Some(dummyChannelAnn) else None + val update = channelUpdate(c.capacity, fees, TimestampSecond.now(), announceChannel) + DATA_NORMAL(c, dummyAliases, ann_opt, update, SpliceStatus.NoSplice, None, None, None) + } + + test("create buckets") { + // February 5th 2026 at 12h00 UTC. + val timestamp = TimestampMilli(1770292800000L) + assert(Bucket.from(timestamp) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2)) + assert(Bucket.from(timestamp - 1.millis) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 1)) + assert(Bucket.from(timestamp - Bucket.duration) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 1)) + assert(Bucket.from(timestamp - Bucket.duration - 1.millis) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 2)) + assert(Bucket.from(timestamp - Bucket.duration * Bucket.bucketsPerDay / 2) == Bucket(day = 5, month = 2, year = 2026, slot = 0)) + assert(Bucket.from(timestamp - Bucket.duration * Bucket.bucketsPerDay / 2 - 1.millis) == Bucket(day = 4, month = 2, year = 2026, slot = Bucket.bucketsPerDay - 1)) + } + + test("sort buckets") { + val b1 = Bucket(day = 30, month = 11, year = 2025, slot = 0) + val b2 = Bucket(day = 30, month = 11, year = 2025, slot = 7) + val b3 = Bucket(day = 30, month = 11, year = 2025, slot = 9) + val b4 = Bucket(day = 1, month = 12, year = 2025, slot = 2) + val b5 = Bucket(day = 1, month = 12, year = 2025, slot = 3) + val b6 = Bucket(day = 15, month = 12, year = 2025, slot = 5) + val b7 = Bucket(day = 1, month = 1, year = 2026, slot = 1) + assert(b1 < b2 && b2 < b3 && b3 < b4 && b4 < b5 && b5 < b6 && b6 < b7) + assert(Seq(b3, b6, b5, b1, b4, b2, b7).sorted == Seq(b1, b2, b3, b4, b5, b6, b7)) + } + + test("evaluate consumed bucket ratio") { + // February 5th 2026 at 12h00 UTC. + val timestamp = TimestampMilli(1770292800000L) + assert(Bucket.consumed(timestamp) == 0.00) + assert(Bucket.consumed(timestamp + Bucket.duration / 3) == 1.0 / 3) + assert(Bucket.consumed(timestamp + Bucket.duration / 2) == 0.5) + assert(Bucket.consumed(timestamp + Bucket.duration * 2 / 3) == 2.0 / 3) + assert(Bucket.consumed(timestamp - 10.seconds) >= 0.99) + assert(Bucket.consumed(timestamp - 10.seconds) < 1.0) + } + + test("keep track of channel balances and state") { + val now = TimestampMilli.now() + val probe = TestProbe[LatestStats]() + + // We have 4 channels with our first peer: 2 of them are active. + val c1a = DATA_WAIT_FOR_DUAL_FUNDING_READY(commitments(remoteNodeId1, toLocal = 0.1 btc, toRemote = 0.2 btc), dummyAliases) + val c1b = channel(remoteNodeId1, toLocal = 0.15 btc, toRemote = 0.25 btc) + val c1c = channel(remoteNodeId1, toLocal = 0.07 btc, toRemote = 0.03 btc) + val c1d = DATA_SHUTDOWN(commitments(remoteNodeId1, toLocal = 0.1 btc, toRemote = 0.2 btc), null, null, CloseStatus.Initiator(None)) + // We have 2 channels with our second peer: none of them are active. + val c2a = DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments(remoteNodeId2, toLocal = 0.13 btc, toRemote = 0.24 btc), BlockHeight(750_000), None, null) + val c2b = DATA_NEGOTIATING_SIMPLE(commitments(remoteNodeId2, toLocal = 0.5 btc, toRemote = 0.1 btc), FeeratePerKw(2000 sat), ByteVector.empty, ByteVector.empty, Nil, Nil) + // We have 2 channels with our third peer: none of them are active. + val c3a = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments(remoteNodeId3, toLocal = 0.7 btc, toRemote = 0.2 btc), 0 msat, 0 msat, BlockHeight(750_000), BlockHeight(750_000), DualFundingStatus.WaitingForConfirmations, None) + val c3b = DATA_CLOSING(commitments(remoteNodeId3, toLocal = 0.2 btc, toRemote = 0.2 btc), BlockHeight(750_000), ByteVector.empty, Nil, ClosingTx(InputInfo(OutPoint(randomTxId(), 2), TxOut(500_000 sat, Script.pay2wpkh(dummyPubKey))), Transaction(2, Nil, TxOut(500_000 sat, Script.pay2wpkh(dummyPubKey)) :: Nil, 0), None) :: Nil) + + // We initialize our actor with these existing channels. + val tracker = testKit.spawn(PeerStatsTracker(TestDatabases.inMemoryDb().audit, Seq(c1a, c1b, c1c, c1d, c2a, c2b, c3a, c3b))) + // We have relayed payments through channels that have been closed since then. + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c1b.channelId, remoteNodeId1, 58_000_000 msat, now - 5.minutes)), + outgoing = Seq(OutgoingPayment(c3b.channelId, remoteNodeId3, 50_000_000 msat, now)) + )) + // We have relayed payments through active channels as well. + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c2b.channelId, remoteNodeId2, 15_000_000 msat, now - 5.minutes)), + outgoing = Seq(OutgoingPayment(c1b.channelId, remoteNodeId1, 10_000_000 msat, now)) + )) + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + // We only have active channels with our first peer. + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1)) + // We only take into account the two active channels with our first peer. + val peer1 = s.peers.find(_.remoteNodeId == remoteNodeId1).get + assert(peer1.channels.map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId)) + assert(peer1.latestUpdate_opt.exists(u => Set(c1b.channelUpdate, c1c.channelUpdate).contains(u))) + assert(peer1.capacity == 0.5.btc.toSatoshi) + assert(peer1.hasPendingChannel) + assert(0.21.btc.toMilliSatoshi <= peer1.canSend && peer1.canSend <= 0.22.btc.toMilliSatoshi) + assert(0.27.btc.toMilliSatoshi <= peer1.canReceive && peer1.canReceive <= 0.28.btc.toMilliSatoshi) + } + + // Our pending channel with our second peer becomes ready. + val update2a = channelUpdate(c2a.commitments.capacity, RelayFees(500 msat, 500)) + tracker.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate( + channel = ActorRef.noSender, + channelId = c2a.channelId, + aliases = dummyAliases, + remoteNodeId = remoteNodeId2, + announcement_opt = None, + channelUpdate = update2a, + commitments = c2a.commitments, + )) + + // Our pending channel with our third peer is aborted. + tracker.ref ! WrappedLocalChannelDown(LocalChannelDown( + channel = ActorRef.noSender, + channelId = c3a.channelId, + realScids = Nil, + aliases = dummyAliases, + remoteNodeId = remoteNodeId3 + )) + + // We forget about our third peer, with whom we don't have pending or active channels anymore. + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2)) + assert(!s.peers.find(_.remoteNodeId == remoteNodeId2).get.hasPendingChannel) + assert(s.peers.find(_.remoteNodeId == remoteNodeId2).get.latestUpdate_opt.contains(update2a)) + } + + // We relay a payment to our second peer. + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c1b.channelId, remoteNodeId1, 30_000_000 msat, now - 5.minutes)), + outgoing = Seq(OutgoingPayment(c2a.channelId, remoteNodeId2, 15_000_000 msat, now)) + )) + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2)) + val peer2 = s.peers.find(_.remoteNodeId == remoteNodeId2).get + assert(peer2.channels.map(_.channelId).toSet == Set(c2a.channelId)) + assert(peer2.latestUpdate_opt.contains(update2a)) + } + + // We update our routing fees with our first peer. + val update1b = channelUpdate(c1b.commitments.capacity, RelayFees(100 msat, 600), TimestampSecond.now() + 10.seconds) + tracker.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate( + channel = ActorRef.noSender, + channelId = c1b.channelId, + aliases = dummyAliases, + remoteNodeId = remoteNodeId1, + announcement_opt = None, + channelUpdate = update1b, + commitments = updateChannelBalance(c1b.commitments, toLocal = 0.3 btc, toRemote = 0.1 btc), + )) + + // We ignore previous channel updates from another channel. + val update1c = channelUpdate(c1c.commitments.capacity, RelayFees(150 msat, 400), update1b.timestamp - 10.seconds) + tracker.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate( + channel = ActorRef.noSender, + channelId = c1c.channelId, + aliases = dummyAliases, + remoteNodeId = remoteNodeId1, + announcement_opt = None, + channelUpdate = update1c, + commitments = updateChannelBalance(c1c.commitments, toLocal = 0.05 btc, toRemote = 0.05 btc), + )) + + // Channels with our second peer are closed. + tracker.ref ! WrappedLocalChannelDown(LocalChannelDown( + channel = ActorRef.noSender, + channelId = c2a.channelId, + realScids = Nil, + aliases = dummyAliases, + remoteNodeId = remoteNodeId2 + )) + tracker.ref ! WrappedLocalChannelDown(LocalChannelDown( + channel = ActorRef.noSender, + channelId = c2b.channelId, + realScids = Nil, + aliases = dummyAliases, + remoteNodeId = remoteNodeId2 + )) + + // The only remaining channels are with our first peer, with updated balances and the latest channel update. + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1)) + val peer1 = s.peers.find(_.remoteNodeId == remoteNodeId1).get + assert(peer1.channels.map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId)) + assert(peer1.capacity == 0.5.btc.toSatoshi) + assert(0.34.btc.toMilliSatoshi <= peer1.canSend && peer1.canSend <= 0.35.btc.toMilliSatoshi) + assert(0.14.btc.toMilliSatoshi <= peer1.canReceive && peer1.canReceive <= 0.15.btc.toMilliSatoshi) + assert(peer1.latestUpdate_opt.contains(update1b)) + } + } + + test("keep track of peer statistics") { + val now = TimestampMilli.now() + val probe = TestProbe[LatestStats]() + val tracker = testKit.spawn(PeerStatsTracker(TestDatabases.inMemoryDb().audit, Nil)) + + // We have channels with 3 peers: + val c1a = commitments(remoteNodeId1, toLocal = 0.5 btc, toRemote = 0.3 btc) + val c1b = commitments(remoteNodeId1, toLocal = 0.4 btc, toRemote = 0.2 btc, announceChannel = false) + val c2 = commitments(remoteNodeId2, toLocal = 0.01 btc, toRemote = 0.9 btc) + val c3 = commitments(remoteNodeId3, toLocal = 0.7 btc, toRemote = 0.1 btc) + Seq(c1a, c1b, c2, c3).foreach(c => tracker.ref ! WrappedAvailableBalanceChanged(AvailableBalanceChanged( + channel = ActorRef.noSender, + channelId = c.channelId, + aliases = dummyAliases, + commitments = c, + lastAnnouncement_opt = None, + ))) + + // We have relayed some payments with all of those peers. + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c1a.channelId, remoteNodeId1, 30_000_000 msat, now - Bucket.duration * 2)), + outgoing = Seq(OutgoingPayment(c2.channelId, remoteNodeId2, 20_000_000 msat, now - Bucket.duration * 2)) + )) + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c2.channelId, remoteNodeId2, 10_000_000 msat, now - Bucket.duration * 2)), + outgoing = Seq(OutgoingPayment(c3.channelId, remoteNodeId3, 9_000_000 msat, now - Bucket.duration * 2)) + )) + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq( + IncomingPayment(c2.channelId, remoteNodeId2, 30_000_000 msat, now - Bucket.duration), + IncomingPayment(c3.channelId, remoteNodeId3, 25_000_000 msat, now - Bucket.duration), + ), + outgoing = Seq( + OutgoingPayment(c1a.channelId, remoteNodeId1, 50_000_000 msat, now - Bucket.duration), + ) + )) + tracker.ref ! WrappedPaymentRelayed(TrampolinePaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq( + IncomingPayment(c2.channelId, remoteNodeId2, 21_000_000 msat, now), + IncomingPayment(c2.channelId, remoteNodeId2, 34_000_000 msat, now), + ), + outgoing = Seq( + OutgoingPayment(c3.channelId, remoteNodeId3, 22_000_000 msat, now), + OutgoingPayment(c3.channelId, remoteNodeId3, 18_000_000 msat, now), + OutgoingPayment(c1b.channelId, remoteNodeId1, 10_000_000 msat, now), + ), + nextTrampolineNodeId = randomKey().publicKey, + nextTrampolineAmount = 50_000_000 msat, + )) + + // We keep track of aggregated statistics per bucket. + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2, remoteNodeId3)) + assert(s.peers.flatMap(_.stats.map(_.profit)).sum == 21_000_000.msat) + // We only have routing activity in the past 3 buckets. + s.peers.foreach(p => assert(p.stats.drop(3).forall(olderStats => olderStats == PeerStats.empty))) + // We verify that routing activity is correctly recorded in the right bucket. + val peer1 = s.peers.find(_.remoteNodeId == remoteNodeId1).get + assert(peer1.capacity == 1.4.btc.toSatoshi) + assert(peer1.canSend == c1a.availableBalanceForSend + c1b.availableBalanceForSend) + assert(peer1.canReceive == c1a.availableBalanceForReceive + c1b.availableBalanceForReceive) + assert(peer1.stats.head.totalAmountIn == 0.msat) + assert(peer1.stats.head.totalAmountOut == 10_000_000.msat) + assert(peer1.stats.head.relayFeeEarned == 1_000_000.msat) + assert(peer1.stats(1).totalAmountIn == 0.msat) + assert(peer1.stats(1).totalAmountOut == 50_000_000.msat) + assert(peer1.stats(1).relayFeeEarned == 5_000_000.msat) + assert(peer1.stats(2).totalAmountIn == 30_000_000.msat) + assert(peer1.stats(2).totalAmountOut == 0.msat) + assert(peer1.stats(2).relayFeeEarned == 0.msat) + assert(peer1.stats.map(_.outgoingFlow).sum == 30_000_000.msat) + val peer2 = s.peers.find(_.remoteNodeId == remoteNodeId2).get + assert(peer2.capacity == 0.91.btc.toSatoshi) + assert(peer2.canSend == c2.availableBalanceForSend) + assert(peer2.canReceive == c2.availableBalanceForReceive) + assert(peer2.stats.head.totalAmountIn == 55_000_000.msat) + assert(peer2.stats.head.totalAmountOut == 0.msat) + assert(peer2.stats.head.relayFeeEarned == 0.msat) + assert(peer2.stats(1).totalAmountIn == 30_000_000.msat) + assert(peer2.stats(1).totalAmountOut == 0.msat) + assert(peer2.stats(1).relayFeeEarned == 0.msat) + assert(peer2.stats(2).totalAmountIn == 10_000_000.msat) + assert(peer2.stats(2).totalAmountOut == 20_000_000.msat) + assert(peer2.stats(2).relayFeeEarned == 10_000_000.msat) + assert(peer2.stats.map(_.outgoingFlow).sum == -75_000_000.msat) + val peer3 = s.peers.find(_.remoteNodeId == remoteNodeId3).get + assert(peer3.capacity == 0.8.btc.toSatoshi) + assert(peer3.canSend == c3.availableBalanceForSend) + assert(peer3.canReceive == c3.availableBalanceForReceive) + assert(peer3.stats.head.totalAmountIn == 0.msat) + assert(peer3.stats.head.totalAmountOut == 40_000_000.msat) + assert(peer3.stats.head.relayFeeEarned == 4_000_000.msat) + assert(peer3.stats(1).totalAmountIn == 25_000_000.msat) + assert(peer3.stats(1).totalAmountOut == 0.msat) + assert(peer3.stats(1).relayFeeEarned == 0.msat) + assert(peer3.stats(2).totalAmountIn == 0.msat) + assert(peer3.stats(2).totalAmountOut == 9_000_000.msat) + assert(peer3.stats(2).relayFeeEarned == 1_000_000.msat) + assert(peer3.stats.map(_.outgoingFlow).sum == 24_000_000.msat) + } + } + +}