From 6d8e71d68f948f18a044f82528b0c3778a3dbe76 Mon Sep 17 00:00:00 2001 From: t-bast Date: Mon, 9 Feb 2026 16:07:12 +0100 Subject: [PATCH 1/2] Add per-peer profit scoring We create a new set of actors that keep track of payment statistics across our peers and rank them to identify the top profit earners. Based on those statistics, the actors issue recommendations to: - allocate more liquidity towards nodes that are generating revenue and may run out of liquidity in the next few days - reclaim liquidity from inactive channels - change our relay fees to optimize increases or decreases in outgoing flow and volume --- eclair-core/src/main/resources/reference.conf | 45 ++ .../scala/fr/acinq/eclair/MilliSatoshi.scala | 1 + .../scala/fr/acinq/eclair/NodeParams.scala | 30 + .../main/scala/fr/acinq/eclair/Setup.scala | 6 +- .../scala/fr/acinq/eclair/Timestamp.scala | 1 + .../fr/acinq/eclair/profit/PeerScorer.scala | 382 +++++++++++++ .../eclair/profit/PeerStatsTracker.scala | 380 +++++++++++++ .../src/test/resources/logback-test.xml | 1 + .../scala/fr/acinq/eclair/TestConstants.scala | 45 ++ .../blockchain/DummyOnChainWallet.scala | 9 + .../acinq/eclair/profit/PeerScorerSpec.scala | 513 ++++++++++++++++++ .../eclair/profit/PeerStatsTrackerSpec.scala | 339 ++++++++++++ 12 files changed, 1751 insertions(+), 1 deletion(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 2e456cfb5d..f0b5056302 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -656,6 +656,51 @@ eclair { cleanup-frequency = 1 day } + // We keep track of peer statistics to find the most profitable peers and fund channels with them to optimize our + // payment volume and fees earned. + peer-scoring { + // Set this to true if you want to start collecting data to score peers. + enabled = false + // Frequency at which we run our peer scoring algorithm. + frequency = 1 hour + // Maximum number of peers to select as candidates for liquidity and relay fee updates. + top-peers-count = 10 + // A list of node_ids with whom we will try to maintain liquidity. + top-peers-whitelist = [] + // We can automatically allocate liquidity to our top peers when necessary. + liquidity { + // If true, we will automatically fund channels. + auto-fund = false + // If true, we will automatically close unused channels to reclaim liquidity. + auto-close = false + // We only fund channels if at least this amount is necessary. + min-funding-amount-satoshis = 1000000 // 0.01 btc + // We never fund channels with more than this amount. + max-funding-amount-satoshis = 50000000 // 0.5 btc + // We stop funding channels if our on-chain balance is below this amount. + min-on-chain-balance-satoshis = 50000000 // 0.5 btc + // We stop funding channels if the on-chain feerate is above this value. + max-feerate-sat-per-byte = 5 + // Rate-limit the number of funding transactions we make per day (on average). + max-funding-tx-per-day = 6 + } + // We can automatically update our relay fees to our top peers when necessary. + relay-fees { + // If true, we will automatically update our relay fees based on variations in outgoing payment volume. + auto-update = false + // We will not lower our fees below these values. + min-fee-base-msat = 1 + min-fee-proportional-millionths = 500 + // We will not increase our fees above these values. + max-fee-base-msat = 10000 + max-fee-proportional-millionths = 5000 + // We only increase fees if the daily outgoing payment volume exceeds this threshold or daily-payment-volume-threshold-percent. + daily-payment-volume-threshold-satoshis = 10000000 // 0.1 btc + // We only increase fees if the daily outgoing payment volume exceeds this percentage of our peer capacity or daily-payment-volume-threshold. + daily-payment-volume-threshold-percent = 0.05 + } + } + offers { // Minimum length of an offer blinded path when hiding our real node id message-path-min-length = 2 diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala b/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala index 100829ce83..b82198759c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala @@ -35,6 +35,7 @@ case class MilliSatoshi(private val underlying: Long) extends Ordered[MilliSatos def *(m: Long) = MilliSatoshi(underlying * m) def *(m: Double) = MilliSatoshi((underlying * m).toLong) def /(d: Long) = MilliSatoshi(underlying / d) + def /(d: Double) = MilliSatoshi((underlying / d).toLong) def unary_- = MilliSatoshi(-underlying) override def compare(other: MilliSatoshi): Int = underlying.compareTo(other.underlying) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index be25030aec..a921a24869 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -33,6 +33,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.offer.OffersConfig import fr.acinq.eclair.payment.relay.OnTheFlyFunding import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} +import fr.acinq.eclair.profit.PeerScorer import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Announcements.AddressException import fr.acinq.eclair.router.Graph.HeuristicsConstants @@ -95,6 +96,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig, onTheFlyFundingConfig: OnTheFlyFunding.Config, peerStorageConfig: PeerStorageConfig, + peerScoringConfig: PeerScorer.Config, offersConfig: OffersConfig) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey @@ -714,6 +716,34 @@ object NodeParams extends Logging { removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS), cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS), ), + peerScoringConfig = PeerScorer.Config( + enabled = config.getBoolean("peer-scoring.enabled"), + scoringFrequency = FiniteDuration(config.getDuration("peer-scoring.frequency").getSeconds, TimeUnit.SECONDS), + topPeersCount = config.getInt("peer-scoring.top-peers-count"), + topPeersWhitelist = config.getStringList("peer-scoring.top-peers-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet, + liquidity = PeerScorer.LiquidityConfig( + autoFund = config.getBoolean("peer-scoring.liquidity.auto-fund"), + autoClose = config.getBoolean("peer-scoring.liquidity.auto-close"), + minFundingAmount = config.getLong("peer-scoring.liquidity.min-funding-amount-satoshis").sat, + maxFundingAmount = config.getLong("peer-scoring.liquidity.max-funding-amount-satoshis").sat, + maxFundingTxPerDay = config.getInt("peer-scoring.liquidity.max-funding-tx-per-day"), + minOnChainBalance = config.getLong("peer-scoring.liquidity.min-on-chain-balance-satoshis").sat, + maxFeerate = FeeratePerByte(config.getLong("peer-scoring.liquidity.max-feerate-sat-per-byte").sat).perKw, + ), + relayFees = PeerScorer.RelayFeesConfig( + autoUpdate = config.getBoolean("peer-scoring.relay-fees.auto-update"), + minRelayFees = RelayFees( + feeBase = config.getLong("peer-scoring.relay-fees.min-fee-base-msat").msat, + feeProportionalMillionths = config.getLong("peer-scoring.relay-fees.min-fee-proportional-millionths"), + ), + maxRelayFees = RelayFees( + feeBase = config.getLong("peer-scoring.relay-fees.max-fee-base-msat").msat, + feeProportionalMillionths = config.getLong("peer-scoring.relay-fees.max-fee-proportional-millionths"), + ), + dailyPaymentVolumeThreshold = config.getLong("peer-scoring.relay-fees.daily-payment-volume-threshold-satoshis").sat, + dailyPaymentVolumeThresholdPercent = config.getDouble("peer-scoring.relay-fees.daily-payment-volume-threshold-percent"), + ) + ), offersConfig = OffersConfig( messagePathMinLength = config.getInt("offers.message-path-min-length"), paymentPathCount = config.getInt("offers.payment-path-count"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 6d9bc2de5e..e3729e7768 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -44,6 +44,7 @@ import fr.acinq.eclair.payment.offer.{DefaultOfferHandler, OfferManager} import fr.acinq.eclair.payment.receive.PaymentHandler import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer} import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator} +import fr.acinq.eclair.profit.{PeerScorer, PeerStatsTracker} import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router._ import fr.acinq.eclair.tor.{Controller, TorProtocolHandler} @@ -400,8 +401,11 @@ class Setup(val datadir: File, _ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart)) balanceActor = system.spawn(BalanceActor(bitcoinClient, nodeParams.channelConf.minDepth, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor") - postman = system.spawn(Behaviors.supervise(Postman(nodeParams, switchboard, router.toTyped, register, offerManager)).onFailure(typed.SupervisorStrategy.restart), name = "postman") + _ = if (nodeParams.peerScoringConfig.enabled) { + val statsTracker = system.spawn(Behaviors.supervise(PeerStatsTracker(nodeParams.db.audit, channels)).onFailure(typed.SupervisorStrategy.restart), name = "peer-stats-tracker") + system.spawn(Behaviors.supervise(PeerScorer(nodeParams, bitcoinClient, statsTracker, register)).onFailure(typed.SupervisorStrategy.restart), name = "peer-scorer") + } kit = Kit( nodeParams = nodeParams, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala index 1d6a1b4478..9629fa09bd 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala @@ -46,6 +46,7 @@ case class TimestampMilli(private val underlying: Long) extends Ordered[Timestam require(underlying >= 0 && underlying <= 253402300799L * 1000, "invalid timestamp value") // @formatter:off def toLong: Long = underlying + def toTimestampSecond: TimestampSecond = TimestampSecond(underlying / 1000) def toSqlTimestamp: sql.Timestamp = sql.Timestamp.from(Instant.ofEpochMilli(underlying)) override def toString: String = s"$underlying unixms" override def compare(that: TimestampMilli): Int = underlying.compareTo(that.underlying) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala new file mode 100644 index 0000000000..4a71fa5201 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala @@ -0,0 +1,382 @@ +/* + * Copyright 2026 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.profit + +import akka.actor.typed.scaladsl.{ActorContext, Behaviors} +import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.{ActorRef => UntypedActorRef} +import akka.util.Timeout +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong} +import fr.acinq.eclair.blockchain.OnChainBalanceChecker +import fr.acinq.eclair.blockchain.fee.FeeratePerKw +import fr.acinq.eclair.channel.{CMD_CLOSE, CMD_UPDATE_RELAY_FEE, ChannelFlags, Register} +import fr.acinq.eclair.io.Peer.OpenChannel +import fr.acinq.eclair.payment.relay.Relayer.RelayFees +import fr.acinq.eclair.profit.PeerStatsTracker._ +import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, NodeParams, TimestampMilli} + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.util.{Failure, Random, Success} + +/** + * Created by t-bast on 30/01/2026. + */ + +object PeerScorer { + + // @formatter:off + sealed trait Command + case class ScorePeers(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]]) extends Command + private case class WrappedLatestStats(peers: Seq[PeerInfo]) extends Command + private case class OnChainBalance(confirmed: Satoshi, unconfirmed: Satoshi) extends Command + private case class WalletError(e: Throwable) extends Command + // @formatter:on + + /** + * @param enabled the [[PeerStatsTracker]] actor should only be created when [[enabled]] is true. + * @param scoringFrequency frequency at which we run our peer scoring algorithm. + * @param topPeersCount maximum number of peers to select as candidates for liquidity and relay fee updates. + * @param topPeersWhitelist a whitelist of peers that the node operator wants to prioritize. + */ + case class Config(enabled: Boolean, scoringFrequency: FiniteDuration, topPeersCount: Int, topPeersWhitelist: Set[PublicKey], liquidity: LiquidityConfig, relayFees: RelayFeesConfig) + + /** + * @param autoFund if true, we will automatically fund channels. + * @param autoClose if true, we will automatically close unused channels to reclaim liquidity. + * @param minFundingAmount we always fund channels with at least this amount. + * @param maxFundingAmount we never fund channels with more than this amount. + * @param maxFundingTxPerDay we rate-limit the number of transactions we make per day (on average). + * @param minOnChainBalance we stop funding channels if our on-chain balance is below this amount. + * @param maxFeerate we stop funding channels if the on-chain feerate is above this value. + */ + case class LiquidityConfig(autoFund: Boolean, autoClose: Boolean, minFundingAmount: Satoshi, maxFundingAmount: Satoshi, maxFundingTxPerDay: Int, minOnChainBalance: Satoshi, maxFeerate: FeeratePerKw) + + /** + * @param autoUpdate if true, we will automatically update our relay fees. + * @param minRelayFees we will not lower our relay fees below this value. + * @param maxRelayFees we will not raise our relay fees above this value. + * @param dailyPaymentVolumeThreshold we only increase fees if the daily outgoing payment volume exceeds this threshold or [[dailyPaymentVolumeThresholdPercent]]. + * @param dailyPaymentVolumeThresholdPercent we only increase fees if the daily outgoing payment volume exceeds this percentage of our peer capacity or [[dailyPaymentVolumeThreshold]]. + */ + case class RelayFeesConfig(autoUpdate: Boolean, minRelayFees: RelayFees, maxRelayFees: RelayFees, dailyPaymentVolumeThreshold: Satoshi, dailyPaymentVolumeThresholdPercent: Double) + + private case class LiquidityDecision(peer: PeerInfo, fundingAmount: Satoshi) { + val remoteNodeId: PublicKey = peer.remoteNodeId + } + + private def rollingDailyStats(p: PeerInfo): Seq[Seq[PeerStats]] = { + (0 until (p.stats.size - Bucket.bucketsPerDay)).map(i => p.stats.slice(i, i + Bucket.bucketsPerDay)) + } + + private def bestDailyVolumeOut(p: PeerInfo): MilliSatoshi = rollingDailyStats(p).map(s => s.map(_.totalAmountOut).sum).max + + private def bestDailyProfit(p: PeerInfo): MilliSatoshi = rollingDailyStats(p).map(s => s.map(_.profit).sum).max + + private def bestDailyOutgoingFlow(p: PeerInfo): MilliSatoshi = rollingDailyStats(p).map(s => s.map(_.outgoingFlow).sum).max + + def apply(nodeParams: NodeParams, wallet: OnChainBalanceChecker, statsTracker: ActorRef[PeerStatsTracker.GetLatestStats], register: UntypedActorRef): Behavior[Command] = { + Behaviors.setup { context => + Behaviors.withTimers { timers => + timers.startTimerWithFixedDelay(ScorePeers(None), nodeParams.peerScoringConfig.scoringFrequency) + new PeerScorer(nodeParams, wallet, statsTracker, register, context).run() + } + } + } + +} + +private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, statsTracker: ActorRef[PeerStatsTracker.GetLatestStats], register: UntypedActorRef, context: ActorContext[PeerScorer.Command]) { + + import PeerScorer._ + + implicit val ec: ExecutionContext = context.system.executionContext + private val log = context.log + private val config = nodeParams.peerScoringConfig + + private def run(): Behavior[Command] = { + Behaviors.receiveMessagePartial { + case ScorePeers(replyTo_opt) => + statsTracker ! PeerStatsTracker.GetLatestStats(context.messageAdapter[PeerStatsTracker.LatestStats](e => WrappedLatestStats(e.peers))) + waitForStats(replyTo_opt) + } + } + + private def waitForStats(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]]): Behavior[Command] = { + Behaviors.receiveMessagePartial { + case WrappedLatestStats(peers) => scorePeers(replyTo_opt, peers) + } + } + + private def scorePeers(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]], peers: Seq[PeerInfo]): Behavior[Command] = { + log.info("scoring {} peers", peers.size) + val dailyProfit = peers.map(_.stats.take(Bucket.bucketsPerDay).map(_.profit).sum).sum.truncateToSatoshi.toMilliBtc + val weeklyProfit = peers.map(_.stats.map(_.profit).sum).sum.truncateToSatoshi.toMilliBtc + log.info("rolling daily profit = {} and weekly profit = {}", dailyProfit, weeklyProfit) + + // We select peers that have the largest outgoing payment volume of the past day. + // This biases towards nodes that already have a large capacity and have liquidity available. + val bestPeersByVolume = peers + .map(p => (p, p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum)) + .filter(_._2 > 0.msat) + .sortBy(_._2)(Ordering[MilliSatoshi].reverse) + .take(config.topPeersCount) + .map(_._1) + log.debug("top {} peers:", bestPeersByVolume.size) + printDailyStats(bestPeersByVolume) + + // We select peers that have a low outgoing balance but were previously good peers, by looking at the largest daily + // outgoing payment volume of the past week, using a rolling daily window. This also biases towards nodes that + // already have a large capacity, but lets us identify good peers that ran out of liquidity. + val bestPeersWithoutLiquidityByVolume = peers + .filter(p => p.canSend <= p.capacity * 0.1) + .map(p => (p, bestDailyVolumeOut(p))) + .sortBy(_._2)(Ordering[MilliSatoshi].reverse) + .take(config.topPeersCount) + .map(_._1) + + // We identify peers that need additional liquidity. + val bestPeersThatNeedLiquidity = bestPeersByVolume + // We select peers that have a large outgoing flow, which means they will run out of liquidity in the next days. + .filter(p => p.stats.take(Bucket.bucketsPerDay).map(_.outgoingFlow).sum >= p.canSend * 0.2) + // And peers that have already ran out of liquidity. + .appendedAll(bestPeersWithoutLiquidityByVolume) + .distinctBy(_.remoteNodeId) + // We order them by their best daily profit of the past week. + .map(p => (p, bestDailyProfit(p))) + .sortBy(_._2)(Ordering[MilliSatoshi].reverse) + .take(config.topPeersCount) + // We fund a multiple of our daily outgoing flow, to ensure we have enough liquidity for a few days. + .map { case (p, _) => LiquidityDecision(p, (bestDailyOutgoingFlow(p) * 4).truncateToSatoshi) } + .filter(_.fundingAmount > 0.sat) + if (bestPeersThatNeedLiquidity.nonEmpty) { + log.debug("top {} peers that need liquidity:", bestPeersThatNeedLiquidity.size) + printDailyStats(bestPeersThatNeedLiquidity.map(_.peer)) + } + + // We select peers that are performing well relative to their capacity, which lets us identify good peers that may + // have a smaller capacity and may deserve a capacity increase. + val goodSmallPeers = peers + .map(p => (p, bestDailyVolumeOut(p).truncateToSatoshi.toLong.toDouble / p.capacity.toLong)) + .filter(_._2 > 0) + .sortBy(_._2)(Ordering[Double].reverse) + .take(config.topPeersCount) + // We only keep peers that may run out of liquidity in the next days. + .filter { case (p, _) => p.stats.take(Bucket.bucketsPerDay).map(_.outgoingFlow).sum >= p.canSend * 0.2 } + .map(_._1) + // We'd like to increase their capacity by 50%. + .map(p => LiquidityDecision(p, p.capacity * 0.5)) + if (goodSmallPeers.nonEmpty) { + log.debug("we've identified {} smaller peers that perform well relative to their capacity", goodSmallPeers.size) + printDailyStats(goodSmallPeers.map(_.peer)) + } + + // We try to identify peers that may have ran out of liquidity a long time ago but are interesting for us. + val peersToRevive = peers + // We prioritize peers that the node operator whitelisted. + .filter(p => config.topPeersWhitelist.contains(p.remoteNodeId)) + .sortBy(_.capacity)(Ordering[Satoshi].reverse) + // And we add peers with a large capacity that have a low balance. + .appendedAll(peers + .sortBy(_.capacity)(Ordering[Satoshi].reverse) + .take(config.topPeersCount) + .filter(p => p.canSend <= p.capacity * 0.1) + ) + .distinctBy(_.remoteNodeId) + // We'd like to increase their capacity by 25%. + .map(p => LiquidityDecision(p, p.capacity * 0.25)) + + // Since we're not yet reading past events from the DB, we need to wait until we have collected enough data before + // taking some actions such as opening or closing channels or updating relay fees. + // TODO: remove this once we start reading past data from the AuditDb on restart. + val hasPastData = bestPeersByVolume.exists(_.stats.drop(Bucket.bucketsPerDay).exists(_ != PeerStats.empty)) + if (hasPastData && replyTo_opt.isEmpty) { + closeChannelsIfNeeded(peers) + updateRelayFeesIfNeeded(peers) + fundChannelsIfNeeded(bestPeersThatNeedLiquidity, goodSmallPeers, peersToRevive) + } else { + replyTo_opt.foreach(_ ! bestPeersThatNeedLiquidity.map(_.peer)) + run() + } + } + + private def printDailyStats(peers: Seq[PeerInfo]): Unit = { + log.debug("| rank | node_id | daily_volume | daily_profit | can_send | can_receive |") + log.debug("|------|--------------------------------------------------------------------|---------------|---------------|---------------|---------------|") + peers.zipWithIndex.foreach { case (p, i) => + val dailyStats = p.stats.take(Bucket.bucketsPerDay) + val dailyVolume = dailyStats.map(_.totalAmountOut).sum.truncateToSatoshi.toMilliBtc + val dailyProfit = dailyStats.map(_.profit).sum.truncateToSatoshi.toMilliBtc + val canSend = p.canSend.truncateToSatoshi.toMilliBtc + val canReceive = p.canReceive.truncateToSatoshi.toMilliBtc + log.debug(f"| ${i + 1}%4d | ${p.remoteNodeId} | ${dailyVolume.toDouble}%8.2f mbtc | ${dailyProfit.toDouble}%8.2f mbtc | ${canSend.toDouble}%8.2f mbtc | ${canReceive.toDouble}%8.2f mbtc |") + } + log.debug("|------|--------------------------------------------------------------------|---------------|---------------|---------------|---------------|") + } + + private def closeChannelsIfNeeded(peers: Seq[PeerInfo]): Unit = { + peers + // We only close channels when we have more than one. + .filter(_.channels.size > 1) + // We only close channels for which most of the liquidity is idle on our side. + .filter(p => p.canSend >= p.capacity * 0.8 && p.stats.map(_.totalAmountOut).sum <= p.capacity * 0.05) + .foreach(p => { + val channels = p.channels.sortWith { + // We want to keep a public channel over a private channel. + case (c1, c2) if c1.isPublic != c2.isPublic => c1.isPublic + // Otherwise, we keep the channel with the largest capacity. + case (c1, c2) if c1.capacity != c2.capacity => c1.capacity >= c2.capacity + // Otherwise, we keep the channel with the smallest balance (and thus highest inbound liquidity). + case (c1, c2) => c1.canSend <= c2.canSend + } + // We keep the best channel and close the others. + channels.tail.foreach { c => + log.debug("we should close channel_id={} with remote_node_id={} (local={}, remote={})", c.channelId, p.remoteNodeId, c.canSend.truncateToSatoshi.toMilliBtc, c.canReceive.truncateToSatoshi.toMilliBtc) + if (config.liquidity.autoClose && nodeParams.currentFeeratesForFundingClosing.medium <= config.liquidity.maxFeerate) { + log.info("closing channel_id={} with remote_node_id={} (local={}, remote={})", c.channelId, p.remoteNodeId, c.canSend.truncateToSatoshi.toMilliBtc, c.canReceive.truncateToSatoshi.toMilliBtc) + val cmd = CMD_CLOSE(UntypedActorRef.noSender, None, None) + register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd) + } + } + }) + } + + private def updateRelayFeesIfNeeded(peers: Seq[PeerInfo]): Unit = { + // We configure *daily* absolute and proportional payment volume targets. We look at events from the current period + // and the previous period, so we need to get the right ratio to convert those daily amounts. + val now = TimestampMilli.now() + val lastTwoBucketsRatio = 1.0 + Bucket.consumed(now) + val lastTwoBucketsDailyRatio = (Bucket.duration * lastTwoBucketsRatio).toSeconds.toDouble / (24 * 3600) + // We increase fees of channels that are performing better than we expected. + log.debug("we should update our relay fees with the following peers:") + log.debug("| node_id | volume_variation | decision | current_fee | next_fee |") + log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|") + peers + // We select peers that have exceeded our payment volume target in the past two periods. + .filter(p => p.stats.take(2).map(_.totalAmountOut).sum >= Seq(config.relayFees.dailyPaymentVolumeThreshold * lastTwoBucketsDailyRatio, p.capacity * config.relayFees.dailyPaymentVolumeThresholdPercent * lastTwoBucketsDailyRatio).min) + // And that have an increasing payment volume compared to the period before that. + .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum > 0.msat) + .filter(p => (p.stats.take(2).map(_.totalAmountOut).sum / lastTwoBucketsRatio) > p.stats.slice(2, 3).map(_.totalAmountOut).sum * 1.1) + .foreach(p => { + p.latestUpdate_opt match { + // And for which we haven't updated our relay fees recently already. + case Some(u) if u.timestamp <= now.toTimestampSecond - (Bucket.duration * 1.5).toSeconds => + val next = u.relayFees.feeProportionalMillionths + 500 + val volumeVariation = p.stats.take(2).map(_.totalAmountOut).sum.toLong.toDouble / (p.stats.slice(2, 3).map(_.totalAmountOut).sum.toLong * lastTwoBucketsRatio) + log.debug(f"| ${p.remoteNodeId} | $volumeVariation%.2f | increase | ${u.feeProportionalMillionths}%11d | $next%8d |") + if (config.relayFees.autoUpdate && next <= config.relayFees.maxRelayFees.feeProportionalMillionths) { + val cmd = CMD_UPDATE_RELAY_FEE(UntypedActorRef.noSender, u.relayFees.feeBase, next) + p.channels.foreach(c => register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd)) + } + case _ => () + } + }) + // We decrease fees of channels that aren't performing well. + peers + // We select peers that have a recent 15% decrease in outgoing payment volume. + .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum > 0.msat) + .filter(p => p.stats.take(2).map(_.totalAmountOut).sum <= p.stats.slice(2, 3).map(_.totalAmountOut).sum * 0.85) + // For which the volume was previously already stable or decreasing. + .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum <= p.stats.slice(3, 4).map(_.totalAmountOut).sum) + // And that have enough liquidity to relay outgoing payments. + .filter(p => p.canSend >= Seq(config.relayFees.dailyPaymentVolumeThreshold, p.capacity * config.relayFees.dailyPaymentVolumeThresholdPercent).min) + .foreach(p => { + p.latestUpdate_opt match { + // And for which we haven't updated our relay fees recently already. + case Some(u) if u.timestamp <= now.toTimestampSecond - (Bucket.duration * 1.5).toSeconds => + val next = u.relayFees.feeProportionalMillionths - 500 + val volumeVariation = p.stats.take(2).map(_.totalAmountOut).sum.toLong.toDouble / (p.stats.slice(2, 3).map(_.totalAmountOut).sum.toLong * lastTwoBucketsRatio) + log.debug(f"| ${p.remoteNodeId} | $volumeVariation%.2f | decrease | ${u.feeProportionalMillionths}%11d | $next%8d |") + if (config.relayFees.autoUpdate && next >= config.relayFees.minRelayFees.feeProportionalMillionths) { + val cmd = CMD_UPDATE_RELAY_FEE(UntypedActorRef.noSender, u.relayFees.feeBase, next) + p.channels.foreach(c => register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd)) + } + case _ => () + } + }) + log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|") + } + + private def fundChannelsIfNeeded(bestPeers: Seq[LiquidityDecision], smallPeers: Seq[LiquidityDecision], toRevive: Seq[LiquidityDecision]): Behavior[Command] = { + // We don't want to fund peers every time our scoring algorithm runs, otherwise we may create too many on-chain + // transactions. We draw from a random distribution to ensure that on average: + // - we follow our rate-limit for funding best peers and fund at most 3 at a time + // - we fund a small peer once every 3 days, chosen at random between our top 3 + // - we revive an older peer once every day, chosen at random between our top 3 + val toFund = { + val scoringPerDay = (1 day).toSeconds / config.scoringFrequency.toSeconds + val bestPeersToFund = bestPeers.headOption match { + case Some(_) if Random.nextDouble() <= config.liquidity.maxFundingTxPerDay.toDouble / (scoringPerDay * bestPeers.size.min(3)) => bestPeers.take(3) + case _ => Nil + } + val smallPeerToFund_opt = smallPeers.headOption match { + case Some(_) if Random.nextDouble() <= (1.0 / (scoringPerDay * 3)) => Random.shuffle(smallPeers.take(3)).headOption + case _ => None + } + val toReviveNotAlreadySelected = toRevive.filterNot(p => bestPeers.exists(_.remoteNodeId == p.remoteNodeId) || smallPeerToFund_opt.exists(_.remoteNodeId == p.remoteNodeId)) + val toRevive_opt = toReviveNotAlreadySelected.headOption match { + case Some(_) if Random.nextDouble() <= (1.0 / scoringPerDay) => Random.shuffle(toReviveNotAlreadySelected.take(3)).headOption + case _ => None + } + (bestPeersToFund ++ toRevive_opt ++ smallPeerToFund_opt).distinctBy(_.remoteNodeId) + } + if (bestPeers.isEmpty && smallPeers.isEmpty && toRevive.isEmpty) { + log.info("we haven't identified peers that require liquidity yet") + run() + } else if (toFund.isEmpty) { + log.info("we skip funding peers because of per-day rate-limits: increase eclair.peer-scoring.liquidity.max-funding-tx-per-day to fund more often") + run() + } else if (config.liquidity.maxFeerate < nodeParams.currentFeeratesForFundingClosing.medium) { + log.info("we skip funding peers because current feerate is too high ({} < {}): increase eclair.peer-scoring.liquidity.max-feerate-sat-per-byte to start funding again", config.liquidity.maxFeerate, nodeParams.currentFeeratesForFundingClosing.medium) + run() + } else { + context.pipeToSelf(wallet.onChainBalance()) { + case Success(b) => OnChainBalance(b.confirmed, b.unconfirmed) + case Failure(e) => WalletError(e) + } + Behaviors.receiveMessagePartial { + case WalletError(e) => + log.warn("cannot get on-chain balance: {}", e.getMessage) + run() + case OnChainBalance(confirmed, unconfirmed) => + if (confirmed <= config.liquidity.minOnChainBalance) { + log.info("we don't have enough on-chain balance to fund new channels (confirmed={}, unconfirmed={})", confirmed.toMilliBtc, unconfirmed.toMilliBtc) + } else { + toFund + // We don't fund peers that are already being funded. + .filterNot(_.peer.hasPendingChannel) + // And we apply our configured funding limits to the liquidity suggestions. + .map(f => f.copy(fundingAmount = f.fundingAmount.max(config.liquidity.minFundingAmount).min(config.liquidity.maxFundingAmount))) + .foldLeft(confirmed - config.liquidity.minOnChainBalance) { + case (available, f) if available < f.fundingAmount * 0.5 => available + case (available, f) => + val fundingAmount = f.fundingAmount.min(available) + log.info("funding channel with remote_node_id={} (funding_amount={})", f.remoteNodeId, fundingAmount.toMilliBtc) + // TODO: when do we want to create a private channel? Maybe if we already have a bigger public channel? + val channelFlags = ChannelFlags(announceChannel = true) + val cmd = OpenChannel(f.remoteNodeId, fundingAmount, None, None, None, None, None, Some(channelFlags), Some(Timeout(60 seconds))) + register ! Register.ForwardNodeId(context.system.ignoreRef, f.remoteNodeId, cmd) + available - fundingAmount + } + } + run() + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala new file mode 100644 index 0000000000..e44a3c0959 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala @@ -0,0 +1,380 @@ +/* + * Copyright 2026 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.profit + +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} +import akka.actor.typed.{ActorRef, Behavior} +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong} +import fr.acinq.eclair.channel._ +import fr.acinq.eclair.db.AuditDb +import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent} +import fr.acinq.eclair.wire.protocol.ChannelUpdate +import fr.acinq.eclair.{Features, MilliSatoshi, MilliSatoshiLong, TimestampMilli, ToMilliSatoshiConversion} + +import java.time.{Instant, ZoneId, ZonedDateTime} +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +/** + * Created by t-bast on 30/01/2026. + */ + +object PeerStatsTracker { + // @formatter:off + sealed trait Command + case class GetLatestStats(replyTo: ActorRef[LatestStats]) extends Command + private[profit] case object RemoveOldBuckets extends Command + private[profit] case class WrappedPaymentSent(e: PaymentSent) extends Command + private[profit] case class WrappedPaymentRelayed(e: PaymentRelayed) extends Command + private[profit] case class WrappedPaymentReceived(e: PaymentReceived) extends Command + private[profit] case class ChannelCreationInProgress(remoteNodeId: PublicKey, channelId: ByteVector32) extends Command + private[profit] case class ChannelCreationAborted(remoteNodeId: PublicKey, channelId: ByteVector32) extends Command + private[profit] case class WrappedLocalChannelUpdate(e: LocalChannelUpdate) extends Command + private[profit] case class WrappedLocalChannelDown(e: LocalChannelDown) extends Command + private[profit] case class WrappedAvailableBalanceChanged(e: AvailableBalanceChanged) extends Command + // @formatter:on + + def apply(db: AuditDb, channels: Seq[PersistentChannelData]): Behavior[Command] = { + Behaviors.setup { context => + Behaviors.withTimers { timers => + new PeerStatsTracker(db, timers, context).start(channels) + } + } + } + + /** Returns the latest statistics for all of our public peers. */ + case class LatestStats(peers: Seq[PeerInfo]) + + /** NB: stats are ordered, with the most recent events first. */ + case class PeerInfo(remoteNodeId: PublicKey, stats: Seq[PeerStats], channels: Seq[ChannelInfo], latestUpdate_opt: Option[ChannelUpdate], hasPendingChannel: Boolean) { + val capacity: Satoshi = channels.map(_.capacity).sum + val canSend: MilliSatoshi = channels.map(_.canSend).sum + val canReceive: MilliSatoshi = channels.map(_.canReceive).sum + } + + /** We compute peer statistics per buckets spanning a few hours. */ + case class Bucket(private val day: Int, private val month: Int, private val year: Int, private val slot: Int) extends Ordered[Bucket] { + override def compare(that: Bucket): Int = that match { + case Bucket(_, _, y, _) if y != year => year - y + case Bucket(_, m, _, _) if m != month => month - m + case Bucket(d, _, _, _) if d != day => day - d + case Bucket(_, _, _, s) => slot - s + } + + override def toString: String = f"$year-$month%02d-$day%02d-$slot" + } + + object Bucket { + val duration: FiniteDuration = 3 hours + val bucketsPerDay: Int = 8 + + def from(ts: TimestampMilli): Bucket = { + val date = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts.toLong), ZoneId.of("UTC")) + Bucket(date.getDayOfMonth, date.getMonthValue, date.getYear, date.getHour * bucketsPerDay / 24) + } + + /** Returns the percentage (between 0.00 and 1.00) of the bucket consumed at the given timestamp. */ + def consumed(ts: TimestampMilli): Double = { + val bucket = from(ts) + val start = ZonedDateTime.of(bucket.year, bucket.month, bucket.day, bucket.slot * 24 / bucketsPerDay, 0, 0, 0, ZoneId.of("UTC")).toEpochSecond + (ts.toTimestampSecond.toLong - start).toDouble / duration.toSeconds + } + } + + case class PeerStats(totalAmountIn: MilliSatoshi, totalAmountOut: MilliSatoshi, relayFeeEarned: MilliSatoshi, onChainFeePaid: Satoshi, liquidityFeeEarned: Satoshi, liquidityFeePaid: Satoshi) { + val outgoingFlow: MilliSatoshi = totalAmountOut - totalAmountIn + val profit: MilliSatoshi = relayFeeEarned + liquidityFeeEarned.toMilliSatoshi - onChainFeePaid.toMilliSatoshi - liquidityFeePaid.toMilliSatoshi + } + + object PeerStats { + def empty: PeerStats = PeerStats(0 msat, 0 msat, 0 msat, 0 sat, 0 sat, 0 sat) + } + + /** + * We aggregate events into buckets to avoid storing too much data per peer, while providing enough granularity to + * detect variations in volume and flows. Note that we store an entry for every peer with whom we have a channel, + * even for peers that don't have any activity (which lets us detect those peers and potentially reclaim liquidity). + */ + case class BucketedPeerStats(private val stats: Map[PublicKey, Map[Bucket, PeerStats]]) { + def peers: Iterable[PublicKey] = stats.keys + + /** Returns stats for the given peer in all buckets (most recent bucket first). */ + def getPeerStats(remoteNodeId: PublicKey, now: TimestampMilli): Seq[PeerStats] = { + stats.get(remoteNodeId) match { + case Some(_) => (0 until BucketedPeerStats.bucketsCount).map(b => getPeerStatsForBucket(remoteNodeId, Bucket.from(now - b * Bucket.duration))) + case None => Seq.fill(BucketedPeerStats.bucketsCount)(PeerStats.empty) + } + } + + /** Returns stats for the given bucket. */ + private def getPeerStatsForBucket(remoteNodeId: PublicKey, bucket: Bucket): PeerStats = { + stats.get(remoteNodeId).flatMap(_.get(bucket)).getOrElse(PeerStats.empty) + } + + /** + * When our first channel with a peer is created, we want to add an entry for it, even though there are no payments yet. + * This lets us detect new peers that are idle, which can be useful in many scenarios. + */ + def initializePeerIfNeeded(remoteNodeId: PublicKey): BucketedPeerStats = { + stats.get(remoteNodeId) match { + case Some(_) => this // already initialized + case None => this.copy(stats = stats + (remoteNodeId -> Map.empty)) + } + } + + private def addOrUpdate(remoteNodeId: PublicKey, bucket: Bucket, peerStats: PeerStats): BucketedPeerStats = { + val buckets = stats.getOrElse(remoteNodeId, Map.empty[Bucket, PeerStats]) + copy(stats = stats + (remoteNodeId -> (buckets + (bucket -> peerStats)))) + } + + def addPaymentSent(e: PaymentSent): BucketedPeerStats = { + e.parts.foldLeft(this) { + case (current, p) => + val bucket = Bucket.from(p.settledAt) + val peerStats = current.getPeerStatsForBucket(p.remoteNodeId, bucket) + val peerStats1 = peerStats.copy(totalAmountOut = peerStats.totalAmountOut + p.amountWithFees) + current.addOrUpdate(p.remoteNodeId, bucket, peerStats1) + } + } + + def addPaymentReceived(e: PaymentReceived): BucketedPeerStats = { + e.parts.foldLeft(this) { + case (current, p) => + val bucket = Bucket.from(p.receivedAt) + val peerStats = current.getPeerStatsForBucket(p.remoteNodeId, bucket) + val peerStats1 = peerStats.copy(totalAmountIn = peerStats.totalAmountIn + p.amount) + current.addOrUpdate(p.remoteNodeId, bucket, peerStats1) + } + } + + def addPaymentRelayed(e: PaymentRelayed): BucketedPeerStats = { + val withIncoming = e.incoming.foldLeft(this) { + case (current, i) => + val bucket = Bucket.from(i.receivedAt) + val peerStats = current.getPeerStatsForBucket(i.remoteNodeId, bucket) + val peerStats1 = peerStats.copy(totalAmountIn = peerStats.totalAmountIn + i.amount) + current.addOrUpdate(i.remoteNodeId, bucket, peerStats1) + } + e.outgoing.foldLeft(withIncoming) { + case (current, o) => + val bucket = Bucket.from(o.settledAt) + val peerStats = current.getPeerStatsForBucket(o.remoteNodeId, bucket) + // When using MPP and trampoline, payments can be relayed through multiple nodes at once. + // We split the fee according to the proportional amount relayed through the requested node. + val relayFee = e.relayFee * (o.amount.toLong.toDouble / e.amountOut.toLong) + val peerStats1 = peerStats.copy(totalAmountOut = peerStats.totalAmountOut + o.amount, relayFeeEarned = peerStats.relayFeeEarned + relayFee) + current.addOrUpdate(o.remoteNodeId, bucket, peerStats1) + } + } + + /** Remove old buckets that exceed our retention window. This should be called frequently to avoid memory leaks. */ + def removeOldBuckets(now: TimestampMilli): BucketedPeerStats = { + val oldestBucket = Bucket.from(now - Bucket.duration * BucketedPeerStats.bucketsCount) + copy(stats = stats.map { + case (remoteNodeId, peerStats) => remoteNodeId -> peerStats.filter { case (bucket, _) => bucket >= oldestBucket } + }) + } + + /** Remove a peer from our list: this should only happen when we don't have channels with that peer anymore. */ + def removePeer(remoteNodeId: PublicKey): BucketedPeerStats = copy(stats = stats - remoteNodeId) + } + + object BucketedPeerStats { + // We keep 7 days of past history. + val bucketsCount: Int = 7 * Bucket.bucketsPerDay + + def empty(peers: Set[PublicKey]): BucketedPeerStats = BucketedPeerStats(peers.map(remoteNodeId => remoteNodeId -> Map.empty[Bucket, PeerStats]).toMap) + } + + /** We keep minimal information about open channels to allow running our heuristics. */ + case class ChannelInfo(channelId: ByteVector32, capacity: Satoshi, canSend: MilliSatoshi, canReceive: MilliSatoshi, isPublic: Boolean) + + object ChannelInfo { + def apply(commitments: Commitments): ChannelInfo = ChannelInfo(commitments.channelId, commitments.latest.capacity, commitments.availableBalanceForSend, commitments.availableBalanceForReceive, commitments.announceChannel) + } + + /** + * Note that we keep channel updates separately: we're only interested in the relay fees, which are the same for every channel. + * We also keep track of pending channels (channels being created), to avoid creating many channels at the same time. + */ + case class PeerChannels(private val channels: Map[PublicKey, Seq[ChannelInfo]], private val updates: Map[PublicKey, ChannelUpdate], private val pending: Map[PublicKey, Set[ByteVector32]]) { + def peers: Set[PublicKey] = channels.keySet + + def getChannels(remoteNodeId: PublicKey): Seq[ChannelInfo] = channels.getOrElse(remoteNodeId, Nil) + + def getUpdate(remoteNodeId: PublicKey): Option[ChannelUpdate] = updates.get(remoteNodeId) + + def hasChannels(remoteNodeId: PublicKey): Boolean = channels.contains(remoteNodeId) + + def hasPendingChannel(remoteNodeId: PublicKey): Boolean = pending.contains(remoteNodeId) + + def updateChannel(e: LocalChannelUpdate): PeerChannels = { + // Note that creating our channel update implicitly means that the channel isn't pending anymore. + updates.get(e.remoteNodeId) match { + case Some(u) if u.timestamp > e.channelUpdate.timestamp => updateChannel(e.commitments).removePendingChannel(e.remoteNodeId, e.channelId) + case _ => updateChannel(e.commitments).copy(updates = updates + (e.remoteNodeId -> e.channelUpdate)).removePendingChannel(e.remoteNodeId, e.channelId) + } + } + + def updateChannel(e: AvailableBalanceChanged): PeerChannels = { + updateChannel(e.commitments) + } + + private def updateChannel(commitments: Commitments): PeerChannels = { + if (!commitments.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve)) { + val peerChannels1 = channels.getOrElse(commitments.remoteNodeId, Nil).filter(_.channelId != commitments.channelId) :+ ChannelInfo(commitments) + copy(channels = channels + (commitments.remoteNodeId -> peerChannels1)) + } else { + // We filter out channels with mobile wallets. + copy(channels = channels - commitments.remoteNodeId) + } + } + + def addPendingChannel(e: ChannelCreationInProgress): PeerChannels = { + val pending1 = pending + (e.remoteNodeId -> (pending.getOrElse(e.remoteNodeId, Set.empty) + e.channelId)) + copy(pending = pending1) + } + + def removeChannel(e: LocalChannelDown): PeerChannels = { + val updated = channels.get(e.remoteNodeId) match { + case Some(peerChannels) => + val peerChannels1 = peerChannels.filter(_.channelId != e.channelId) + if (peerChannels1.isEmpty) { + copy(channels = channels - e.remoteNodeId, updates = updates - e.remoteNodeId) + } else { + copy(channels = channels + (e.remoteNodeId -> peerChannels1)) + } + case None => this + } + updated.removePendingChannel(e.remoteNodeId, e.channelId) + } + + private def removePendingChannel(remoteNodeId: PublicKey, channelId: ByteVector32): PeerChannels = { + val pending1 = pending.get(remoteNodeId) match { + case Some(channels) => + val channels1 = channels - channelId + if (channels1.isEmpty) { + pending - remoteNodeId + } else { + pending + (remoteNodeId -> channels1) + } + case None => pending + } + copy(pending = pending1) + } + + def removeChannel(e: ChannelCreationAborted): PeerChannels = { + removePendingChannel(e.remoteNodeId, e.channelId) + } + } + + private object PeerChannels { + def apply(channels: Seq[PersistentChannelData]): PeerChannels = { + channels.foldLeft(PeerChannels(Map.empty[PublicKey, Seq[ChannelInfo]], Map.empty[PublicKey, ChannelUpdate], Map.empty[PublicKey, Set[ByteVector32]])) { + case (current, channel) => channel match { + // We include pending channels. + case _: DATA_WAIT_FOR_DUAL_FUNDING_SIGNED | _: DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED | _: DATA_WAIT_FOR_DUAL_FUNDING_READY | _: DATA_WAIT_FOR_FUNDING_CONFIRMED | _: DATA_WAIT_FOR_CHANNEL_READY if !channel.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve) => + val pending1 = current.pending.getOrElse(channel.remoteNodeId, Set.empty) + channel.channelId + current.copy(pending = current.pending + (channel.remoteNodeId -> pending1)) + // We filter out channels with mobile wallets. + case d: DATA_NORMAL if !d.commitments.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve) => + val peerChannels1 = current.channels.getOrElse(d.remoteNodeId, Nil) :+ ChannelInfo(d.commitments) + val update1 = current.updates.get(d.remoteNodeId) match { + case Some(update) if update.timestamp > d.channelUpdate.timestamp => update + case _ => d.channelUpdate + } + current.copy( + channels = current.channels + (d.remoteNodeId -> peerChannels1), + updates = current.updates + (d.remoteNodeId -> update1) + ) + case _ => current + } + } + } + } + +} + +private class PeerStatsTracker(db: AuditDb, timers: TimerScheduler[PeerStatsTracker.Command], context: ActorContext[PeerStatsTracker.Command]) { + + import PeerStatsTracker._ + + private val log = context.log + + private def start(channels: Seq[PersistentChannelData]): Behavior[Command] = { + // We subscribe to channel events to update channel balances. + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelIdAssigned](e => ChannelCreationInProgress(e.remoteNodeId, e.channelId))) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelAborted](e => ChannelCreationAborted(e.remoteNodeId, e.channelId))) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedLocalChannelDown)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedLocalChannelUpdate)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedAvailableBalanceChanged)) + val peerChannels = PeerChannels(channels) + log.info("gathering statistics for {} peers", peerChannels.peers.size) + // We subscribe to payment events to update statistics. + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentSent)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentRelayed)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentReceived)) + // TODO: read events that happened before startedAt from the DB to initialize statistics from past data. + val stats = BucketedPeerStats.empty(peerChannels.peers) + timers.startTimerWithFixedDelay(RemoveOldBuckets, Bucket.duration) + listening(stats, peerChannels) + } + + private def listening(stats: BucketedPeerStats, channels: PeerChannels): Behavior[Command] = { + Behaviors.receiveMessage { + case WrappedPaymentSent(e) => + listening(stats.addPaymentSent(e), channels) + case WrappedPaymentReceived(e) => + listening(stats.addPaymentReceived(e), channels) + case WrappedPaymentRelayed(e) => + listening(stats.addPaymentRelayed(e), channels) + case e: ChannelCreationInProgress => + listening(stats, channels.addPendingChannel(e)) + case e: ChannelCreationAborted => + listening(stats, channels.removeChannel(e)) + case WrappedLocalChannelUpdate(e) => + listening(stats.initializePeerIfNeeded(e.remoteNodeId), channels.updateChannel(e)) + case WrappedAvailableBalanceChanged(e) => + listening(stats, channels.updateChannel(e)) + case WrappedLocalChannelDown(e) => + val channels1 = channels.removeChannel(e) + val stats1 = if (channels1.getChannels(e.remoteNodeId).isEmpty && !channels1.hasPendingChannel(e.remoteNodeId)) { + stats.removePeer(e.remoteNodeId) + } else { + stats + } + listening(stats1, channels1) + case RemoveOldBuckets => + listening(stats.removeOldBuckets(TimestampMilli.now()), channels) + case GetLatestStats(replyTo) => + // TODO: do a db.listConfirmed() to update on-chain stats (we cannot rely on events only because data comes from + // the TransactionPublished event, but should only be applied after TransactionConfirmed so we need permanent + // storage). We'll need the listConfirmed() function added in https://github.com/ACINQ/eclair/pull/3245. + log.debug("statistics available for {} peers", stats.peers.size) + val now = TimestampMilli.now() + val latest = stats.peers + // We only return statistics for peers with whom we have channels available for payments. + .filter(nodeId => channels.hasChannels(nodeId)) + .map(nodeId => PeerInfo(nodeId, stats.getPeerStats(nodeId, now), channels.getChannels(nodeId), channels.getUpdate(nodeId), channels.hasPendingChannel(nodeId))) + .toSeq + replyTo ! LatestStats(latest) + listening(stats, channels) + } + } + +} diff --git a/eclair-core/src/test/resources/logback-test.xml b/eclair-core/src/test/resources/logback-test.xml index 236be1d4af..0b946139b1 100644 --- a/eclair-core/src/test/resources/logback-test.xml +++ b/eclair-core/src/test/resources/logback-test.xml @@ -55,6 +55,7 @@ + diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 3a43f342cb..b599f6d2f4 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -29,6 +29,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.offer.OffersConfig import fr.acinq.eclair.payment.relay.OnTheFlyFunding import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} +import fr.acinq.eclair.profit.PeerScorer import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Graph.{HeuristicsConstants, MessageWeightRatios} import fr.acinq.eclair.router.Router._ @@ -261,6 +262,28 @@ object TestConstants { peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour), + peerScoringConfig = PeerScorer.Config( + enabled = true, + scoringFrequency = 1 day, + topPeersCount = 10, + topPeersWhitelist = Set.empty, + liquidity = PeerScorer.LiquidityConfig( + autoFund = true, + autoClose = true, + minFundingAmount = 1_000_000 sat, // 0.01 BTC + maxFundingAmount = 100_000_000 sat, // 1 BTC + minOnChainBalance = 5_000_000 sat, // 0.05 BTC + maxFeerate = FeeratePerByte(100 sat).perKw, + maxFundingTxPerDay = 100, + ), + relayFees = PeerScorer.RelayFeesConfig( + autoUpdate = true, + minRelayFees = RelayFees(1 msat, 500), + maxRelayFees = RelayFees(10_000 msat, 5000), + dailyPaymentVolumeThreshold = 1_000_000 sat, // 0.01 BTC + dailyPaymentVolumeThresholdPercent = 0.1, + ) + ), offersConfig = OffersConfig(messagePathMinLength = 2, paymentPathCount = 2, paymentPathLength = 4, paymentPathCltvExpiryDelta = CltvExpiryDelta(500)), ) @@ -453,6 +476,28 @@ object TestConstants { peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds), onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds), peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour), + peerScoringConfig = PeerScorer.Config( + enabled = true, + scoringFrequency = 1 day, + topPeersCount = 10, + topPeersWhitelist = Set.empty, + liquidity = PeerScorer.LiquidityConfig( + autoFund = true, + autoClose = true, + minFundingAmount = 1_000_000 sat, // 0.01 BTC + maxFundingAmount = 100_000_000 sat, // 1 BTC + minOnChainBalance = 5_000_000 sat, // 0.05 BTC + maxFeerate = FeeratePerByte(100 sat).perKw, + maxFundingTxPerDay = 100, + ), + relayFees = PeerScorer.RelayFeesConfig( + autoUpdate = true, + minRelayFees = RelayFees(1 msat, 500), + maxRelayFees = RelayFees(10_000 msat, 5000), + dailyPaymentVolumeThreshold = 1_000_000 sat, // 0.01 BTC + dailyPaymentVolumeThresholdPercent = 0.1, + ) + ), offersConfig = OffersConfig(messagePathMinLength = 2, paymentPathCount = 2, paymentPathLength = 4, paymentPathCltvExpiryDelta = CltvExpiryDelta(500)), ) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala index 1ee99dbb26..0df41cf417 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala @@ -250,4 +250,13 @@ object DummyOnChainWallet { MakeFundingTxResponse(fundingTx, 0, 420 sat) } +} + +class DummyBalanceChecker(var confirmedBalance: Satoshi = 0 sat, var unconfirmedBalance: Satoshi = 0 sat) extends OnChainBalanceChecker { + def setBalance(confirmed: Satoshi, unconfirmed: Satoshi): Unit = { + confirmedBalance = confirmed + unconfirmedBalance = unconfirmed + } + + override def onChainBalance()(implicit ec: ExecutionContext): Future[OnChainBalance] = Future.successful(OnChainBalance(confirmedBalance, unconfirmedBalance)) } \ No newline at end of file diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala new file mode 100644 index 0000000000..8230733aa5 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala @@ -0,0 +1,513 @@ +package fr.acinq.eclair.profit + +import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} +import akka.actor.typed.ActorRef +import akka.actor.typed.scaladsl.adapter.TypedActorRefOps +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Block, BtcAmount, BtcDouble, ByteVector32, ByteVector64, SatoshiLong} +import fr.acinq.eclair.blockchain.DummyBalanceChecker +import fr.acinq.eclair.blockchain.fee.FeeratePerByte +import fr.acinq.eclair.channel.{CMD_CLOSE, CMD_UPDATE_RELAY_FEE, Register} +import fr.acinq.eclair.io.Peer.OpenChannel +import fr.acinq.eclair.payment.relay.Relayer.RelayFees +import fr.acinq.eclair.profit.PeerScorer._ +import fr.acinq.eclair.profit.PeerStatsTracker._ +import fr.acinq.eclair.wire.protocol.ChannelUpdate +import fr.acinq.eclair.wire.protocol.ChannelUpdate.{ChannelFlags, MessageFlags} +import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, RealShortChannelId, TestConstants, TimestampMilli, TimestampSecond, ToMilliSatoshiConversion, randomBytes32} +import org.scalatest.Inside.inside +import org.scalatest.funsuite.AnyFunSuiteLike +import scodec.bits.HexStringSyntax + +import scala.concurrent.duration.DurationInt +import scala.util.Random + +class PeerScorerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + private val remoteNodeId1 = PublicKey(hex"024c9c77624899672c78d84b551ef1187cbb17618b2d96ef189d0ea36f307be76e") + private val remoteNodeId2 = PublicKey(hex"02271ffb6969f6dc4769438637d7d24dc3358098cdef7a772f9ccfd31251470e28") + private val remoteNodeId3 = PublicKey(hex"028f5be42aa013f9fd2e5a28a152563ac21acc095ef65cab2e835a789d2a4add96") + + private val weeklyBuckets = Bucket.bucketsPerDay * 7 + private val defaultConfig = Config( + enabled = true, + // We set this to 1 day to more easily match daily rate-limits. + scoringFrequency = 1 day, + topPeersCount = 10, + topPeersWhitelist = Set.empty, + liquidity = PeerScorer.LiquidityConfig( + autoFund = true, + autoClose = true, + minFundingAmount = 1_000_000 sat, // 0.01 BTC + maxFundingAmount = 100_000_000 sat, // 1 BTC + minOnChainBalance = 5_000_000 sat, // 0.05 BTC + maxFeerate = FeeratePerByte(100 sat).perKw, + maxFundingTxPerDay = 100, + ), + relayFees = PeerScorer.RelayFeesConfig( + autoUpdate = true, + minRelayFees = RelayFees(1 msat, 500), + maxRelayFees = RelayFees(10_000 msat, 5000), + dailyPaymentVolumeThreshold = 1_000_000 sat, // 0.01 BTC + dailyPaymentVolumeThresholdPercent = 0.1, + ) + ) + + private case class Fixture(tracker: TestProbe[GetLatestStats], register: TestProbe[Any], wallet: DummyBalanceChecker, scorer: ActorRef[PeerScorer.Command]) + + private def withFixture(config: Config = defaultConfig, onChainBalance: BtcAmount = 0 sat)(testFun: Fixture => Any): Unit = { + val tracker = TestProbe[GetLatestStats]() + val register = TestProbe[Any]() + val wallet = new DummyBalanceChecker(confirmedBalance = onChainBalance.toMilliSatoshi.truncateToSatoshi) + val nodeParams = TestConstants.Alice.nodeParams.copy(peerScoringConfig = config) + val scorer = testKit.spawn(PeerScorer(nodeParams, wallet, tracker.ref, register.ref.toClassic)) + testFun(Fixture(tracker, register, wallet, scorer)) + } + + private def channelUpdate(capacity: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), timestamp: TimestampSecond = TimestampSecond.now(), announceChannel: Boolean = true): ChannelUpdate = { + val messageFlags = MessageFlags(dontForward = !announceChannel) + val channelFlags = ChannelFlags(isEnabled = true, isNode1 = true) + ChannelUpdate(ByteVector64.Zeroes, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), timestamp, messageFlags, channelFlags, CltvExpiryDelta(36), 1 msat, fees.feeBase, fees.feeProportionalMillionths, capacity.toMilliSatoshi) + } + + private def peerStats(totalAmountIn: BtcAmount = 0 sat, + totalAmountOut: BtcAmount = 0 sat, + relayFeeEarned: BtcAmount = 0 sat, + onChainFeePaid: BtcAmount = 0 sat, + liquidityFeeEarned: BtcAmount = 0 sat, + liquidityFeePaid: BtcAmount = 0 sat): PeerStats = PeerStats( + totalAmountIn = totalAmountIn.toMilliSatoshi, + totalAmountOut = totalAmountOut.toMilliSatoshi, + relayFeeEarned = relayFeeEarned.toMilliSatoshi, + onChainFeePaid = onChainFeePaid.toMilliSatoshi.truncateToSatoshi, + liquidityFeeEarned = liquidityFeeEarned.toMilliSatoshi.truncateToSatoshi, + liquidityFeePaid = liquidityFeePaid.toMilliSatoshi.truncateToSatoshi + ) + + private def channelInfo(canSend: BtcAmount, canReceive: BtcAmount, channelId: ByteVector32 = randomBytes32(), isPublic: Boolean = true): ChannelInfo = ChannelInfo( + channelId = channelId, + capacity = canSend.toMilliSatoshi.truncateToSatoshi + canReceive.toMilliSatoshi.truncateToSatoshi, + canSend = canSend.toMilliSatoshi, + canReceive = canReceive.toMilliSatoshi, + isPublic = isPublic + ) + + test("simulate large outgoing flows") { + withFixture(onChainBalance = 10 btc) { f => + import f._ + + // We have channels with 3 peers: + // - we have a very large capacity with our first peer + // - we have a smaller capacity with our second peer + // - we have medium capacity with our third peer + val c1a = channelInfo(canSend = 0.5 btc, canReceive = 2.7 btc) + val c1b = channelInfo(canSend = 0.3 btc, canReceive = 3.5 btc, isPublic = false) + val c2a = channelInfo(canSend = 0.03 btc, canReceive = 0.25 btc) + val c2b = channelInfo(canSend = 0.04 btc, canReceive = 0.17 btc) + val c3 = channelInfo(canSend = 0.3 btc, canReceive = 0.7 btc) + + // We need to scale the last bucket of test data based on the current timestamp, otherwise we will overestimate + // the outgoing payment volume of the current bucket and thus always increase relay fees. + val bucketRatio = Bucket.consumed(TimestampMilli.now()) + + // We have a stable, large outgoing flow with our first peer: we should add liquidity and keep our relay fees unchanged. + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.51.btc * bucketRatio, relayFeeEarned = 0.01.btc * bucketRatio), + peerStats(totalAmountOut = 0.52 btc, relayFeeEarned = 0.01 btc), + peerStats(totalAmountOut = 0.48 btc, relayFeeEarned = 0.01 btc), + peerStats(totalAmountOut = 0.47 btc, relayFeeEarned = 0.01 btc), + peerStats(totalAmountOut = 0.51 btc, relayFeeEarned = 0.01 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.5.btc + Random.nextInt(500_000).sat, relayFeeEarned = 0.01 btc)), + channels = Seq(c1a, c1b), + latestUpdate_opt = Some(channelUpdate(c1a.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 2)), + hasPendingChannel = false, + ) + + // We have an increasing outgoing flow with our second peer: we should add liquidity and increase our routing fees. + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.0001.btc * bucketRatio), + peerStats(totalAmountOut = 0.015 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.011 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.0010 btc, relayFeeEarned = 0.0001 btc)), + channels = Seq(c2a, c2b), + latestUpdate_opt = Some(channelUpdate(c2a.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 2)), + hasPendingChannel = false + ) + + // We have a decreasing outgoing flow with our third peer: we should decrease our routing fees and may add liquidity. + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.00007.btc * bucketRatio), + peerStats(totalAmountOut = 0.03 btc, relayFeeEarned = 0.00009 btc), + peerStats(totalAmountOut = 0.07 btc, relayFeeEarned = 0.00012 btc), + peerStats(totalAmountOut = 0.07 btc, relayFeeEarned = 0.00011 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(c3), + latestUpdate_opt = Some(channelUpdate(c3.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 2)), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + + // We increase our relay fees with our second peer, on all channels. + val feeIncreases = Seq( + register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]], + register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]], + ) + assert(feeIncreases.map(_.channelId).toSet == Set(c2a.channelId, c2b.channelId)) + assert(feeIncreases.map(_.message.feeProportionalMillionths).toSet == Set(1500)) + // We decrease our relay fees with our third peer. + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.channelId == c3.channelId) + assert(cmd.message.feeProportionalMillionths == 500) + } + // We fund channels with all of our peers. + val funding = Seq( + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + ) + assert(funding.map(_.nodeId).toSet == Set(remoteNodeId1, remoteNodeId2, remoteNodeId3)) + funding.filter(_.nodeId != remoteNodeId1).foreach(f => assert(f.message.fundingAmount < defaultConfig.liquidity.maxFundingAmount)) + funding.filter(_.nodeId == remoteNodeId1).foreach(f => assert(f.message.fundingAmount == defaultConfig.liquidity.maxFundingAmount)) + register.expectNoMessage(100 millis) + } + } + + test("simulate exhausted large outgoing flow") { + withFixture(onChainBalance = 10 btc) { f => + import f._ + + // Our first peer was very profitable, but it has exhausted its liquidity more than a week ago and doesn't have recent routing activity. + // But since it has a large capacity, we still fund a channel to try to revive it. + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(Bucket.bucketsPerDay * 7)(PeerStats.empty), + channels = Seq(channelInfo(canSend = 10_000 sat, canReceive = 3 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our second peer was very profitable as well, but has exhausted its liquidity more recently. + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq.fill(Bucket.bucketsPerDay * 3)(PeerStats.empty) ++ Seq.fill(Bucket.bucketsPerDay * 4)(peerStats(totalAmountOut = 0.3 btc, relayFeeEarned = 0.0005 btc)), + channels = Seq(channelInfo(canSend = 10_000 sat, canReceive = 3 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our third peer has balanced flows. + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq.fill(Bucket.bucketsPerDay * 7)(peerStats(totalAmountOut = 0.2 btc, totalAmountIn = 0.2 btc, relayFeeEarned = 0.001 btc)), + channels = Seq(channelInfo(canSend = 0.3 btc, canReceive = 2.5 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + val funding = Seq( + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + ) + assert(funding.map(_.nodeId).toSet == Set(remoteNodeId1, remoteNodeId2)) + // We fund a smaller channel with the first node, because we don't know how it will perform. + funding.filter(_.nodeId == remoteNodeId1).foreach(f => assert(f.message.fundingAmount < defaultConfig.liquidity.maxFundingAmount)) + funding.filter(_.nodeId == remoteNodeId2).foreach(f => assert(f.message.fundingAmount == defaultConfig.liquidity.maxFundingAmount)) + register.expectNoMessage(100 millis) + } + } + + test("simulate good small peers that need more capacity") { + // We use a configuration that selects only our best peer: we should still select a smaller peer that performs well. + // We set a large scoring frequency to remove the random rate-limit (of 1 small peer funding every 3 days). + val config = defaultConfig.copy(topPeersCount = 1, scoringFrequency = 7 days) + withFixture(onChainBalance = 10 btc, config = config) { f => + import f._ + + // Our first peer has the biggest outgoing payment volume and capacity and needs more liquidity. + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 0.001 btc, totalAmountOut = 0.002 btc, relayFeeEarned = 10_000 sat)), + channels = Seq(channelInfo(canSend = 0.3 btc, canReceive = 3.5 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our second peer, which has a smaller capacity, has a better per-capacity profit ratio. + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 0.0005 btc, totalAmountOut = 0.001 btc, relayFeeEarned = 5_000 sat)), + channels = Seq(channelInfo(canSend = 0.01 btc, canReceive = 0.07 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our third peer also has a good per-capacity profit ratio, but doesn't need any liquidity. + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 0.0005 btc, totalAmountOut = 0.001 btc, relayFeeEarned = 5_000 sat)), + channels = Seq(channelInfo(canSend = 0.04 btc, canReceive = 0.04 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + val funding = Seq( + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + register.expectMessageType[Register.ForwardNodeId[OpenChannel]], + ) + assert(funding.map(_.nodeId).toSet == Set(remoteNodeId1, remoteNodeId2)) + // We increase small peers' capacity by 50%. + funding.find(_.nodeId == remoteNodeId2).foreach(cmd => assert(cmd.message.fundingAmount == 0.04.btc.toSatoshi)) + register.expectNoMessage(100 millis) + } + } + + test("fund channels with whitelisted peers") { + val config = defaultConfig.copy(topPeersWhitelist = Set(remoteNodeId3)) + withFixture(onChainBalance = 10 btc, config = config) { f => + import f._ + + // Our first peer isn't very profitable and doesn't need liquidity (small outgoing flow). + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 250_000 sat, totalAmountOut = 300_000 sat, relayFeeEarned = 20_000 sat)), + channels = Seq(channelInfo(canSend = 0.4 btc, canReceive = 0.5 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our second peer isn't very profitable either and doesn't need liquidity (small incoming flow). + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 300_000 sat, totalAmountOut = 280_000 sat, relayFeeEarned = 15_000 sat)), + channels = Seq(channelInfo(canSend = 0.3 btc, canReceive = 0.2 btc)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + // Our third peer has exhausted its liquidity a while ago and lost most of its capacity: but it is whitelisted, + // so we should fund a new channel with them. + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq.fill(weeklyBuckets)(PeerStats.empty), + channels = Seq(channelInfo(canSend = 150_000 sat, canReceive = 1_400_000 sat)), + latestUpdate_opt = None, + hasPendingChannel = false, + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd => + assert(cmd.nodeId == remoteNodeId3) + assert(cmd.message.fundingAmount == defaultConfig.liquidity.minFundingAmount) + } + register.expectNoMessage(100 millis) + } + } + + test("don't fund channels if channel is already being created") { + withFixture(onChainBalance = 5 btc) { f => + import f._ + + // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity. + val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc)) + val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc) + + // If there are no pending channel, we suggest adding liquidity. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd => + assert(cmd.nodeId == remoteNodeId1) + assert(cmd.message.fundingAmount > 0.01.btc) + } + + // If there is already a pending channel, we don't suggest adding liquidity. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = true))) + } + register.expectNoMessage(100 millis) + } + } + + test("don't fund channels if feerate is too high") { + val config = defaultConfig.copy(liquidity = defaultConfig.liquidity.copy(maxFeerate = FeeratePerByte(1 sat).perKw)) + withFixture(onChainBalance = 5 btc, config = config) { f => + import f._ + + // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity. + val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc)) + val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc) + + // But the feerate is too high compared to our configured threshold, so we don't do anything. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + register.expectNoMessage(100 millis) + } + } + + test("don't fund channels if on-chain balance is too low") { + val config = defaultConfig.copy(liquidity = defaultConfig.liquidity.copy(minOnChainBalance = 1.5 btc)) + withFixture(onChainBalance = 1.5 btc, config = config) { f => + import f._ + + // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity. + val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc)) + val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc) + + // But our balance is too low compared to our configured threshold, so we don't do anything. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + register.expectNoMessage(100 millis) + } + } + + test("don't update relay fees too frequently") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + val channel = channelInfo(canSend = 0.3 btc, canReceive = 0.5 btc) + val latestUpdate = channelUpdate(channel.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 3) + + // We have an increasing outgoing flow with our peer: we should increase our routing fees. + val peerInfo = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc, relayFeeEarned = 0.00015.btc), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00007 btc), + peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00005 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(latestUpdate), + hasPendingChannel = false + ) + + // We increase our routing fees. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.channelId == channel.channelId) + assert(cmd.message.feeProportionalMillionths > latestUpdate.feeProportionalMillionths) + } + register.expectNoMessage(100 millis) + + // However, if our latest update was done recently, we don't update our routing fees. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo.copy(latestUpdate_opt = Some(latestUpdate.copy(timestamp = TimestampSecond.now() - Bucket.duration))))) + } + register.expectNoMessage(100 millis) + } + } + + test("close idle channels with liquidity") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + // We have several old channels where liquidity is on our side. + val c1a = channelInfo(canSend = 0.5 btc, canReceive = 0.01 btc) + val c1b = channelInfo(canSend = 0.8 btc, canReceive = 0.1 btc, isPublic = false) + val c1c = channelInfo(canSend = 0.4 btc, canReceive = 0.05 btc) + val c2a = channelInfo(canSend = 0.3 btc, canReceive = 0.1 btc) + val c2b = channelInfo(canSend = 0.2 btc, canReceive = 0.01 btc) + val c3 = channelInfo(canSend = 0.8 btc, canReceive = 0.05 btc) + + // The outgoing volume of the corresponding peers is negligible. + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 10_000 sat, totalAmountIn = 5_000 sat, relayFeeEarned = 100 sat)), + channels = Seq(c1a, c1b, c1c), + latestUpdate_opt = None, + hasPendingChannel = false + ) + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId2, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 5_000 sat, totalAmountIn = 8_000 sat, relayFeeEarned = 80 sat)), + channels = Seq(c2a, c2b), + latestUpdate_opt = None, + hasPendingChannel = false + ) + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId3, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 5_000 sat, totalAmountIn = 5_000 sat, relayFeeEarned = 50 sat)), + channels = Seq(c3), + latestUpdate_opt = None, + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3)) + } + // We want to keep one channel with each peer, and select the one that likely has the best score for path-finding algorithms. + assert(Seq( + register.expectMessageType[Register.Forward[CMD_CLOSE]], + register.expectMessageType[Register.Forward[CMD_CLOSE]], + register.expectMessageType[Register.Forward[CMD_CLOSE]] + ).map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId, c2b.channelId)) + register.expectNoMessage(100 millis) + } + } + + test("don't close idle channels if feerate is too high") { + val config = defaultConfig.copy(liquidity = defaultConfig.liquidity.copy(maxFeerate = FeeratePerByte(1 sat).perKw)) + withFixture(onChainBalance = 0 btc, config = config) { f => + import f._ + + // We have several old channels where liquidity is on our side. + val c1a = channelInfo(canSend = 0.5 btc, canReceive = 0.01 btc) + val c1b = channelInfo(canSend = 0.8 btc, canReceive = 0.1 btc) + val c1c = channelInfo(canSend = 0.4 btc, canReceive = 0.05 btc) + + // The outgoing volume of the corresponding peer is negligible. + val peerInfo = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 10_000 sat, totalAmountIn = 5_000 sat, relayFeeEarned = 100 sat)), + channels = Seq(c1a, c1b, c1c), + latestUpdate_opt = None, + hasPendingChannel = false + ) + + // But the feerate is too high compared to our configured threshold, so we don't close channels. + assert(TestConstants.Alice.nodeParams.currentFeeratesForFundingClosing.medium > config.liquidity.maxFeerate) + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo)) + } + register.expectNoMessage(100 millis) + } + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala new file mode 100644 index 0000000000..c4dea6f884 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala @@ -0,0 +1,339 @@ +package fr.acinq.eclair.profit + +import akka.actor.ActorRef +import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} +import fr.acinq.bitcoin.scalacompat.{Block, BtcAmount, BtcDouble, ByteVector32, ByteVector64, OutPoint, SatoshiLong, Script, Transaction, TxOut} +import fr.acinq.eclair.TestUtils.randomTxId +import fr.acinq.eclair.blockchain.fee.FeeratePerKw +import fr.acinq.eclair.channel._ +import fr.acinq.eclair.payment.PaymentEvent.{IncomingPayment, OutgoingPayment} +import fr.acinq.eclair.payment.relay.Relayer.RelayFees +import fr.acinq.eclair.payment.{ChannelPaymentRelayed, TrampolinePaymentRelayed} +import fr.acinq.eclair.profit.PeerStatsTracker._ +import fr.acinq.eclair.transactions.Transactions.{ClosingTx, InputInfo} +import fr.acinq.eclair.wire.protocol.ChannelUpdate.{ChannelFlags, MessageFlags} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate} +import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiryDelta, Features, MilliSatoshiLong, RealShortChannelId, TestDatabases, TimestampMilli, TimestampSecond, ToMilliSatoshiConversion, randomBytes32, randomKey} +import org.scalatest.Inside.inside +import org.scalatest.funsuite.AnyFunSuiteLike +import scodec.bits.{ByteVector, HexStringSyntax} + +import scala.concurrent.duration.DurationInt + +class PeerStatsTrackerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + private val dummyPubKey = PrivateKey(ByteVector32.One).publicKey + private val dummyAliases = ShortIdAliases(Alias(42), None) + private val dummyChannelAnn = ChannelAnnouncement(ByteVector64.Zeroes, ByteVector64.Zeroes, ByteVector64.Zeroes, ByteVector64.Zeroes, Features.empty, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), dummyPubKey, dummyPubKey, dummyPubKey, dummyPubKey) + + private val localNodeId = PublicKey(hex"03bd04635f1465d9347f3d69edc51f17cdf9548847533e084cd9d153a4abb065cd") + private val remoteNodeId1 = PublicKey(hex"024c9c77624899672c78d84b551ef1187cbb17618b2d96ef189d0ea36f307be76e") + private val remoteNodeId2 = PublicKey(hex"02271ffb6969f6dc4769438637d7d24dc3358098cdef7a772f9ccfd31251470e28") + private val remoteNodeId3 = PublicKey(hex"028f5be42aa013f9fd2e5a28a152563ac21acc095ef65cab2e835a789d2a4add96") + + private def commitments(remoteNodeId: PublicKey, toLocal: BtcAmount, toRemote: BtcAmount, announceChannel: Boolean = true): Commitments = { + CommitmentsSpec.makeCommitments(toLocal.toMilliSatoshi, toRemote.toMilliSatoshi, localNodeId, remoteNodeId, if (announceChannel) Some(dummyChannelAnn) else None) + } + + private def updateChannelBalance(c: Commitments, toLocal: BtcAmount, toRemote: BtcAmount): Commitments = { + val c1 = commitments(c.remoteNodeId, toLocal, toRemote, c.announceChannel) + c1.copy(channelParams = c1.channelParams.copy(channelId = c.channelId)) + } + + private def channelUpdate(capacity: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), timestamp: TimestampSecond = TimestampSecond.now(), announceChannel: Boolean = true): ChannelUpdate = { + val messageFlags = MessageFlags(dontForward = !announceChannel) + val channelFlags = ChannelFlags(isEnabled = true, isNode1 = true) + ChannelUpdate(ByteVector64.Zeroes, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), timestamp, messageFlags, channelFlags, CltvExpiryDelta(36), 1 msat, fees.feeBase, fees.feeProportionalMillionths, capacity.toMilliSatoshi) + } + + private def channel(remoteNodeId: PublicKey, toLocal: BtcAmount, toRemote: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), announceChannel: Boolean = true): DATA_NORMAL = { + val c = commitments(remoteNodeId, toLocal, toRemote, announceChannel) + val ann_opt = if (announceChannel) Some(dummyChannelAnn) else None + val update = channelUpdate(c.capacity, fees, TimestampSecond.now(), announceChannel) + DATA_NORMAL(c, dummyAliases, ann_opt, update, SpliceStatus.NoSplice, None, None, None) + } + + test("create buckets") { + // February 5th 2026 at 12h00 UTC. + val timestamp = TimestampMilli(1770292800000L) + assert(Bucket.from(timestamp) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2)) + assert(Bucket.from(timestamp - 1.millis) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 1)) + assert(Bucket.from(timestamp - Bucket.duration) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 1)) + assert(Bucket.from(timestamp - Bucket.duration - 1.millis) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 2)) + assert(Bucket.from(timestamp - Bucket.duration * Bucket.bucketsPerDay / 2) == Bucket(day = 5, month = 2, year = 2026, slot = 0)) + assert(Bucket.from(timestamp - Bucket.duration * Bucket.bucketsPerDay / 2 - 1.millis) == Bucket(day = 4, month = 2, year = 2026, slot = Bucket.bucketsPerDay - 1)) + } + + test("sort buckets") { + val b1 = Bucket(day = 30, month = 11, year = 2025, slot = 0) + val b2 = Bucket(day = 30, month = 11, year = 2025, slot = 7) + val b3 = Bucket(day = 30, month = 11, year = 2025, slot = 9) + val b4 = Bucket(day = 1, month = 12, year = 2025, slot = 2) + val b5 = Bucket(day = 1, month = 12, year = 2025, slot = 3) + val b6 = Bucket(day = 15, month = 12, year = 2025, slot = 5) + val b7 = Bucket(day = 1, month = 1, year = 2026, slot = 1) + assert(b1 < b2 && b2 < b3 && b3 < b4 && b4 < b5 && b5 < b6 && b6 < b7) + assert(Seq(b3, b6, b5, b1, b4, b2, b7).sorted == Seq(b1, b2, b3, b4, b5, b6, b7)) + } + + test("evaluate consumed bucket ratio") { + // February 5th 2026 at 12h00 UTC. + val timestamp = TimestampMilli(1770292800000L) + assert(Bucket.consumed(timestamp) == 0.00) + assert(Bucket.consumed(timestamp + Bucket.duration / 3) == 1.0 / 3) + assert(Bucket.consumed(timestamp + Bucket.duration / 2) == 0.5) + assert(Bucket.consumed(timestamp + Bucket.duration * 2 / 3) == 2.0 / 3) + assert(Bucket.consumed(timestamp - 10.seconds) >= 0.99) + assert(Bucket.consumed(timestamp - 10.seconds) < 1.0) + } + + test("keep track of channel balances and state") { + val now = TimestampMilli.now() + val probe = TestProbe[LatestStats]() + + // We have 4 channels with our first peer: 2 of them are active. + val c1a = DATA_WAIT_FOR_DUAL_FUNDING_READY(commitments(remoteNodeId1, toLocal = 0.1 btc, toRemote = 0.2 btc), dummyAliases) + val c1b = channel(remoteNodeId1, toLocal = 0.15 btc, toRemote = 0.25 btc) + val c1c = channel(remoteNodeId1, toLocal = 0.07 btc, toRemote = 0.03 btc) + val c1d = DATA_SHUTDOWN(commitments(remoteNodeId1, toLocal = 0.1 btc, toRemote = 0.2 btc), null, null, CloseStatus.Initiator(None)) + // We have 2 channels with our second peer: none of them are active. + val c2a = DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments(remoteNodeId2, toLocal = 0.13 btc, toRemote = 0.24 btc), BlockHeight(750_000), None, null) + val c2b = DATA_NEGOTIATING_SIMPLE(commitments(remoteNodeId2, toLocal = 0.5 btc, toRemote = 0.1 btc), FeeratePerKw(2000 sat), ByteVector.empty, ByteVector.empty, Nil, Nil) + // We have 2 channels with our third peer: none of them are active. + val c3a = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments(remoteNodeId3, toLocal = 0.7 btc, toRemote = 0.2 btc), 0 msat, 0 msat, BlockHeight(750_000), BlockHeight(750_000), DualFundingStatus.WaitingForConfirmations, None) + val c3b = DATA_CLOSING(commitments(remoteNodeId3, toLocal = 0.2 btc, toRemote = 0.2 btc), BlockHeight(750_000), ByteVector.empty, Nil, ClosingTx(InputInfo(OutPoint(randomTxId(), 2), TxOut(500_000 sat, Script.pay2wpkh(dummyPubKey))), Transaction(2, Nil, TxOut(500_000 sat, Script.pay2wpkh(dummyPubKey)) :: Nil, 0), None) :: Nil) + + // We initialize our actor with these existing channels. + val tracker = testKit.spawn(PeerStatsTracker(TestDatabases.inMemoryDb().audit, Seq(c1a, c1b, c1c, c1d, c2a, c2b, c3a, c3b))) + // We have relayed payments through channels that have been closed since then. + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c1b.channelId, remoteNodeId1, 58_000_000 msat, now - 5.minutes)), + outgoing = Seq(OutgoingPayment(c3b.channelId, remoteNodeId3, 50_000_000 msat, now)) + )) + // We have relayed payments through active channels as well. + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c2b.channelId, remoteNodeId2, 15_000_000 msat, now - 5.minutes)), + outgoing = Seq(OutgoingPayment(c1b.channelId, remoteNodeId1, 10_000_000 msat, now)) + )) + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + // We only have active channels with our first peer. + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1)) + // We only take into account the two active channels with our first peer. + val peer1 = s.peers.find(_.remoteNodeId == remoteNodeId1).get + assert(peer1.channels.map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId)) + assert(peer1.latestUpdate_opt.exists(u => Set(c1b.channelUpdate, c1c.channelUpdate).contains(u))) + assert(peer1.capacity == 0.5.btc.toSatoshi) + assert(peer1.hasPendingChannel) + assert(0.21.btc.toMilliSatoshi <= peer1.canSend && peer1.canSend <= 0.22.btc.toMilliSatoshi) + assert(0.27.btc.toMilliSatoshi <= peer1.canReceive && peer1.canReceive <= 0.28.btc.toMilliSatoshi) + } + + // Our pending channel with our second peer becomes ready. + val update2a = channelUpdate(c2a.commitments.capacity, RelayFees(500 msat, 500)) + tracker.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate( + channel = ActorRef.noSender, + channelId = c2a.channelId, + aliases = dummyAliases, + remoteNodeId = remoteNodeId2, + announcement_opt = None, + channelUpdate = update2a, + commitments = c2a.commitments, + )) + + // Our pending channel with our third peer is aborted. + tracker.ref ! WrappedLocalChannelDown(LocalChannelDown( + channel = ActorRef.noSender, + channelId = c3a.channelId, + realScids = Nil, + aliases = dummyAliases, + remoteNodeId = remoteNodeId3 + )) + + // We forget about our third peer, with whom we don't have pending or active channels anymore. + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2)) + assert(!s.peers.find(_.remoteNodeId == remoteNodeId2).get.hasPendingChannel) + assert(s.peers.find(_.remoteNodeId == remoteNodeId2).get.latestUpdate_opt.contains(update2a)) + } + + // We relay a payment to our second peer. + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c1b.channelId, remoteNodeId1, 30_000_000 msat, now - 5.minutes)), + outgoing = Seq(OutgoingPayment(c2a.channelId, remoteNodeId2, 15_000_000 msat, now)) + )) + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2)) + val peer2 = s.peers.find(_.remoteNodeId == remoteNodeId2).get + assert(peer2.channels.map(_.channelId).toSet == Set(c2a.channelId)) + assert(peer2.latestUpdate_opt.contains(update2a)) + } + + // We update our routing fees with our first peer. + val update1b = channelUpdate(c1b.commitments.capacity, RelayFees(100 msat, 600), TimestampSecond.now() + 10.seconds) + tracker.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate( + channel = ActorRef.noSender, + channelId = c1b.channelId, + aliases = dummyAliases, + remoteNodeId = remoteNodeId1, + announcement_opt = None, + channelUpdate = update1b, + commitments = updateChannelBalance(c1b.commitments, toLocal = 0.3 btc, toRemote = 0.1 btc), + )) + + // We ignore previous channel updates from another channel. + val update1c = channelUpdate(c1c.commitments.capacity, RelayFees(150 msat, 400), update1b.timestamp - 10.seconds) + tracker.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate( + channel = ActorRef.noSender, + channelId = c1c.channelId, + aliases = dummyAliases, + remoteNodeId = remoteNodeId1, + announcement_opt = None, + channelUpdate = update1c, + commitments = updateChannelBalance(c1c.commitments, toLocal = 0.05 btc, toRemote = 0.05 btc), + )) + + // Channels with our second peer are closed. + tracker.ref ! WrappedLocalChannelDown(LocalChannelDown( + channel = ActorRef.noSender, + channelId = c2a.channelId, + realScids = Nil, + aliases = dummyAliases, + remoteNodeId = remoteNodeId2 + )) + tracker.ref ! WrappedLocalChannelDown(LocalChannelDown( + channel = ActorRef.noSender, + channelId = c2b.channelId, + realScids = Nil, + aliases = dummyAliases, + remoteNodeId = remoteNodeId2 + )) + + // The only remaining channels are with our first peer, with updated balances and the latest channel update. + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1)) + val peer1 = s.peers.find(_.remoteNodeId == remoteNodeId1).get + assert(peer1.channels.map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId)) + assert(peer1.capacity == 0.5.btc.toSatoshi) + assert(0.34.btc.toMilliSatoshi <= peer1.canSend && peer1.canSend <= 0.35.btc.toMilliSatoshi) + assert(0.14.btc.toMilliSatoshi <= peer1.canReceive && peer1.canReceive <= 0.15.btc.toMilliSatoshi) + assert(peer1.latestUpdate_opt.contains(update1b)) + } + } + + test("keep track of peer statistics") { + val now = TimestampMilli.now() + val probe = TestProbe[LatestStats]() + val tracker = testKit.spawn(PeerStatsTracker(TestDatabases.inMemoryDb().audit, Nil)) + + // We have channels with 3 peers: + val c1a = commitments(remoteNodeId1, toLocal = 0.5 btc, toRemote = 0.3 btc) + val c1b = commitments(remoteNodeId1, toLocal = 0.4 btc, toRemote = 0.2 btc, announceChannel = false) + val c2 = commitments(remoteNodeId2, toLocal = 0.01 btc, toRemote = 0.9 btc) + val c3 = commitments(remoteNodeId3, toLocal = 0.7 btc, toRemote = 0.1 btc) + Seq(c1a, c1b, c2, c3).foreach(c => tracker.ref ! WrappedAvailableBalanceChanged(AvailableBalanceChanged( + channel = ActorRef.noSender, + channelId = c.channelId, + aliases = dummyAliases, + commitments = c, + lastAnnouncement_opt = None, + ))) + + // We have relayed some payments with all of those peers. + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c1a.channelId, remoteNodeId1, 30_000_000 msat, now - Bucket.duration * 2)), + outgoing = Seq(OutgoingPayment(c2.channelId, remoteNodeId2, 20_000_000 msat, now - Bucket.duration * 2)) + )) + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq(IncomingPayment(c2.channelId, remoteNodeId2, 10_000_000 msat, now - Bucket.duration * 2)), + outgoing = Seq(OutgoingPayment(c3.channelId, remoteNodeId3, 9_000_000 msat, now - Bucket.duration * 2)) + )) + tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq( + IncomingPayment(c2.channelId, remoteNodeId2, 30_000_000 msat, now - Bucket.duration), + IncomingPayment(c3.channelId, remoteNodeId3, 25_000_000 msat, now - Bucket.duration), + ), + outgoing = Seq( + OutgoingPayment(c1a.channelId, remoteNodeId1, 50_000_000 msat, now - Bucket.duration), + ) + )) + tracker.ref ! WrappedPaymentRelayed(TrampolinePaymentRelayed( + paymentHash = randomBytes32(), + incoming = Seq( + IncomingPayment(c2.channelId, remoteNodeId2, 21_000_000 msat, now), + IncomingPayment(c2.channelId, remoteNodeId2, 34_000_000 msat, now), + ), + outgoing = Seq( + OutgoingPayment(c3.channelId, remoteNodeId3, 22_000_000 msat, now), + OutgoingPayment(c3.channelId, remoteNodeId3, 18_000_000 msat, now), + OutgoingPayment(c1b.channelId, remoteNodeId1, 10_000_000 msat, now), + ), + nextTrampolineNodeId = randomKey().publicKey, + nextTrampolineAmount = 50_000_000 msat, + )) + + // We keep track of aggregated statistics per bucket. + tracker.ref ! GetLatestStats(probe.ref) + inside(probe.expectMessageType[LatestStats]) { s => + assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2, remoteNodeId3)) + assert(s.peers.flatMap(_.stats.map(_.profit)).sum == 21_000_000.msat) + // We only have routing activity in the past 3 buckets. + s.peers.foreach(p => assert(p.stats.drop(3).forall(olderStats => olderStats == PeerStats.empty))) + // We verify that routing activity is correctly recorded in the right bucket. + val peer1 = s.peers.find(_.remoteNodeId == remoteNodeId1).get + assert(peer1.capacity == 1.4.btc.toSatoshi) + assert(peer1.canSend == c1a.availableBalanceForSend + c1b.availableBalanceForSend) + assert(peer1.canReceive == c1a.availableBalanceForReceive + c1b.availableBalanceForReceive) + assert(peer1.stats.head.totalAmountIn == 0.msat) + assert(peer1.stats.head.totalAmountOut == 10_000_000.msat) + assert(peer1.stats.head.relayFeeEarned == 1_000_000.msat) + assert(peer1.stats(1).totalAmountIn == 0.msat) + assert(peer1.stats(1).totalAmountOut == 50_000_000.msat) + assert(peer1.stats(1).relayFeeEarned == 5_000_000.msat) + assert(peer1.stats(2).totalAmountIn == 30_000_000.msat) + assert(peer1.stats(2).totalAmountOut == 0.msat) + assert(peer1.stats(2).relayFeeEarned == 0.msat) + assert(peer1.stats.map(_.outgoingFlow).sum == 30_000_000.msat) + val peer2 = s.peers.find(_.remoteNodeId == remoteNodeId2).get + assert(peer2.capacity == 0.91.btc.toSatoshi) + assert(peer2.canSend == c2.availableBalanceForSend) + assert(peer2.canReceive == c2.availableBalanceForReceive) + assert(peer2.stats.head.totalAmountIn == 55_000_000.msat) + assert(peer2.stats.head.totalAmountOut == 0.msat) + assert(peer2.stats.head.relayFeeEarned == 0.msat) + assert(peer2.stats(1).totalAmountIn == 30_000_000.msat) + assert(peer2.stats(1).totalAmountOut == 0.msat) + assert(peer2.stats(1).relayFeeEarned == 0.msat) + assert(peer2.stats(2).totalAmountIn == 10_000_000.msat) + assert(peer2.stats(2).totalAmountOut == 20_000_000.msat) + assert(peer2.stats(2).relayFeeEarned == 10_000_000.msat) + assert(peer2.stats.map(_.outgoingFlow).sum == -75_000_000.msat) + val peer3 = s.peers.find(_.remoteNodeId == remoteNodeId3).get + assert(peer3.capacity == 0.8.btc.toSatoshi) + assert(peer3.canSend == c3.availableBalanceForSend) + assert(peer3.canReceive == c3.availableBalanceForReceive) + assert(peer3.stats.head.totalAmountIn == 0.msat) + assert(peer3.stats.head.totalAmountOut == 40_000_000.msat) + assert(peer3.stats.head.relayFeeEarned == 4_000_000.msat) + assert(peer3.stats(1).totalAmountIn == 25_000_000.msat) + assert(peer3.stats(1).totalAmountOut == 0.msat) + assert(peer3.stats(1).relayFeeEarned == 0.msat) + assert(peer3.stats(2).totalAmountIn == 0.msat) + assert(peer3.stats(2).totalAmountOut == 9_000_000.msat) + assert(peer3.stats(2).relayFeeEarned == 1_000_000.msat) + assert(peer3.stats.map(_.outgoingFlow).sum == 24_000_000.msat) + } + } + +} From 5a59cfb96ec186894f658bdf734a70159de6ed10 Mon Sep 17 00:00:00 2001 From: t-bast Date: Wed, 25 Feb 2026 14:38:43 +0100 Subject: [PATCH 2/2] Record previous decisions and analyze them We record previous decisions to fund peers or update relay fees, and use this past data when making future decisions. --- eclair-core/src/main/resources/reference.conf | 2 + .../scala/fr/acinq/eclair/NodeParams.scala | 1 + .../fr/acinq/eclair/profit/PeerScorer.scala | 193 ++++++++++---- .../scala/fr/acinq/eclair/TestConstants.scala | 2 + .../acinq/eclair/profit/PeerScorerSpec.scala | 246 ++++++++++++++++++ 5 files changed, 392 insertions(+), 52 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index f0b5056302..92f90d3948 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -683,6 +683,8 @@ eclair { max-feerate-sat-per-byte = 5 // Rate-limit the number of funding transactions we make per day (on average). max-funding-tx-per-day = 6 + // Minimum time between funding the same peer, to evaluate whether the previous funding was effective. + funding-cooldown = 72 hours } // We can automatically update our relay fees to our top peers when necessary. relay-fees { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index a921a24869..6f49a6b77f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -729,6 +729,7 @@ object NodeParams extends Logging { maxFundingTxPerDay = config.getInt("peer-scoring.liquidity.max-funding-tx-per-day"), minOnChainBalance = config.getLong("peer-scoring.liquidity.min-on-chain-balance-satoshis").sat, maxFeerate = FeeratePerByte(config.getLong("peer-scoring.liquidity.max-feerate-sat-per-byte").sat).perKw, + fundingCooldown = FiniteDuration(config.getDuration("peer-scoring.liquidity.funding-cooldown").getSeconds, TimeUnit.SECONDS), ), relayFees = PeerScorer.RelayFeesConfig( autoUpdate = config.getBoolean("peer-scoring.relay-fees.auto-update"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala index 4a71fa5201..350b33c895 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala @@ -64,8 +64,9 @@ object PeerScorer { * @param maxFundingTxPerDay we rate-limit the number of transactions we make per day (on average). * @param minOnChainBalance we stop funding channels if our on-chain balance is below this amount. * @param maxFeerate we stop funding channels if the on-chain feerate is above this value. + * @param fundingCooldown minimum time between funding the same peer, to evaluate effectiveness. */ - case class LiquidityConfig(autoFund: Boolean, autoClose: Boolean, minFundingAmount: Satoshi, maxFundingAmount: Satoshi, maxFundingTxPerDay: Int, minOnChainBalance: Satoshi, maxFeerate: FeeratePerKw) + case class LiquidityConfig(autoFund: Boolean, autoClose: Boolean, minFundingAmount: Satoshi, maxFundingAmount: Satoshi, maxFundingTxPerDay: Int, minOnChainBalance: Satoshi, maxFeerate: FeeratePerKw, fundingCooldown: FiniteDuration) /** * @param autoUpdate if true, we will automatically update our relay fees. @@ -76,10 +77,29 @@ object PeerScorer { */ case class RelayFeesConfig(autoUpdate: Boolean, minRelayFees: RelayFees, maxRelayFees: RelayFees, dailyPaymentVolumeThreshold: Satoshi, dailyPaymentVolumeThresholdPercent: Double) - private case class LiquidityDecision(peer: PeerInfo, fundingAmount: Satoshi) { + private case class FundingProposal(peer: PeerInfo, fundingAmount: Satoshi) { val remoteNodeId: PublicKey = peer.remoteNodeId } + private case class FundingDecision(fundingAmount: Satoshi, dailyVolumeOutAtFunding: MilliSatoshi, timestamp: TimestampMilli) + + // @formatter:off + private sealed trait FeeDirection + private case object FeeIncrease extends FeeDirection { override def toString: String = "increase" } + private case object FeeDecrease extends FeeDirection { override def toString: String = "decrease" } + // @formatter:on + + private case class FeeChangeDecision(direction: FeeDirection, previousFee: RelayFees, newFee: RelayFees, dailyVolumeOutAtChange: MilliSatoshi) + + private case class DecisionHistory(funding: Map[PublicKey, FundingDecision], feeChanges: Map[PublicKey, FeeChangeDecision]) { + // @formatter:off + def addFunding(nodeId: PublicKey, record: FundingDecision): DecisionHistory = copy(funding = funding.updated(nodeId, record)) + def addFeeChanges(records: Map[PublicKey, FeeChangeDecision]): DecisionHistory = copy(feeChanges = feeChanges.concat(records)) + def revertFeeChanges(remoteNodeIds: Set[PublicKey]): DecisionHistory = copy(feeChanges = feeChanges -- remoteNodeIds) + def cleanup(now: TimestampMilli, maxAge: FiniteDuration): DecisionHistory = copy(funding = funding.filter { case (_, r) => now - r.timestamp < maxAge }) + // @formatter:on + } + private def rollingDailyStats(p: PeerInfo): Seq[Seq[PeerStats]] = { (0 until (p.stats.size - Bucket.bucketsPerDay)).map(i => p.stats.slice(i, i + Bucket.bucketsPerDay)) } @@ -94,7 +114,7 @@ object PeerScorer { Behaviors.setup { context => Behaviors.withTimers { timers => timers.startTimerWithFixedDelay(ScorePeers(None), nodeParams.peerScoringConfig.scoringFrequency) - new PeerScorer(nodeParams, wallet, statsTracker, register, context).run() + new PeerScorer(nodeParams, wallet, statsTracker, register, context).run(DecisionHistory(Map.empty, Map.empty)) } } } @@ -109,21 +129,21 @@ private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, private val log = context.log private val config = nodeParams.peerScoringConfig - private def run(): Behavior[Command] = { + private def run(history: DecisionHistory): Behavior[Command] = { Behaviors.receiveMessagePartial { case ScorePeers(replyTo_opt) => statsTracker ! PeerStatsTracker.GetLatestStats(context.messageAdapter[PeerStatsTracker.LatestStats](e => WrappedLatestStats(e.peers))) - waitForStats(replyTo_opt) + waitForStats(replyTo_opt, history) } } - private def waitForStats(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]]): Behavior[Command] = { + private def waitForStats(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]], history: DecisionHistory): Behavior[Command] = { Behaviors.receiveMessagePartial { - case WrappedLatestStats(peers) => scorePeers(replyTo_opt, peers) + case WrappedLatestStats(peers) => scorePeers(replyTo_opt, peers, history) } } - private def scorePeers(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]], peers: Seq[PeerInfo]): Behavior[Command] = { + private def scorePeers(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]], peers: Seq[PeerInfo], history: DecisionHistory): Behavior[Command] = { log.info("scoring {} peers", peers.size) val dailyProfit = peers.map(_.stats.take(Bucket.bucketsPerDay).map(_.profit).sum).sum.truncateToSatoshi.toMilliBtc val weeklyProfit = peers.map(_.stats.map(_.profit).sum).sum.truncateToSatoshi.toMilliBtc @@ -162,7 +182,7 @@ private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, .sortBy(_._2)(Ordering[MilliSatoshi].reverse) .take(config.topPeersCount) // We fund a multiple of our daily outgoing flow, to ensure we have enough liquidity for a few days. - .map { case (p, _) => LiquidityDecision(p, (bestDailyOutgoingFlow(p) * 4).truncateToSatoshi) } + .map { case (p, _) => FundingProposal(p, (bestDailyOutgoingFlow(p) * 4).truncateToSatoshi) } .filter(_.fundingAmount > 0.sat) if (bestPeersThatNeedLiquidity.nonEmpty) { log.debug("top {} peers that need liquidity:", bestPeersThatNeedLiquidity.size) @@ -180,7 +200,7 @@ private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, .filter { case (p, _) => p.stats.take(Bucket.bucketsPerDay).map(_.outgoingFlow).sum >= p.canSend * 0.2 } .map(_._1) // We'd like to increase their capacity by 50%. - .map(p => LiquidityDecision(p, p.capacity * 0.5)) + .map(p => FundingProposal(p, p.capacity * 0.5)) if (goodSmallPeers.nonEmpty) { log.debug("we've identified {} smaller peers that perform well relative to their capacity", goodSmallPeers.size) printDailyStats(goodSmallPeers.map(_.peer)) @@ -199,7 +219,7 @@ private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, ) .distinctBy(_.remoteNodeId) // We'd like to increase their capacity by 25%. - .map(p => LiquidityDecision(p, p.capacity * 0.25)) + .map(p => FundingProposal(p, p.capacity * 0.25)) // Since we're not yet reading past events from the DB, we need to wait until we have collected enough data before // taking some actions such as opening or closing channels or updating relay fees. @@ -207,11 +227,11 @@ private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, val hasPastData = bestPeersByVolume.exists(_.stats.drop(Bucket.bucketsPerDay).exists(_ != PeerStats.empty)) if (hasPastData && replyTo_opt.isEmpty) { closeChannelsIfNeeded(peers) - updateRelayFeesIfNeeded(peers) - fundChannelsIfNeeded(bestPeersThatNeedLiquidity, goodSmallPeers, peersToRevive) + val history1 = updateRelayFeesIfNeeded(peers, history) + fundChannelsIfNeeded(bestPeersThatNeedLiquidity, goodSmallPeers, peersToRevive, history1) } else { replyTo_opt.foreach(_ ! bestPeersThatNeedLiquidity.map(_.peer)) - run() + run(history) } } @@ -256,38 +276,35 @@ private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, }) } - private def updateRelayFeesIfNeeded(peers: Seq[PeerInfo]): Unit = { + private def updateRelayFeesIfNeeded(peers: Seq[PeerInfo], history: DecisionHistory): DecisionHistory = { // We configure *daily* absolute and proportional payment volume targets. We look at events from the current period // and the previous period, so we need to get the right ratio to convert those daily amounts. val now = TimestampMilli.now() val lastTwoBucketsRatio = 1.0 + Bucket.consumed(now) val lastTwoBucketsDailyRatio = (Bucket.duration * lastTwoBucketsRatio).toSeconds.toDouble / (24 * 3600) // We increase fees of channels that are performing better than we expected. - log.debug("we should update our relay fees with the following peers:") - log.debug("| node_id | volume_variation | decision | current_fee | next_fee |") - log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|") - peers + val feeIncreases = peers // We select peers that have exceeded our payment volume target in the past two periods. .filter(p => p.stats.take(2).map(_.totalAmountOut).sum >= Seq(config.relayFees.dailyPaymentVolumeThreshold * lastTwoBucketsDailyRatio, p.capacity * config.relayFees.dailyPaymentVolumeThresholdPercent * lastTwoBucketsDailyRatio).min) // And that have an increasing payment volume compared to the period before that. .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum > 0.msat) .filter(p => (p.stats.take(2).map(_.totalAmountOut).sum / lastTwoBucketsRatio) > p.stats.slice(2, 3).map(_.totalAmountOut).sum * 1.1) - .foreach(p => { + .flatMap(p => { p.latestUpdate_opt match { // And for which we haven't updated our relay fees recently already. case Some(u) if u.timestamp <= now.toTimestampSecond - (Bucket.duration * 1.5).toSeconds => - val next = u.relayFees.feeProportionalMillionths + 500 - val volumeVariation = p.stats.take(2).map(_.totalAmountOut).sum.toLong.toDouble / (p.stats.slice(2, 3).map(_.totalAmountOut).sum.toLong * lastTwoBucketsRatio) - log.debug(f"| ${p.remoteNodeId} | $volumeVariation%.2f | increase | ${u.feeProportionalMillionths}%11d | $next%8d |") - if (config.relayFees.autoUpdate && next <= config.relayFees.maxRelayFees.feeProportionalMillionths) { - val cmd = CMD_UPDATE_RELAY_FEE(UntypedActorRef.noSender, u.relayFees.feeBase, next) - p.channels.foreach(c => register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd)) + val next = u.relayFees.copy(feeProportionalMillionths = u.feeProportionalMillionths + 500) + if (next.feeBase <= config.relayFees.maxRelayFees.feeBase && next.feeProportionalMillionths <= config.relayFees.maxRelayFees.feeProportionalMillionths) { + val dailyVolume = p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + Some(p.remoteNodeId -> FeeChangeDecision(FeeIncrease, u.relayFees, next, dailyVolume)) + } else { + None } - case _ => () + case _ => None } - }) + }).toMap // We decrease fees of channels that aren't performing well. - peers + val feeDecreases = peers // We select peers that have a recent 15% decrease in outgoing payment volume. .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum > 0.msat) .filter(p => p.stats.take(2).map(_.totalAmountOut).sum <= p.stats.slice(2, 3).map(_.totalAmountOut).sum * 0.85) @@ -295,24 +312,78 @@ private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum <= p.stats.slice(3, 4).map(_.totalAmountOut).sum) // And that have enough liquidity to relay outgoing payments. .filter(p => p.canSend >= Seq(config.relayFees.dailyPaymentVolumeThreshold, p.capacity * config.relayFees.dailyPaymentVolumeThresholdPercent).min) - .foreach(p => { + .flatMap(p => { p.latestUpdate_opt match { // And for which we haven't updated our relay fees recently already. case Some(u) if u.timestamp <= now.toTimestampSecond - (Bucket.duration * 1.5).toSeconds => - val next = u.relayFees.feeProportionalMillionths - 500 - val volumeVariation = p.stats.take(2).map(_.totalAmountOut).sum.toLong.toDouble / (p.stats.slice(2, 3).map(_.totalAmountOut).sum.toLong * lastTwoBucketsRatio) - log.debug(f"| ${p.remoteNodeId} | $volumeVariation%.2f | decrease | ${u.feeProportionalMillionths}%11d | $next%8d |") - if (config.relayFees.autoUpdate && next >= config.relayFees.minRelayFees.feeProportionalMillionths) { - val cmd = CMD_UPDATE_RELAY_FEE(UntypedActorRef.noSender, u.relayFees.feeBase, next) - p.channels.foreach(c => register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd)) + val next = u.relayFees.copy(feeProportionalMillionths = u.feeProportionalMillionths - 500) + if (next.feeBase >= config.relayFees.minRelayFees.feeBase && next.feeProportionalMillionths >= config.relayFees.minRelayFees.feeProportionalMillionths) { + val dailyVolume = p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + Some(p.remoteNodeId -> FeeChangeDecision(FeeDecrease, u.relayFees, next, dailyVolume)) + } else { + None } - case _ => () + case _ => None } - }) - log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|") + }).toMap + // We revert fee changes that had a negative impact on volume. + val feeReverts = peers + .filterNot(p => feeIncreases.contains(p.remoteNodeId) || feeDecreases.contains(p.remoteNodeId)) + .flatMap { p => + (history.feeChanges.get(p.remoteNodeId), p.latestUpdate_opt) match { + case (Some(record), Some(u)) => + // Confirm our change is still in effect. + val updateMatchesRecord = u.feeProportionalMillionths == record.newFee.feeProportionalMillionths + // We expect around 6h-24h after the update to really see its effect on path-finding: after 24h, too many + // other parameters may interfere with our evaluation. + val withinEvaluationWindow = u.timestamp >= now.toTimestampSecond - (24 hours).toSeconds + // We don't want to update fees too often. + val notUpdatedRecently = u.timestamp <= now.toTimestampSecond - (Bucket.duration * 2).toSeconds + // We revert both fee increases and fee decreases: if volume didn't increase after a fee decrease, we'd + // rather collect our previous (bigger) relay fee. + val currentDailyVolume = p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + val shouldRevert = record.direction match { + case FeeIncrease => currentDailyVolume < record.dailyVolumeOutAtChange * 0.8 + case FeeDecrease => currentDailyVolume < record.dailyVolumeOutAtChange * 0.9 + } + if (updateMatchesRecord && withinEvaluationWindow && notUpdatedRecently && shouldRevert) { + val decision = record.direction match { + case PeerScorer.FeeIncrease => FeeChangeDecision(FeeDecrease, u.relayFees, u.relayFees.copy(feeProportionalMillionths = u.relayFees.feeProportionalMillionths - 500), currentDailyVolume) + case PeerScorer.FeeDecrease => FeeChangeDecision(FeeIncrease, u.relayFees, u.relayFees.copy(feeProportionalMillionths = u.relayFees.feeProportionalMillionths + 500), currentDailyVolume) + } + Some(p.remoteNodeId -> decision) + } else { + None + } + case _ => None + } + }.toMap + // We print the results to help debugging. + if (feeIncreases.nonEmpty || feeDecreases.nonEmpty || feeReverts.nonEmpty) { + log.debug("we should update our relay fees with the following peers:") + log.debug("| node_id | volume_variation | decision | current_fee | next_fee |") + log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|") + (feeIncreases.toSeq ++ feeDecreases.toSeq ++ feeReverts.toSeq).foreach { case (remoteNodeId, decision) => + val volumeVariation = peers.find(_.remoteNodeId == remoteNodeId) match { + case Some(p) if p.stats.slice(2, 3).map(_.totalAmountOut).sum != 0.msat => p.stats.take(2).map(_.totalAmountOut).sum.toLong.toDouble / (p.stats.slice(2, 3).map(_.totalAmountOut).sum.toLong * lastTwoBucketsRatio) + case _ => 0.0 + } + log.debug(f"| $remoteNodeId | $volumeVariation%.2f | ${decision.direction} | ${decision.previousFee.feeProportionalMillionths}%11d | ${decision.newFee.feeProportionalMillionths}%8d |") + } + log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|") + } + if (config.relayFees.autoUpdate) { + (feeIncreases.toSeq ++ feeDecreases.toSeq ++ feeReverts.toSeq).foreach { case (remoteNodeId, decision) => + val cmd = CMD_UPDATE_RELAY_FEE(UntypedActorRef.noSender, decision.newFee.feeBase, decision.newFee.feeProportionalMillionths) + peers.find(_.remoteNodeId == remoteNodeId).map(_.channels).getOrElse(Nil).foreach(c => register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd)) + } + } + // Note that in order to avoid oscillating between reverts (reverting a revert), we remove the previous records when + // reverting a change: this way, the normal algorithm resumes during the next run. + history.addFeeChanges(feeIncreases ++ feeDecreases).revertFeeChanges(feeReverts.keySet) } - private def fundChannelsIfNeeded(bestPeers: Seq[LiquidityDecision], smallPeers: Seq[LiquidityDecision], toRevive: Seq[LiquidityDecision]): Behavior[Command] = { + private def fundChannelsIfNeeded(bestPeers: Seq[FundingProposal], smallPeers: Seq[FundingProposal], toRevive: Seq[FundingProposal], history: DecisionHistory): Behavior[Command] = { // We don't want to fund peers every time our scoring algorithm runs, otherwise we may create too many on-chain // transactions. We draw from a random distribution to ensure that on average: // - we follow our rate-limit for funding best peers and fund at most 3 at a time @@ -337,13 +408,13 @@ private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, } if (bestPeers.isEmpty && smallPeers.isEmpty && toRevive.isEmpty) { log.info("we haven't identified peers that require liquidity yet") - run() + run(history) } else if (toFund.isEmpty) { log.info("we skip funding peers because of per-day rate-limits: increase eclair.peer-scoring.liquidity.max-funding-tx-per-day to fund more often") - run() + run(history) } else if (config.liquidity.maxFeerate < nodeParams.currentFeeratesForFundingClosing.medium) { log.info("we skip funding peers because current feerate is too high ({} < {}): increase eclair.peer-scoring.liquidity.max-feerate-sat-per-byte to start funding again", config.liquidity.maxFeerate, nodeParams.currentFeeratesForFundingClosing.medium) - run() + run(history) } else { context.pipeToSelf(wallet.onChainBalance()) { case Success(b) => OnChainBalance(b.confirmed, b.unconfirmed) @@ -352,29 +423,47 @@ private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, Behaviors.receiveMessagePartial { case WalletError(e) => log.warn("cannot get on-chain balance: {}", e.getMessage) - run() + run(history) case OnChainBalance(confirmed, unconfirmed) => - if (confirmed <= config.liquidity.minOnChainBalance) { + val now = TimestampMilli.now() + val history1 = if (confirmed <= config.liquidity.minOnChainBalance) { log.info("we don't have enough on-chain balance to fund new channels (confirmed={}, unconfirmed={})", confirmed.toMilliBtc, unconfirmed.toMilliBtc) + history } else { toFund // We don't fund peers that are already being funded. - .filterNot(_.peer.hasPendingChannel) + .filterNot(p => p.peer.hasPendingChannel) + // We don't fund peers that were recently funded, unless volume improved since the last funding. + .filter { f => + history.funding.get(f.remoteNodeId) match { + case Some(record) if now - record.timestamp < config.liquidity.fundingCooldown => + val currentDailyVolume = f.peer.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + if (currentDailyVolume <= record.dailyVolumeOutAtFunding * 1.1) { + log.info("skipping funding for remote_node_id={}: last funded {} ago, volume has not improved (was={}, now={})", f.remoteNodeId, now - record.timestamp, record.dailyVolumeOutAtFunding.truncateToSatoshi.toMilliBtc, currentDailyVolume.truncateToSatoshi.toMilliBtc) + false + } else { + log.info("re-funding remote_node_id={}: volume improved since last funding (was={}, now={})", f.remoteNodeId, record.dailyVolumeOutAtFunding.truncateToSatoshi.toMilliBtc, currentDailyVolume.truncateToSatoshi.toMilliBtc) + true + } + case _ => true + } + } // And we apply our configured funding limits to the liquidity suggestions. .map(f => f.copy(fundingAmount = f.fundingAmount.max(config.liquidity.minFundingAmount).min(config.liquidity.maxFundingAmount))) - .foldLeft(confirmed - config.liquidity.minOnChainBalance) { - case (available, f) if available < f.fundingAmount * 0.5 => available - case (available, f) => + .foldLeft((confirmed - config.liquidity.minOnChainBalance, history)) { + case ((available, history), f) if available < f.fundingAmount * 0.5 => (available, history) + case ((available, history), f) => val fundingAmount = f.fundingAmount.min(available) log.info("funding channel with remote_node_id={} (funding_amount={})", f.remoteNodeId, fundingAmount.toMilliBtc) // TODO: when do we want to create a private channel? Maybe if we already have a bigger public channel? val channelFlags = ChannelFlags(announceChannel = true) val cmd = OpenChannel(f.remoteNodeId, fundingAmount, None, None, None, None, None, Some(channelFlags), Some(Timeout(60 seconds))) register ! Register.ForwardNodeId(context.system.ignoreRef, f.remoteNodeId, cmd) - available - fundingAmount - } + val dailyVolume = f.peer.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum + (available - fundingAmount, history.addFunding(f.remoteNodeId, FundingDecision(fundingAmount, dailyVolume, now))) + }._2 } - run() + run(history1.cleanup(now, 7 days)) } } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index b599f6d2f4..226f28dedc 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -275,6 +275,7 @@ object TestConstants { minOnChainBalance = 5_000_000 sat, // 0.05 BTC maxFeerate = FeeratePerByte(100 sat).perKw, maxFundingTxPerDay = 100, + fundingCooldown = 72 hours, ), relayFees = PeerScorer.RelayFeesConfig( autoUpdate = true, @@ -489,6 +490,7 @@ object TestConstants { minOnChainBalance = 5_000_000 sat, // 0.05 BTC maxFeerate = FeeratePerByte(100 sat).perKw, maxFundingTxPerDay = 100, + fundingCooldown = 72 hours, ), relayFees = PeerScorer.RelayFeesConfig( autoUpdate = true, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala index 8230733aa5..d043c9b5c6 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala @@ -44,6 +44,7 @@ class PeerScorerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appli minOnChainBalance = 5_000_000 sat, // 0.05 BTC maxFeerate = FeeratePerByte(100 sat).perKw, maxFundingTxPerDay = 100, + fundingCooldown = 72 hours, ), relayFees = PeerScorer.RelayFeesConfig( autoUpdate = true, @@ -510,4 +511,249 @@ class PeerScorerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appli } } + test("don't fund the same peer within cooldown period") { + withFixture(onChainBalance = 10 btc) { f => + import f._ + + // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity. + val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc)) + val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc) + + // First scoring cycle: we fund the peer. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd => + assert(cmd.nodeId == remoteNodeId1) + } + register.expectNoMessage(100 millis) + + // Second scoring cycle with the same volume: we don't fund again because the cooldown is active and volume hasn't improved. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false))) + } + register.expectNoMessage(100 millis) + + // Third scoring cycle: volume improved by more than 10%, so we fund again despite cooldown. + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + val improvedStats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1.2 btc, relayFeeEarned = 0.012 btc)) + msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, improvedStats, Seq(channel), None, hasPendingChannel = false))) + } + inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd => + assert(cmd.nodeId == remoteNodeId1) + } + register.expectNoMessage(100 millis) + } + } + + test("revert fee increase when volume drops") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + val channel = channelInfo(canSend = 0.3 btc, canReceive = 0.5 btc) + val initialFee = RelayFees(250 msat, 1000) + val latestUpdate = channelUpdate(channel.capacity, fees = initialFee, timestamp = TimestampSecond.now() - Bucket.duration * 3) + + // First cycle: increasing volume triggers a fee increase. + val bucketRatio = Bucket.consumed(TimestampMilli.now()) + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.00015.btc * bucketRatio), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00007 btc), + peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00005 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(latestUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == 1500) + } + register.expectNoMessage(100 millis) + + // Second cycle: the fee increase is now in effect (channel update reflects new fee), but volume has dropped >20%. + // The update timestamp is within the evaluation window (6h-24h ago). + val newUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1500), timestamp = TimestampSecond.now() - Bucket.duration * 3) + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + // Volume has dropped significantly (>20% of what it was when we increased fees). + peerStats(totalAmountOut = 0.001.btc * bucketRatio, relayFeeEarned = 0.00001.btc * bucketRatio), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(newUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo2)) + } + // The fee should be reverted to the original value. + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == initialFee.feeProportionalMillionths) + } + register.expectNoMessage(100 millis) + } + } + + test("revert fee decrease when volume doesn't recover") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + val channel = channelInfo(canSend = 0.5 btc, canReceive = 0.5 btc) + val initialFee = RelayFees(250 msat, 2000) + val latestUpdate = channelUpdate(channel.capacity, fees = initialFee, timestamp = TimestampSecond.now() - Bucket.duration * 3) + + // First cycle: decreasing volume triggers a fee decrease. + val bucketRatio = Bucket.consumed(TimestampMilli.now()) + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.003.btc * bucketRatio, relayFeeEarned = 0.00003.btc * bucketRatio), + peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00003 btc), + peerStats(totalAmountOut = 0.01 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.00012 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.00012 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(latestUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == 1500) + } + register.expectNoMessage(100 millis) + + // Second cycle: the fee decrease is in effect but volume hasn't recovered (below 90% of what it was). + val newUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1500), timestamp = TimestampSecond.now() - Bucket.duration * 3) + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + // Volume is still very low (well below 90% of dailyVolumeOutAtChange). + peerStats(totalAmountOut = 0.001.btc * bucketRatio, relayFeeEarned = 0.00001.btc * bucketRatio), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(newUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo2)) + } + // The fee should be reverted to the original value. + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == initialFee.feeProportionalMillionths) + } + register.expectNoMessage(100 millis) + } + } + + test("fee reverts should not oscillate") { + withFixture(onChainBalance = 0 btc) { f => + import f._ + + val channel = channelInfo(canSend = 0.3 btc, canReceive = 0.5 btc) + val initialFee = RelayFees(250 msat, 1000) + val latestUpdate = channelUpdate(channel.capacity, fees = initialFee, timestamp = TimestampSecond.now() - Bucket.duration * 3) + + // Cycle 1: increasing volume triggers a fee increase (1000 -> 1500). + val bucketRatio = Bucket.consumed(TimestampMilli.now()) + val peerInfo1 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.00015.btc * bucketRatio), + peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc), + peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00007 btc), + peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00005 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(latestUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo1)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == 1500) + } + register.expectNoMessage(100 millis) + + // Cycle 2: volume dropped >20% after the increase: we revert back to 1000. + val increasedFeeUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1500), timestamp = TimestampSecond.now() - Bucket.duration * 3) + val peerInfo2 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.001.btc * bucketRatio, relayFeeEarned = 0.00001.btc * bucketRatio), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(increasedFeeUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo2)) + } + inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd => + assert(cmd.message.feeProportionalMillionths == 1000) // reverted + } + register.expectNoMessage(100 millis) + + // Cycle 3: the revert is now in effect, but volume continues to decline. The revert recorded a FeeDecrease in + // the history (from 1500 back to 1000). If the code treats this revert as a regular fee decrease decision, the + // revert logic would trigger again (volume < 90% of what it was at revert time) and flip the fee back to 1500, + // creating an oscillation: 1000 -> 1500 -> 1000 -> 1500 -> ... + // A revert should be final and not subject to further revert evaluation. + val revertedFeeUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1000), timestamp = TimestampSecond.now() - Bucket.duration * 3) + val peerInfo3 = PeerInfo( + remoteNodeId = remoteNodeId1, + stats = Seq( + peerStats(totalAmountOut = 0.0005.btc * bucketRatio, relayFeeEarned = 0.000005.btc * bucketRatio), + peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc), + peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc), + peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc), + ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc)), + channels = Seq(channel), + latestUpdate_opt = Some(revertedFeeUpdate), + hasPendingChannel = false + ) + + scorer ! ScorePeers(None) + inside(tracker.expectMessageType[GetLatestStats]) { msg => + msg.replyTo ! LatestStats(Seq(peerInfo3)) + } + // The fee should NOT change again: the revert was a correction, not a new decision to evaluate. + register.expectNoMessage(100 millis) + } + } + }