diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf
index 2e456cfb5d..92f90d3948 100644
--- a/eclair-core/src/main/resources/reference.conf
+++ b/eclair-core/src/main/resources/reference.conf
@@ -656,6 +656,53 @@ eclair {
cleanup-frequency = 1 day
}
+ // We keep track of peer statistics to find the most profitable peers and fund channels with them to optimize our
+ // payment volume and fees earned.
+ peer-scoring {
+ // Set this to true if you want to start collecting data to score peers.
+ enabled = false
+ // Frequency at which we run our peer scoring algorithm.
+ frequency = 1 hour
+ // Maximum number of peers to select as candidates for liquidity and relay fee updates.
+ top-peers-count = 10
+ // A list of node_ids with whom we will try to maintain liquidity.
+ top-peers-whitelist = []
+ // We can automatically allocate liquidity to our top peers when necessary.
+ liquidity {
+ // If true, we will automatically fund channels.
+ auto-fund = false
+ // If true, we will automatically close unused channels to reclaim liquidity.
+ auto-close = false
+ // We only fund channels if at least this amount is necessary.
+ min-funding-amount-satoshis = 1000000 // 0.01 btc
+ // We never fund channels with more than this amount.
+ max-funding-amount-satoshis = 50000000 // 0.5 btc
+ // We stop funding channels if our on-chain balance is below this amount.
+ min-on-chain-balance-satoshis = 50000000 // 0.5 btc
+ // We stop funding channels if the on-chain feerate is above this value.
+ max-feerate-sat-per-byte = 5
+ // Rate-limit the number of funding transactions we make per day (on average).
+ max-funding-tx-per-day = 6
+ // Minimum time between funding the same peer, to evaluate whether the previous funding was effective.
+ funding-cooldown = 72 hours
+ }
+ // We can automatically update our relay fees to our top peers when necessary.
+ relay-fees {
+ // If true, we will automatically update our relay fees based on variations in outgoing payment volume.
+ auto-update = false
+ // We will not lower our fees below these values.
+ min-fee-base-msat = 1
+ min-fee-proportional-millionths = 500
+ // We will not increase our fees above these values.
+ max-fee-base-msat = 10000
+ max-fee-proportional-millionths = 5000
+ // We only increase fees if the daily outgoing payment volume exceeds this threshold or daily-payment-volume-threshold-percent.
+ daily-payment-volume-threshold-satoshis = 10000000 // 0.1 btc
+ // We only increase fees if the daily outgoing payment volume exceeds this percentage of our peer capacity or daily-payment-volume-threshold.
+ daily-payment-volume-threshold-percent = 0.05
+ }
+ }
+
offers {
// Minimum length of an offer blinded path when hiding our real node id
message-path-min-length = 2
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala b/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala
index 100829ce83..b82198759c 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/MilliSatoshi.scala
@@ -35,6 +35,7 @@ case class MilliSatoshi(private val underlying: Long) extends Ordered[MilliSatos
def *(m: Long) = MilliSatoshi(underlying * m)
def *(m: Double) = MilliSatoshi((underlying * m).toLong)
def /(d: Long) = MilliSatoshi(underlying / d)
+ def /(d: Double) = MilliSatoshi((underlying / d).toLong)
def unary_- = MilliSatoshi(-underlying)
override def compare(other: MilliSatoshi): Int = underlying.compareTo(other.underlying)
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
index be25030aec..6f49a6b77f 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
@@ -33,6 +33,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.offer.OffersConfig
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
+import fr.acinq.eclair.profit.PeerScorer
import fr.acinq.eclair.reputation.Reputation
import fr.acinq.eclair.router.Announcements.AddressException
import fr.acinq.eclair.router.Graph.HeuristicsConstants
@@ -95,6 +96,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig,
onTheFlyFundingConfig: OnTheFlyFunding.Config,
peerStorageConfig: PeerStorageConfig,
+ peerScoringConfig: PeerScorer.Config,
offersConfig: OffersConfig) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
@@ -714,6 +716,35 @@ object NodeParams extends Logging {
removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS),
cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS),
),
+ peerScoringConfig = PeerScorer.Config(
+ enabled = config.getBoolean("peer-scoring.enabled"),
+ scoringFrequency = FiniteDuration(config.getDuration("peer-scoring.frequency").getSeconds, TimeUnit.SECONDS),
+ topPeersCount = config.getInt("peer-scoring.top-peers-count"),
+ topPeersWhitelist = config.getStringList("peer-scoring.top-peers-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet,
+ liquidity = PeerScorer.LiquidityConfig(
+ autoFund = config.getBoolean("peer-scoring.liquidity.auto-fund"),
+ autoClose = config.getBoolean("peer-scoring.liquidity.auto-close"),
+ minFundingAmount = config.getLong("peer-scoring.liquidity.min-funding-amount-satoshis").sat,
+ maxFundingAmount = config.getLong("peer-scoring.liquidity.max-funding-amount-satoshis").sat,
+ maxFundingTxPerDay = config.getInt("peer-scoring.liquidity.max-funding-tx-per-day"),
+ minOnChainBalance = config.getLong("peer-scoring.liquidity.min-on-chain-balance-satoshis").sat,
+ maxFeerate = FeeratePerByte(config.getLong("peer-scoring.liquidity.max-feerate-sat-per-byte").sat).perKw,
+ fundingCooldown = FiniteDuration(config.getDuration("peer-scoring.liquidity.funding-cooldown").getSeconds, TimeUnit.SECONDS),
+ ),
+ relayFees = PeerScorer.RelayFeesConfig(
+ autoUpdate = config.getBoolean("peer-scoring.relay-fees.auto-update"),
+ minRelayFees = RelayFees(
+ feeBase = config.getLong("peer-scoring.relay-fees.min-fee-base-msat").msat,
+ feeProportionalMillionths = config.getLong("peer-scoring.relay-fees.min-fee-proportional-millionths"),
+ ),
+ maxRelayFees = RelayFees(
+ feeBase = config.getLong("peer-scoring.relay-fees.max-fee-base-msat").msat,
+ feeProportionalMillionths = config.getLong("peer-scoring.relay-fees.max-fee-proportional-millionths"),
+ ),
+ dailyPaymentVolumeThreshold = config.getLong("peer-scoring.relay-fees.daily-payment-volume-threshold-satoshis").sat,
+ dailyPaymentVolumeThresholdPercent = config.getDouble("peer-scoring.relay-fees.daily-payment-volume-threshold-percent"),
+ )
+ ),
offersConfig = OffersConfig(
messagePathMinLength = config.getInt("offers.message-path-min-length"),
paymentPathCount = config.getInt("offers.payment-path-count"),
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
index 6d9bc2de5e..e3729e7768 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
@@ -44,6 +44,7 @@ import fr.acinq.eclair.payment.offer.{DefaultOfferHandler, OfferManager}
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
+import fr.acinq.eclair.profit.{PeerScorer, PeerStatsTracker}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
@@ -400,8 +401,11 @@ class Setup(val datadir: File,
_ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart))
balanceActor = system.spawn(BalanceActor(bitcoinClient, nodeParams.channelConf.minDepth, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor")
-
postman = system.spawn(Behaviors.supervise(Postman(nodeParams, switchboard, router.toTyped, register, offerManager)).onFailure(typed.SupervisorStrategy.restart), name = "postman")
+ _ = if (nodeParams.peerScoringConfig.enabled) {
+ val statsTracker = system.spawn(Behaviors.supervise(PeerStatsTracker(nodeParams.db.audit, channels)).onFailure(typed.SupervisorStrategy.restart), name = "peer-stats-tracker")
+ system.spawn(Behaviors.supervise(PeerScorer(nodeParams, bitcoinClient, statsTracker, register)).onFailure(typed.SupervisorStrategy.restart), name = "peer-scorer")
+ }
kit = Kit(
nodeParams = nodeParams,
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala
index 1d6a1b4478..9629fa09bd 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala
@@ -46,6 +46,7 @@ case class TimestampMilli(private val underlying: Long) extends Ordered[Timestam
require(underlying >= 0 && underlying <= 253402300799L * 1000, "invalid timestamp value")
// @formatter:off
def toLong: Long = underlying
+ def toTimestampSecond: TimestampSecond = TimestampSecond(underlying / 1000)
def toSqlTimestamp: sql.Timestamp = sql.Timestamp.from(Instant.ofEpochMilli(underlying))
override def toString: String = s"$underlying unixms"
override def compare(that: TimestampMilli): Int = underlying.compareTo(that.underlying)
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala
new file mode 100644
index 0000000000..350b33c895
--- /dev/null
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerScorer.scala
@@ -0,0 +1,471 @@
+/*
+ * Copyright 2026 ACINQ SAS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fr.acinq.eclair.profit
+
+import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
+import akka.actor.typed.{ActorRef, Behavior}
+import akka.actor.{ActorRef => UntypedActorRef}
+import akka.util.Timeout
+import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
+import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
+import fr.acinq.eclair.blockchain.OnChainBalanceChecker
+import fr.acinq.eclair.blockchain.fee.FeeratePerKw
+import fr.acinq.eclair.channel.{CMD_CLOSE, CMD_UPDATE_RELAY_FEE, ChannelFlags, Register}
+import fr.acinq.eclair.io.Peer.OpenChannel
+import fr.acinq.eclair.payment.relay.Relayer.RelayFees
+import fr.acinq.eclair.profit.PeerStatsTracker._
+import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, NodeParams, TimestampMilli}
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.util.{Failure, Random, Success}
+
+/**
+ * Created by t-bast on 30/01/2026.
+ */
+
+object PeerScorer {
+
+ // @formatter:off
+ sealed trait Command
+ case class ScorePeers(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]]) extends Command
+ private case class WrappedLatestStats(peers: Seq[PeerInfo]) extends Command
+ private case class OnChainBalance(confirmed: Satoshi, unconfirmed: Satoshi) extends Command
+ private case class WalletError(e: Throwable) extends Command
+ // @formatter:on
+
+ /**
+ * @param enabled whether peer scoring is enabled: the [[PeerScorer]] and [[PeerStatsTracker]] actors should only be created when this is true.
+ * @param scoringFrequency frequency at which we run our peer scoring algorithm.
+ * @param topPeersCount maximum number of peers to select as candidates for liquidity and relay fee updates.
+ * @param topPeersWhitelist a whitelist of peers that the node operator wants to prioritize.
+ */
+ case class Config(enabled: Boolean, scoringFrequency: FiniteDuration, topPeersCount: Int, topPeersWhitelist: Set[PublicKey], liquidity: LiquidityConfig, relayFees: RelayFeesConfig)
+
+ /**
+ * @param autoFund if true, we will automatically fund channels.
+ * @param autoClose if true, we will automatically close unused channels to reclaim liquidity.
+ * @param minFundingAmount we always fund channels with at least this amount.
+ * @param maxFundingAmount we never fund channels with more than this amount.
+ * @param maxFundingTxPerDay we rate-limit the number of transactions we make per day (on average).
+ * @param minOnChainBalance we stop funding channels if our on-chain balance is below this amount.
+ * @param maxFeerate we stop funding channels if the on-chain feerate is above this value.
+ * @param fundingCooldown minimum time between funding the same peer, to evaluate effectiveness.
+ */
+ case class LiquidityConfig(autoFund: Boolean, autoClose: Boolean, minFundingAmount: Satoshi, maxFundingAmount: Satoshi, maxFundingTxPerDay: Int, minOnChainBalance: Satoshi, maxFeerate: FeeratePerKw, fundingCooldown: FiniteDuration)
+
+ /**
+ * @param autoUpdate if true, we will automatically update our relay fees.
+ * @param minRelayFees we will not lower our relay fees below this value.
+ * @param maxRelayFees we will not raise our relay fees above this value.
+ * @param dailyPaymentVolumeThreshold we only increase fees if the daily outgoing payment volume exceeds this threshold or [[dailyPaymentVolumeThresholdPercent]].
+ * @param dailyPaymentVolumeThresholdPercent we only increase fees if the daily outgoing payment volume exceeds this percentage of our peer capacity or [[dailyPaymentVolumeThreshold]].
+ */
+ case class RelayFeesConfig(autoUpdate: Boolean, minRelayFees: RelayFees, maxRelayFees: RelayFees, dailyPaymentVolumeThreshold: Satoshi, dailyPaymentVolumeThresholdPercent: Double)
+
+ private case class FundingProposal(peer: PeerInfo, fundingAmount: Satoshi) {
+ val remoteNodeId: PublicKey = peer.remoteNodeId
+ }
+
+ private case class FundingDecision(fundingAmount: Satoshi, dailyVolumeOutAtFunding: MilliSatoshi, timestamp: TimestampMilli)
+
+ // @formatter:off
+ private sealed trait FeeDirection
+ private case object FeeIncrease extends FeeDirection { override def toString: String = "increase" }
+ private case object FeeDecrease extends FeeDirection { override def toString: String = "decrease" }
+ // @formatter:on
+
+ private case class FeeChangeDecision(direction: FeeDirection, previousFee: RelayFees, newFee: RelayFees, dailyVolumeOutAtChange: MilliSatoshi)
+
+ private case class DecisionHistory(funding: Map[PublicKey, FundingDecision], feeChanges: Map[PublicKey, FeeChangeDecision]) {
+ // @formatter:off
+ def addFunding(nodeId: PublicKey, record: FundingDecision): DecisionHistory = copy(funding = funding.updated(nodeId, record))
+ def addFeeChanges(records: Map[PublicKey, FeeChangeDecision]): DecisionHistory = copy(feeChanges = feeChanges.concat(records))
+ def revertFeeChanges(remoteNodeIds: Set[PublicKey]): DecisionHistory = copy(feeChanges = feeChanges -- remoteNodeIds)
+ def cleanup(now: TimestampMilli, maxAge: FiniteDuration): DecisionHistory = copy(funding = funding.filter { case (_, r) => now - r.timestamp < maxAge })
+ // @formatter:on
+ }
+
+ private def rollingDailyStats(p: PeerInfo): Seq[Seq[PeerStats]] = {
+ (0 until (p.stats.size - Bucket.bucketsPerDay)).map(i => p.stats.slice(i, i + Bucket.bucketsPerDay))
+ }
+
+ private def bestDailyVolumeOut(p: PeerInfo): MilliSatoshi = rollingDailyStats(p).map(s => s.map(_.totalAmountOut).sum).max
+
+ private def bestDailyProfit(p: PeerInfo): MilliSatoshi = rollingDailyStats(p).map(s => s.map(_.profit).sum).max
+
+ private def bestDailyOutgoingFlow(p: PeerInfo): MilliSatoshi = rollingDailyStats(p).map(s => s.map(_.outgoingFlow).sum).max
+
+ def apply(nodeParams: NodeParams, wallet: OnChainBalanceChecker, statsTracker: ActorRef[PeerStatsTracker.GetLatestStats], register: UntypedActorRef): Behavior[Command] = {
+ Behaviors.setup { context =>
+ Behaviors.withTimers { timers =>
+ timers.startTimerWithFixedDelay(ScorePeers(None), nodeParams.peerScoringConfig.scoringFrequency)
+ new PeerScorer(nodeParams, wallet, statsTracker, register, context).run(DecisionHistory(Map.empty, Map.empty))
+ }
+ }
+ }
+
+}
+
+private class PeerScorer(nodeParams: NodeParams, wallet: OnChainBalanceChecker, statsTracker: ActorRef[PeerStatsTracker.GetLatestStats], register: UntypedActorRef, context: ActorContext[PeerScorer.Command]) {
+
+ import PeerScorer._
+
+ implicit val ec: ExecutionContext = context.system.executionContext
+ private val log = context.log
+ private val config = nodeParams.peerScoringConfig
+
+ private def run(history: DecisionHistory): Behavior[Command] = {
+ Behaviors.receiveMessagePartial {
+ case ScorePeers(replyTo_opt) =>
+ statsTracker ! PeerStatsTracker.GetLatestStats(context.messageAdapter[PeerStatsTracker.LatestStats](e => WrappedLatestStats(e.peers)))
+ waitForStats(replyTo_opt, history)
+ }
+ }
+
+ private def waitForStats(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]], history: DecisionHistory): Behavior[Command] = {
+ Behaviors.receiveMessagePartial {
+ case WrappedLatestStats(peers) => scorePeers(replyTo_opt, peers, history)
+ }
+ }
+
+ private def scorePeers(replyTo_opt: Option[ActorRef[Seq[PeerInfo]]], peers: Seq[PeerInfo], history: DecisionHistory): Behavior[Command] = {
+ log.info("scoring {} peers", peers.size)
+ val dailyProfit = peers.map(_.stats.take(Bucket.bucketsPerDay).map(_.profit).sum).sum.truncateToSatoshi.toMilliBtc
+ val weeklyProfit = peers.map(_.stats.map(_.profit).sum).sum.truncateToSatoshi.toMilliBtc
+ log.info("rolling daily profit = {} and weekly profit = {}", dailyProfit, weeklyProfit)
+
+ // We select peers that have the largest outgoing payment volume of the past day.
+ // This biases towards nodes that already have a large capacity and have liquidity available.
+ val bestPeersByVolume = peers
+ .map(p => (p, p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum))
+ .filter(_._2 > 0.msat)
+ .sortBy(_._2)(Ordering[MilliSatoshi].reverse)
+ .take(config.topPeersCount)
+ .map(_._1)
+ log.debug("top {} peers:", bestPeersByVolume.size)
+ printDailyStats(bestPeersByVolume)
+
+ // We select peers that have a low outgoing balance but were previously good peers, by looking at the largest daily
+ // outgoing payment volume of the past week, using a rolling daily window. This also biases towards nodes that
+ // already have a large capacity, but lets us identify good peers that ran out of liquidity.
+ val bestPeersWithoutLiquidityByVolume = peers
+ .filter(p => p.canSend <= p.capacity * 0.1)
+ .map(p => (p, bestDailyVolumeOut(p)))
+ .sortBy(_._2)(Ordering[MilliSatoshi].reverse)
+ .take(config.topPeersCount)
+ .map(_._1)
+
+ // We identify peers that need additional liquidity.
+ val bestPeersThatNeedLiquidity = bestPeersByVolume
+ // We select peers that have a large outgoing flow, which means they will run out of liquidity in the next days.
+ .filter(p => p.stats.take(Bucket.bucketsPerDay).map(_.outgoingFlow).sum >= p.canSend * 0.2)
+ // And peers that have already run out of liquidity.
+ .appendedAll(bestPeersWithoutLiquidityByVolume)
+ .distinctBy(_.remoteNodeId)
+ // We order them by their best daily profit of the past week.
+ .map(p => (p, bestDailyProfit(p)))
+ .sortBy(_._2)(Ordering[MilliSatoshi].reverse)
+ .take(config.topPeersCount)
+ // We fund a multiple of our daily outgoing flow, to ensure we have enough liquidity for a few days.
+ .map { case (p, _) => FundingProposal(p, (bestDailyOutgoingFlow(p) * 4).truncateToSatoshi) }
+ .filter(_.fundingAmount > 0.sat)
+ if (bestPeersThatNeedLiquidity.nonEmpty) {
+ log.debug("top {} peers that need liquidity:", bestPeersThatNeedLiquidity.size)
+ printDailyStats(bestPeersThatNeedLiquidity.map(_.peer))
+ }
+
+ // We select peers that are performing well relative to their capacity, which lets us identify good peers that may
+ // have a smaller capacity and may deserve a capacity increase.
+ val goodSmallPeers = peers
+ .map(p => (p, bestDailyVolumeOut(p).truncateToSatoshi.toLong.toDouble / p.capacity.toLong))
+ .filter(_._2 > 0)
+ .sortBy(_._2)(Ordering[Double].reverse)
+ .take(config.topPeersCount)
+ // We only keep peers that may run out of liquidity in the next days.
+ .filter { case (p, _) => p.stats.take(Bucket.bucketsPerDay).map(_.outgoingFlow).sum >= p.canSend * 0.2 }
+ .map(_._1)
+ // We'd like to increase their capacity by 50%.
+ .map(p => FundingProposal(p, p.capacity * 0.5))
+ if (goodSmallPeers.nonEmpty) {
+ log.debug("we've identified {} smaller peers that perform well relative to their capacity", goodSmallPeers.size)
+ printDailyStats(goodSmallPeers.map(_.peer))
+ }
+
+ // We try to identify peers that may have run out of liquidity a long time ago but are interesting for us.
+ val peersToRevive = peers
+ // We prioritize peers that the node operator whitelisted.
+ .filter(p => config.topPeersWhitelist.contains(p.remoteNodeId))
+ .sortBy(_.capacity)(Ordering[Satoshi].reverse)
+ // And we add peers with a large capacity that have a low balance.
+ .appendedAll(peers
+ .sortBy(_.capacity)(Ordering[Satoshi].reverse)
+ .take(config.topPeersCount)
+ .filter(p => p.canSend <= p.capacity * 0.1)
+ )
+ .distinctBy(_.remoteNodeId)
+ // We'd like to increase their capacity by 25%.
+ .map(p => FundingProposal(p, p.capacity * 0.25))
+
+ // Since we're not yet reading past events from the DB, we need to wait until we have collected enough data before
+ // taking some actions such as opening or closing channels or updating relay fees.
+ // TODO: remove this once we start reading past data from the AuditDb on restart.
+ val hasPastData = bestPeersByVolume.exists(_.stats.drop(Bucket.bucketsPerDay).exists(_ != PeerStats.empty))
+ if (hasPastData && replyTo_opt.isEmpty) {
+ closeChannelsIfNeeded(peers)
+ val history1 = updateRelayFeesIfNeeded(peers, history)
+ fundChannelsIfNeeded(bestPeersThatNeedLiquidity, goodSmallPeers, peersToRevive, history1)
+ } else {
+ replyTo_opt.foreach(_ ! bestPeersThatNeedLiquidity.map(_.peer))
+ run(history)
+ }
+ }
+
+ private def printDailyStats(peers: Seq[PeerInfo]): Unit = {
+ log.debug("| rank | node_id | daily_volume | daily_profit | can_send | can_receive |")
+ log.debug("|------|--------------------------------------------------------------------|---------------|---------------|---------------|---------------|")
+ peers.zipWithIndex.foreach { case (p, i) =>
+ val dailyStats = p.stats.take(Bucket.bucketsPerDay)
+ val dailyVolume = dailyStats.map(_.totalAmountOut).sum.truncateToSatoshi.toMilliBtc
+ val dailyProfit = dailyStats.map(_.profit).sum.truncateToSatoshi.toMilliBtc
+ val canSend = p.canSend.truncateToSatoshi.toMilliBtc
+ val canReceive = p.canReceive.truncateToSatoshi.toMilliBtc
+ log.debug(f"| ${i + 1}%4d | ${p.remoteNodeId} | ${dailyVolume.toDouble}%8.2f mbtc | ${dailyProfit.toDouble}%8.2f mbtc | ${canSend.toDouble}%8.2f mbtc | ${canReceive.toDouble}%8.2f mbtc |")
+ }
+ log.debug("|------|--------------------------------------------------------------------|---------------|---------------|---------------|---------------|")
+ }
+
+ private def closeChannelsIfNeeded(peers: Seq[PeerInfo]): Unit = {
+ peers
+ // We only close channels when we have more than one.
+ .filter(_.channels.size > 1)
+ // We only close channels for which most of the liquidity is idle on our side.
+ .filter(p => p.canSend >= p.capacity * 0.8 && p.stats.map(_.totalAmountOut).sum <= p.capacity * 0.05)
+ .foreach(p => {
+ val channels = p.channels.sortWith {
+ // We want to keep a public channel over a private channel.
+ case (c1, c2) if c1.isPublic != c2.isPublic => c1.isPublic
+ // Otherwise, we keep the channel with the largest capacity.
+ case (c1, c2) if c1.capacity != c2.capacity => c1.capacity >= c2.capacity
+ // Otherwise, we keep the channel with the smallest balance (and thus highest inbound liquidity).
+ case (c1, c2) => c1.canSend <= c2.canSend
+ }
+ // We keep the best channel and close the others.
+ channels.tail.foreach { c =>
+ log.debug("we should close channel_id={} with remote_node_id={} (local={}, remote={})", c.channelId, p.remoteNodeId, c.canSend.truncateToSatoshi.toMilliBtc, c.canReceive.truncateToSatoshi.toMilliBtc)
+ if (config.liquidity.autoClose && nodeParams.currentFeeratesForFundingClosing.medium <= config.liquidity.maxFeerate) {
+ log.info("closing channel_id={} with remote_node_id={} (local={}, remote={})", c.channelId, p.remoteNodeId, c.canSend.truncateToSatoshi.toMilliBtc, c.canReceive.truncateToSatoshi.toMilliBtc)
+ val cmd = CMD_CLOSE(UntypedActorRef.noSender, None, None)
+ register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd)
+ }
+ }
+ })
+ }
+
+ private def updateRelayFeesIfNeeded(peers: Seq[PeerInfo], history: DecisionHistory): DecisionHistory = {
+ // We configure *daily* absolute and proportional payment volume targets. We look at events from the current period
+ // and the previous period, so we need to get the right ratio to convert those daily amounts.
+ val now = TimestampMilli.now()
+ val lastTwoBucketsRatio = 1.0 + Bucket.consumed(now)
+ val lastTwoBucketsDailyRatio = (Bucket.duration * lastTwoBucketsRatio).toSeconds.toDouble / (24 * 3600)
+ // We increase fees of channels that are performing better than we expected.
+ val feeIncreases = peers
+ // We select peers that have exceeded our payment volume target in the past two periods.
+ .filter(p => p.stats.take(2).map(_.totalAmountOut).sum >= Seq(config.relayFees.dailyPaymentVolumeThreshold * lastTwoBucketsDailyRatio, p.capacity * config.relayFees.dailyPaymentVolumeThresholdPercent * lastTwoBucketsDailyRatio).min)
+ // And that have an increasing payment volume compared to the period before that.
+ .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum > 0.msat)
+ .filter(p => (p.stats.take(2).map(_.totalAmountOut).sum / lastTwoBucketsRatio) > p.stats.slice(2, 3).map(_.totalAmountOut).sum * 1.1)
+ .flatMap(p => {
+ p.latestUpdate_opt match {
+ // And for which we haven't updated our relay fees recently already.
+ case Some(u) if u.timestamp <= now.toTimestampSecond - (Bucket.duration * 1.5).toSeconds =>
+ val next = u.relayFees.copy(feeProportionalMillionths = u.feeProportionalMillionths + 500)
+ if (next.feeBase <= config.relayFees.maxRelayFees.feeBase && next.feeProportionalMillionths <= config.relayFees.maxRelayFees.feeProportionalMillionths) {
+ val dailyVolume = p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum
+ Some(p.remoteNodeId -> FeeChangeDecision(FeeIncrease, u.relayFees, next, dailyVolume))
+ } else {
+ None
+ }
+ case _ => None
+ }
+ }).toMap
+ // We decrease fees of channels that aren't performing well.
+ val feeDecreases = peers
+ // We select peers that have a recent 15% decrease in outgoing payment volume.
+ .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum > 0.msat)
+ .filter(p => p.stats.take(2).map(_.totalAmountOut).sum <= p.stats.slice(2, 3).map(_.totalAmountOut).sum * 0.85)
+ // For which the volume was previously already stable or decreasing.
+ .filter(p => p.stats.slice(2, 3).map(_.totalAmountOut).sum <= p.stats.slice(3, 4).map(_.totalAmountOut).sum)
+ // And that have enough liquidity to relay outgoing payments.
+ .filter(p => p.canSend >= Seq(config.relayFees.dailyPaymentVolumeThreshold, p.capacity * config.relayFees.dailyPaymentVolumeThresholdPercent).min)
+ .flatMap(p => {
+ p.latestUpdate_opt match {
+ // And for which we haven't updated our relay fees recently already.
+ case Some(u) if u.timestamp <= now.toTimestampSecond - (Bucket.duration * 1.5).toSeconds =>
+ val next = u.relayFees.copy(feeProportionalMillionths = u.feeProportionalMillionths - 500)
+ if (next.feeBase >= config.relayFees.minRelayFees.feeBase && next.feeProportionalMillionths >= config.relayFees.minRelayFees.feeProportionalMillionths) {
+ val dailyVolume = p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum
+ Some(p.remoteNodeId -> FeeChangeDecision(FeeDecrease, u.relayFees, next, dailyVolume))
+ } else {
+ None
+ }
+ case _ => None
+ }
+ }).toMap
+ // We revert fee changes that had a negative impact on volume.
+ val feeReverts = peers
+ .filterNot(p => feeIncreases.contains(p.remoteNodeId) || feeDecreases.contains(p.remoteNodeId))
+ .flatMap { p =>
+ (history.feeChanges.get(p.remoteNodeId), p.latestUpdate_opt) match {
+ case (Some(record), Some(u)) =>
+ // Confirm our change is still in effect.
+ val updateMatchesRecord = u.feeProportionalMillionths == record.newFee.feeProportionalMillionths
+ // We expect around 6h-24h after the update to really see its effect on path-finding: after 24h, too many
+ // other parameters may interfere with our evaluation.
+ val withinEvaluationWindow = u.timestamp >= now.toTimestampSecond - (24 hours).toSeconds
+ // We don't want to update fees too often.
+ val notUpdatedRecently = u.timestamp <= now.toTimestampSecond - (Bucket.duration * 2).toSeconds
+ // We revert both fee increases and fee decreases: if volume didn't increase after a fee decrease, we'd
+ // rather collect our previous (bigger) relay fee.
+ val currentDailyVolume = p.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum
+ val shouldRevert = record.direction match {
+ case FeeIncrease => currentDailyVolume < record.dailyVolumeOutAtChange * 0.8
+ case FeeDecrease => currentDailyVolume < record.dailyVolumeOutAtChange * 0.9
+ }
+ if (updateMatchesRecord && withinEvaluationWindow && notUpdatedRecently && shouldRevert) {
+ val decision = record.direction match {
+ case PeerScorer.FeeIncrease => FeeChangeDecision(FeeDecrease, u.relayFees, u.relayFees.copy(feeProportionalMillionths = u.relayFees.feeProportionalMillionths - 500), currentDailyVolume)
+ case PeerScorer.FeeDecrease => FeeChangeDecision(FeeIncrease, u.relayFees, u.relayFees.copy(feeProportionalMillionths = u.relayFees.feeProportionalMillionths + 500), currentDailyVolume)
+ }
+ Some(p.remoteNodeId -> decision)
+ } else {
+ None
+ }
+ case _ => None
+ }
+ }.toMap
+ // We print the results to help debugging.
+ if (feeIncreases.nonEmpty || feeDecreases.nonEmpty || feeReverts.nonEmpty) {
+ log.debug("we should update our relay fees with the following peers:")
+ log.debug("| node_id | volume_variation | decision | current_fee | next_fee |")
+ log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|")
+ (feeIncreases.toSeq ++ feeDecreases.toSeq ++ feeReverts.toSeq).foreach { case (remoteNodeId, decision) =>
+ val volumeVariation = peers.find(_.remoteNodeId == remoteNodeId) match {
+ case Some(p) if p.stats.slice(2, 3).map(_.totalAmountOut).sum != 0.msat => p.stats.take(2).map(_.totalAmountOut).sum.toLong.toDouble / (p.stats.slice(2, 3).map(_.totalAmountOut).sum.toLong * lastTwoBucketsRatio)
+ case _ => 0.0
+ }
+ log.debug(f"| $remoteNodeId | $volumeVariation%.2f | ${decision.direction} | ${decision.previousFee.feeProportionalMillionths}%11d | ${decision.newFee.feeProportionalMillionths}%8d |")
+ }
+ log.debug("|--------------------------------------------------------------------|-------------------|----------|-------------|----------|")
+ }
+ if (config.relayFees.autoUpdate) {
+ (feeIncreases.toSeq ++ feeDecreases.toSeq ++ feeReverts.toSeq).foreach { case (remoteNodeId, decision) =>
+ val cmd = CMD_UPDATE_RELAY_FEE(UntypedActorRef.noSender, decision.newFee.feeBase, decision.newFee.feeProportionalMillionths)
+ peers.find(_.remoteNodeId == remoteNodeId).map(_.channels).getOrElse(Nil).foreach(c => register ! Register.Forward(context.system.ignoreRef, c.channelId, cmd))
+ }
+ }
+ // Note that in order to avoid oscillating between reverts (reverting a revert), we remove the previous records when
+ // reverting a change: this way, the normal algorithm resumes during the next run.
+ history.addFeeChanges(feeIncreases ++ feeDecreases).revertFeeChanges(feeReverts.keySet)
+ }
+
+ private def fundChannelsIfNeeded(bestPeers: Seq[FundingProposal], smallPeers: Seq[FundingProposal], toRevive: Seq[FundingProposal], history: DecisionHistory): Behavior[Command] = {
+ // We don't want to fund peers every time our scoring algorithm runs, otherwise we may create too many on-chain
+ // transactions. We draw from a random distribution to ensure that on average:
+ // - we follow our rate-limit for funding best peers and fund at most 3 at a time
+ // - we fund a small peer once every 3 days, chosen at random among our top 3
+ // - we revive an older peer once every day, chosen at random among our top 3
+ val toFund = {
+ val scoringPerDay = (1 day).toSeconds / config.scoringFrequency.toSeconds
+ val bestPeersToFund = bestPeers.headOption match {
+ case Some(_) if Random.nextDouble() <= config.liquidity.maxFundingTxPerDay.toDouble / (scoringPerDay * bestPeers.size.min(3)) => bestPeers.take(3)
+ case _ => Nil
+ }
+ val smallPeerToFund_opt = smallPeers.headOption match {
+ case Some(_) if Random.nextDouble() <= (1.0 / (scoringPerDay * 3)) => Random.shuffle(smallPeers.take(3)).headOption
+ case _ => None
+ }
+ val toReviveNotAlreadySelected = toRevive.filterNot(p => bestPeers.exists(_.remoteNodeId == p.remoteNodeId) || smallPeerToFund_opt.exists(_.remoteNodeId == p.remoteNodeId))
+ val toRevive_opt = toReviveNotAlreadySelected.headOption match {
+ case Some(_) if Random.nextDouble() <= (1.0 / scoringPerDay) => Random.shuffle(toReviveNotAlreadySelected.take(3)).headOption
+ case _ => None
+ }
+ (bestPeersToFund ++ toRevive_opt ++ smallPeerToFund_opt).distinctBy(_.remoteNodeId)
+ }
+ if (bestPeers.isEmpty && smallPeers.isEmpty && toRevive.isEmpty) {
+ log.info("we haven't identified peers that require liquidity yet")
+ run(history)
+ } else if (toFund.isEmpty) {
+ log.info("we skip funding peers because of per-day rate-limits: increase eclair.peer-scoring.liquidity.max-funding-tx-per-day to fund more often")
+ run(history)
+ } else if (config.liquidity.maxFeerate < nodeParams.currentFeeratesForFundingClosing.medium) {
+ log.info("we skip funding peers because current feerate is too high ({} < {}): increase eclair.peer-scoring.liquidity.max-feerate-sat-per-byte to start funding again", config.liquidity.maxFeerate, nodeParams.currentFeeratesForFundingClosing.medium)
+ run(history)
+ } else {
+ context.pipeToSelf(wallet.onChainBalance()) {
+ case Success(b) => OnChainBalance(b.confirmed, b.unconfirmed)
+ case Failure(e) => WalletError(e)
+ }
+ Behaviors.receiveMessagePartial {
+ case WalletError(e) =>
+ log.warn("cannot get on-chain balance: {}", e.getMessage)
+ run(history)
+ case OnChainBalance(confirmed, unconfirmed) =>
+ val now = TimestampMilli.now()
+ val history1 = if (confirmed <= config.liquidity.minOnChainBalance) {
+ log.info("we don't have enough on-chain balance to fund new channels (confirmed={}, unconfirmed={})", confirmed.toMilliBtc, unconfirmed.toMilliBtc)
+ history
+ } else {
+ toFund
+ // We don't fund peers that are already being funded.
+ .filterNot(p => p.peer.hasPendingChannel)
+ // We don't fund peers that were recently funded, unless volume improved by more than 10% since the last funding.
+ .filter { f =>
+ history.funding.get(f.remoteNodeId) match {
+ case Some(record) if now - record.timestamp < config.liquidity.fundingCooldown =>
+ val currentDailyVolume = f.peer.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum
+ if (currentDailyVolume <= record.dailyVolumeOutAtFunding * 1.1) {
+ log.info("skipping funding for remote_node_id={}: last funded {} ago, volume has not improved (was={}, now={})", f.remoteNodeId, now - record.timestamp, record.dailyVolumeOutAtFunding.truncateToSatoshi.toMilliBtc, currentDailyVolume.truncateToSatoshi.toMilliBtc)
+ false
+ } else {
+ log.info("re-funding remote_node_id={}: volume improved since last funding (was={}, now={})", f.remoteNodeId, record.dailyVolumeOutAtFunding.truncateToSatoshi.toMilliBtc, currentDailyVolume.truncateToSatoshi.toMilliBtc)
+ true
+ }
+ case _ => true
+ }
+ }
+ // And we apply our configured funding limits to the liquidity suggestions.
+ .map(f => f.copy(fundingAmount = f.fundingAmount.max(config.liquidity.minFundingAmount).min(config.liquidity.maxFundingAmount)))
+ .foldLeft((confirmed - config.liquidity.minOnChainBalance, history)) {
+ case ((available, history), f) if available < f.fundingAmount * 0.5 => (available, history)
+ case ((available, history), f) =>
+ val fundingAmount = f.fundingAmount.min(available)
+ log.info("funding channel with remote_node_id={} (funding_amount={})", f.remoteNodeId, fundingAmount.toMilliBtc)
+ // TODO: when do we want to create a private channel? Maybe if we already have a bigger public channel?
+ val channelFlags = ChannelFlags(announceChannel = true)
+ val cmd = OpenChannel(f.remoteNodeId, fundingAmount, None, None, None, None, None, Some(channelFlags), Some(Timeout(60 seconds)))
+ register ! Register.ForwardNodeId(context.system.ignoreRef, f.remoteNodeId, cmd)
+ val dailyVolume = f.peer.stats.take(Bucket.bucketsPerDay).map(_.totalAmountOut).sum
+ (available - fundingAmount, history.addFunding(f.remoteNodeId, FundingDecision(fundingAmount, dailyVolume, now)))
+ }._2
+ }
+ run(history1.cleanup(now, 7 days))
+ }
+ }
+ }
+
+}
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala
new file mode 100644
index 0000000000..e44a3c0959
--- /dev/null
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/profit/PeerStatsTracker.scala
@@ -0,0 +1,380 @@
+/*
+ * Copyright 2026 ACINQ SAS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fr.acinq.eclair.profit
+
+import akka.actor.typed.eventstream.EventStream
+import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
+import akka.actor.typed.{ActorRef, Behavior}
+import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
+import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong}
+import fr.acinq.eclair.channel._
+import fr.acinq.eclair.db.AuditDb
+import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent}
+import fr.acinq.eclair.wire.protocol.ChannelUpdate
+import fr.acinq.eclair.{Features, MilliSatoshi, MilliSatoshiLong, TimestampMilli, ToMilliSatoshiConversion}
+
+import java.time.{Instant, ZoneId, ZonedDateTime}
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+
+/**
+ * Created by t-bast on 30/01/2026.
+ */
+
+object PeerStatsTracker {
+ // @formatter:off
+ sealed trait Command
+ case class GetLatestStats(replyTo: ActorRef[LatestStats]) extends Command
+ private[profit] case object RemoveOldBuckets extends Command
+ private[profit] case class WrappedPaymentSent(e: PaymentSent) extends Command
+ private[profit] case class WrappedPaymentRelayed(e: PaymentRelayed) extends Command
+ private[profit] case class WrappedPaymentReceived(e: PaymentReceived) extends Command
+ private[profit] case class ChannelCreationInProgress(remoteNodeId: PublicKey, channelId: ByteVector32) extends Command
+ private[profit] case class ChannelCreationAborted(remoteNodeId: PublicKey, channelId: ByteVector32) extends Command
+ private[profit] case class WrappedLocalChannelUpdate(e: LocalChannelUpdate) extends Command
+ private[profit] case class WrappedLocalChannelDown(e: LocalChannelDown) extends Command
+ private[profit] case class WrappedAvailableBalanceChanged(e: AvailableBalanceChanged) extends Command
+ // @formatter:on
+
+ def apply(db: AuditDb, channels: Seq[PersistentChannelData]): Behavior[Command] = {
+ Behaviors.setup { context =>
+ Behaviors.withTimers { timers =>
+ new PeerStatsTracker(db, timers, context).start(channels)
+ }
+ }
+ }
+
+ /** Returns the latest statistics for all of our public peers. */
+ case class LatestStats(peers: Seq[PeerInfo])
+
+ /** NB: stats are ordered, with the most recent events first. */
+ case class PeerInfo(remoteNodeId: PublicKey, stats: Seq[PeerStats], channels: Seq[ChannelInfo], latestUpdate_opt: Option[ChannelUpdate], hasPendingChannel: Boolean) {
+ val capacity: Satoshi = channels.map(_.capacity).sum
+ val canSend: MilliSatoshi = channels.map(_.canSend).sum
+ val canReceive: MilliSatoshi = channels.map(_.canReceive).sum
+ }
+
+ /** We compute peer statistics in buckets spanning a few hours. */
+ case class Bucket(private val day: Int, private val month: Int, private val year: Int, private val slot: Int) extends Ordered[Bucket] {
+ override def compare(that: Bucket): Int = that match {
+ case Bucket(_, _, y, _) if y != year => year - y
+ case Bucket(_, m, _, _) if m != month => month - m
+ case Bucket(d, _, _, _) if d != day => day - d
+ case Bucket(_, _, _, s) => slot - s
+ }
+
+ override def toString: String = f"$year-$month%02d-$day%02d-$slot"
+ }
+
+ object Bucket {
+ val duration: FiniteDuration = 3 hours
+ val bucketsPerDay: Int = 8
+
+ def from(ts: TimestampMilli): Bucket = {
+ val date = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts.toLong), ZoneId.of("UTC"))
+ Bucket(date.getDayOfMonth, date.getMonthValue, date.getYear, date.getHour * bucketsPerDay / 24)
+ }
+
+ /** Returns the fraction (between 0.0 and 1.0) of the bucket elapsed at the given timestamp. */
+ def consumed(ts: TimestampMilli): Double = {
+ val bucket = from(ts)
+ val start = ZonedDateTime.of(bucket.year, bucket.month, bucket.day, bucket.slot * 24 / bucketsPerDay, 0, 0, 0, ZoneId.of("UTC")).toEpochSecond
+ (ts.toTimestampSecond.toLong - start).toDouble / duration.toSeconds
+ }
+ }
+
+ case class PeerStats(totalAmountIn: MilliSatoshi, totalAmountOut: MilliSatoshi, relayFeeEarned: MilliSatoshi, onChainFeePaid: Satoshi, liquidityFeeEarned: Satoshi, liquidityFeePaid: Satoshi) {
+ val outgoingFlow: MilliSatoshi = totalAmountOut - totalAmountIn
+ val profit: MilliSatoshi = relayFeeEarned + liquidityFeeEarned.toMilliSatoshi - onChainFeePaid.toMilliSatoshi - liquidityFeePaid.toMilliSatoshi
+ }
+
+ object PeerStats {
+ def empty: PeerStats = PeerStats(0 msat, 0 msat, 0 msat, 0 sat, 0 sat, 0 sat)
+ }
+
+ /**
+ * We aggregate events into buckets to avoid storing too much data per peer, while providing enough granularity to
+ * detect variations in volume and flows. Note that we store an entry for every peer with whom we have a channel,
+ * even for peers that don't have any activity (which lets us detect those peers and potentially reclaim liquidity).
+ */
+ case class BucketedPeerStats(private val stats: Map[PublicKey, Map[Bucket, PeerStats]]) {
+ def peers: Iterable[PublicKey] = stats.keys
+
+ /** Returns stats for the given peer in all buckets (most recent bucket first). */
+ def getPeerStats(remoteNodeId: PublicKey, now: TimestampMilli): Seq[PeerStats] = {
+ stats.get(remoteNodeId) match {
+ case Some(_) => (0 until BucketedPeerStats.bucketsCount).map(b => getPeerStatsForBucket(remoteNodeId, Bucket.from(now - b * Bucket.duration)))
+ case None => Seq.fill(BucketedPeerStats.bucketsCount)(PeerStats.empty)
+ }
+ }
+
+ /** Returns stats for the given bucket. */
+ private def getPeerStatsForBucket(remoteNodeId: PublicKey, bucket: Bucket): PeerStats = {
+ stats.get(remoteNodeId).flatMap(_.get(bucket)).getOrElse(PeerStats.empty)
+ }
+
+ /**
+ * When our first channel with a peer is created, we want to add an entry for it, even though there are no payments yet.
+ * This lets us detect new peers that are idle, for example to reclaim unused liquidity from them.
+ */
+ def initializePeerIfNeeded(remoteNodeId: PublicKey): BucketedPeerStats = {
+ stats.get(remoteNodeId) match {
+ case Some(_) => this // already initialized
+ case None => this.copy(stats = stats + (remoteNodeId -> Map.empty))
+ }
+ }
+
+ private def addOrUpdate(remoteNodeId: PublicKey, bucket: Bucket, peerStats: PeerStats): BucketedPeerStats = {
+ val buckets = stats.getOrElse(remoteNodeId, Map.empty[Bucket, PeerStats])
+ copy(stats = stats + (remoteNodeId -> (buckets + (bucket -> peerStats))))
+ }
+
+ def addPaymentSent(e: PaymentSent): BucketedPeerStats = {
+ e.parts.foldLeft(this) {
+ case (current, p) =>
+ val bucket = Bucket.from(p.settledAt)
+ val peerStats = current.getPeerStatsForBucket(p.remoteNodeId, bucket)
+ val peerStats1 = peerStats.copy(totalAmountOut = peerStats.totalAmountOut + p.amountWithFees)
+ current.addOrUpdate(p.remoteNodeId, bucket, peerStats1)
+ }
+ }
+
+ def addPaymentReceived(e: PaymentReceived): BucketedPeerStats = {
+ e.parts.foldLeft(this) {
+ case (current, p) =>
+ val bucket = Bucket.from(p.receivedAt)
+ val peerStats = current.getPeerStatsForBucket(p.remoteNodeId, bucket)
+ val peerStats1 = peerStats.copy(totalAmountIn = peerStats.totalAmountIn + p.amount)
+ current.addOrUpdate(p.remoteNodeId, bucket, peerStats1)
+ }
+ }
+
+ def addPaymentRelayed(e: PaymentRelayed): BucketedPeerStats = {
+ val withIncoming = e.incoming.foldLeft(this) {
+ case (current, i) =>
+ val bucket = Bucket.from(i.receivedAt)
+ val peerStats = current.getPeerStatsForBucket(i.remoteNodeId, bucket)
+ val peerStats1 = peerStats.copy(totalAmountIn = peerStats.totalAmountIn + i.amount)
+ current.addOrUpdate(i.remoteNodeId, bucket, peerStats1)
+ }
+ e.outgoing.foldLeft(withIncoming) {
+ case (current, o) =>
+ val bucket = Bucket.from(o.settledAt)
+ val peerStats = current.getPeerStatsForBucket(o.remoteNodeId, bucket)
+ // When using MPP and trampoline, payments can be relayed through multiple nodes at once.
+ // We split the fee proportionally to the amount relayed through each outgoing node.
+ val relayFee = e.relayFee * (o.amount.toLong.toDouble / e.amountOut.toLong)
+ val peerStats1 = peerStats.copy(totalAmountOut = peerStats.totalAmountOut + o.amount, relayFeeEarned = peerStats.relayFeeEarned + relayFee)
+ current.addOrUpdate(o.remoteNodeId, bucket, peerStats1)
+ }
+ }
+
+ /** Remove old buckets that exceed our retention window. This should be called frequently to avoid memory leaks. */
+ def removeOldBuckets(now: TimestampMilli): BucketedPeerStats = {
+ val oldestBucket = Bucket.from(now - Bucket.duration * BucketedPeerStats.bucketsCount)
+ copy(stats = stats.map {
+ case (remoteNodeId, peerStats) => remoteNodeId -> peerStats.filter { case (bucket, _) => bucket >= oldestBucket }
+ })
+ }
+
+ /** Remove a peer from our list: this should only happen when we don't have channels with that peer anymore. */
+ def removePeer(remoteNodeId: PublicKey): BucketedPeerStats = copy(stats = stats - remoteNodeId)
+ }
+
+ object BucketedPeerStats {
+ // We keep 7 days of past history.
+ val bucketsCount: Int = 7 * Bucket.bucketsPerDay
+
+ def empty(peers: Set[PublicKey]): BucketedPeerStats = BucketedPeerStats(peers.map(remoteNodeId => remoteNodeId -> Map.empty[Bucket, PeerStats]).toMap)
+ }
+
+ /** We keep minimal information about open channels to allow running our heuristics. */
+ case class ChannelInfo(channelId: ByteVector32, capacity: Satoshi, canSend: MilliSatoshi, canReceive: MilliSatoshi, isPublic: Boolean)
+
+ object ChannelInfo {
+ def apply(commitments: Commitments): ChannelInfo = ChannelInfo(commitments.channelId, commitments.latest.capacity, commitments.availableBalanceForSend, commitments.availableBalanceForReceive, commitments.announceChannel)
+ }
+
+ /**
+ * Note that we keep channel updates separately: we're only interested in the relay fees, which are the same for every channel.
+ * We also keep track of pending channels (channels being created), to avoid creating many channels at the same time.
+ */
+ case class PeerChannels(private val channels: Map[PublicKey, Seq[ChannelInfo]], private val updates: Map[PublicKey, ChannelUpdate], private val pending: Map[PublicKey, Set[ByteVector32]]) {
+ def peers: Set[PublicKey] = channels.keySet
+
+ def getChannels(remoteNodeId: PublicKey): Seq[ChannelInfo] = channels.getOrElse(remoteNodeId, Nil)
+
+ def getUpdate(remoteNodeId: PublicKey): Option[ChannelUpdate] = updates.get(remoteNodeId)
+
+ def hasChannels(remoteNodeId: PublicKey): Boolean = channels.contains(remoteNodeId)
+
+ def hasPendingChannel(remoteNodeId: PublicKey): Boolean = pending.contains(remoteNodeId)
+
+ def updateChannel(e: LocalChannelUpdate): PeerChannels = {
+ // Note that creating our channel update implicitly means that the channel isn't pending anymore.
+ updates.get(e.remoteNodeId) match {
+ case Some(u) if u.timestamp > e.channelUpdate.timestamp => updateChannel(e.commitments).removePendingChannel(e.remoteNodeId, e.channelId)
+ case _ => updateChannel(e.commitments).copy(updates = updates + (e.remoteNodeId -> e.channelUpdate)).removePendingChannel(e.remoteNodeId, e.channelId)
+ }
+ }
+
+ def updateChannel(e: AvailableBalanceChanged): PeerChannels = {
+ updateChannel(e.commitments)
+ }
+
+ private def updateChannel(commitments: Commitments): PeerChannels = {
+ if (!commitments.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve)) {
+ val peerChannels1 = channels.getOrElse(commitments.remoteNodeId, Nil).filter(_.channelId != commitments.channelId) :+ ChannelInfo(commitments)
+ copy(channels = channels + (commitments.remoteNodeId -> peerChannels1))
+ } else {
+ // We filter out channels with mobile wallets.
+ copy(channels = channels - commitments.remoteNodeId)
+ }
+ }
+
+ def addPendingChannel(e: ChannelCreationInProgress): PeerChannels = {
+ val pending1 = pending + (e.remoteNodeId -> (pending.getOrElse(e.remoteNodeId, Set.empty) + e.channelId))
+ copy(pending = pending1)
+ }
+
+ def removeChannel(e: LocalChannelDown): PeerChannels = {
+ val updated = channels.get(e.remoteNodeId) match {
+ case Some(peerChannels) =>
+ val peerChannels1 = peerChannels.filter(_.channelId != e.channelId)
+ if (peerChannels1.isEmpty) {
+ copy(channels = channels - e.remoteNodeId, updates = updates - e.remoteNodeId)
+ } else {
+ copy(channels = channels + (e.remoteNodeId -> peerChannels1))
+ }
+ case None => this
+ }
+ updated.removePendingChannel(e.remoteNodeId, e.channelId)
+ }
+
+ private def removePendingChannel(remoteNodeId: PublicKey, channelId: ByteVector32): PeerChannels = {
+ val pending1 = pending.get(remoteNodeId) match {
+ case Some(channels) =>
+ val channels1 = channels - channelId
+ if (channels1.isEmpty) {
+ pending - remoteNodeId
+ } else {
+ pending + (remoteNodeId -> channels1)
+ }
+ case None => pending
+ }
+ copy(pending = pending1)
+ }
+
+ def removeChannel(e: ChannelCreationAborted): PeerChannels = {
+ removePendingChannel(e.remoteNodeId, e.channelId)
+ }
+ }
+
+ private object PeerChannels {
+ def apply(channels: Seq[PersistentChannelData]): PeerChannels = {
+ channels.foldLeft(PeerChannels(Map.empty[PublicKey, Seq[ChannelInfo]], Map.empty[PublicKey, ChannelUpdate], Map.empty[PublicKey, Set[ByteVector32]])) {
+ case (current, channel) => channel match {
+ // We include pending channels.
+ case _: DATA_WAIT_FOR_DUAL_FUNDING_SIGNED | _: DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED | _: DATA_WAIT_FOR_DUAL_FUNDING_READY | _: DATA_WAIT_FOR_FUNDING_CONFIRMED | _: DATA_WAIT_FOR_CHANNEL_READY if !channel.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve) =>
+ val pending1 = current.pending.getOrElse(channel.remoteNodeId, Set.empty) + channel.channelId
+ current.copy(pending = current.pending + (channel.remoteNodeId -> pending1))
+ // We filter out channels with mobile wallets.
+ case d: DATA_NORMAL if !d.commitments.channelParams.channelFeatures.hasFeature(Features.PhoenixZeroReserve) =>
+ val peerChannels1 = current.channels.getOrElse(d.remoteNodeId, Nil) :+ ChannelInfo(d.commitments)
+ val update1 = current.updates.get(d.remoteNodeId) match {
+ case Some(update) if update.timestamp > d.channelUpdate.timestamp => update
+ case _ => d.channelUpdate
+ }
+ current.copy(
+ channels = current.channels + (d.remoteNodeId -> peerChannels1),
+ updates = current.updates + (d.remoteNodeId -> update1)
+ )
+ case _ => current
+ }
+ }
+ }
+ }
+
+}
+
+private class PeerStatsTracker(db: AuditDb, timers: TimerScheduler[PeerStatsTracker.Command], context: ActorContext[PeerStatsTracker.Command]) {
+
+ import PeerStatsTracker._
+
+ private val log = context.log
+
+ private def start(channels: Seq[PersistentChannelData]): Behavior[Command] = {
+ // We subscribe to channel events to update channel balances.
+ context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelIdAssigned](e => ChannelCreationInProgress(e.remoteNodeId, e.channelId)))
+ context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelAborted](e => ChannelCreationAborted(e.remoteNodeId, e.channelId)))
+ context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedLocalChannelDown))
+ context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedLocalChannelUpdate))
+ context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedAvailableBalanceChanged))
+ val peerChannels = PeerChannels(channels)
+ log.info("gathering statistics for {} peers", peerChannels.peers.size)
+ // We subscribe to payment events to update statistics.
+ context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentSent))
+ context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentRelayed))
+ context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedPaymentReceived))
+ // TODO: read events that happened before startedAt from the DB to initialize statistics from past data.
+ val stats = BucketedPeerStats.empty(peerChannels.peers)
+ timers.startTimerWithFixedDelay(RemoveOldBuckets, Bucket.duration)
+ listening(stats, peerChannels)
+ }
+
+ private def listening(stats: BucketedPeerStats, channels: PeerChannels): Behavior[Command] = {
+ Behaviors.receiveMessage {
+ case WrappedPaymentSent(e) =>
+ listening(stats.addPaymentSent(e), channels)
+ case WrappedPaymentReceived(e) =>
+ listening(stats.addPaymentReceived(e), channels)
+ case WrappedPaymentRelayed(e) =>
+ listening(stats.addPaymentRelayed(e), channels)
+ case e: ChannelCreationInProgress =>
+ listening(stats, channels.addPendingChannel(e))
+ case e: ChannelCreationAborted =>
+ listening(stats, channels.removeChannel(e))
+ case WrappedLocalChannelUpdate(e) =>
+ listening(stats.initializePeerIfNeeded(e.remoteNodeId), channels.updateChannel(e))
+ case WrappedAvailableBalanceChanged(e) =>
+ listening(stats, channels.updateChannel(e))
+ case WrappedLocalChannelDown(e) =>
+ val channels1 = channels.removeChannel(e)
+ val stats1 = if (channels1.getChannels(e.remoteNodeId).isEmpty && !channels1.hasPendingChannel(e.remoteNodeId)) {
+ stats.removePeer(e.remoteNodeId)
+ } else {
+ stats
+ }
+ listening(stats1, channels1)
+ case RemoveOldBuckets =>
+ listening(stats.removeOldBuckets(TimestampMilli.now()), channels)
+ case GetLatestStats(replyTo) =>
+ // TODO: do a db.listConfirmed() to update on-chain stats (we cannot rely on events only because data comes from
+ // the TransactionPublished event, but should only be applied after TransactionConfirmed so we need permanent
+ // storage). We'll need the listConfirmed() function added in https://github.com/ACINQ/eclair/pull/3245.
+ log.debug("statistics available for {} peers", stats.peers.size)
+ val now = TimestampMilli.now()
+ val latest = stats.peers
+ // We only return statistics for peers with whom we have channels available for payments.
+ .filter(nodeId => channels.hasChannels(nodeId))
+ .map(nodeId => PeerInfo(nodeId, stats.getPeerStats(nodeId, now), channels.getChannels(nodeId), channels.getUpdate(nodeId), channels.hasPendingChannel(nodeId)))
+ .toSeq
+ replyTo ! LatestStats(latest)
+ listening(stats, channels)
+ }
+ }
+
+}
diff --git a/eclair-core/src/test/resources/logback-test.xml b/eclair-core/src/test/resources/logback-test.xml
index 236be1d4af..0b946139b1 100644
--- a/eclair-core/src/test/resources/logback-test.xml
+++ b/eclair-core/src/test/resources/logback-test.xml
@@ -55,6 +55,7 @@
+
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
index 3a43f342cb..226f28dedc 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
@@ -29,6 +29,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.offer.OffersConfig
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
+import fr.acinq.eclair.profit.PeerScorer
import fr.acinq.eclair.reputation.Reputation
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, MessageWeightRatios}
import fr.acinq.eclair.router.Router._
@@ -261,6 +262,29 @@ object TestConstants {
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds),
peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour),
+ peerScoringConfig = PeerScorer.Config(
+ enabled = true,
+ scoringFrequency = 1 day,
+ topPeersCount = 10,
+ topPeersWhitelist = Set.empty,
+ liquidity = PeerScorer.LiquidityConfig(
+ autoFund = true,
+ autoClose = true,
+ minFundingAmount = 1_000_000 sat, // 0.01 BTC
+ maxFundingAmount = 100_000_000 sat, // 1 BTC
+ minOnChainBalance = 5_000_000 sat, // 0.05 BTC
+ maxFeerate = FeeratePerByte(100 sat).perKw,
+ maxFundingTxPerDay = 100,
+ fundingCooldown = 72 hours,
+ ),
+ relayFees = PeerScorer.RelayFeesConfig(
+ autoUpdate = true,
+ minRelayFees = RelayFees(1 msat, 500),
+ maxRelayFees = RelayFees(10_000 msat, 5000),
+ dailyPaymentVolumeThreshold = 1_000_000 sat, // 0.01 BTC
+ dailyPaymentVolumeThresholdPercent = 0.1,
+ )
+ ),
offersConfig = OffersConfig(messagePathMinLength = 2, paymentPathCount = 2, paymentPathLength = 4, paymentPathCltvExpiryDelta = CltvExpiryDelta(500)),
)
@@ -453,6 +477,29 @@ object TestConstants {
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds),
peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour),
+ peerScoringConfig = PeerScorer.Config(
+ enabled = true,
+ scoringFrequency = 1 day,
+ topPeersCount = 10,
+ topPeersWhitelist = Set.empty,
+ liquidity = PeerScorer.LiquidityConfig(
+ autoFund = true,
+ autoClose = true,
+ minFundingAmount = 1_000_000 sat, // 0.01 BTC
+ maxFundingAmount = 100_000_000 sat, // 1 BTC
+ minOnChainBalance = 5_000_000 sat, // 0.05 BTC
+ maxFeerate = FeeratePerByte(100 sat).perKw,
+ maxFundingTxPerDay = 100,
+ fundingCooldown = 72 hours,
+ ),
+ relayFees = PeerScorer.RelayFeesConfig(
+ autoUpdate = true,
+ minRelayFees = RelayFees(1 msat, 500),
+ maxRelayFees = RelayFees(10_000 msat, 5000),
+ dailyPaymentVolumeThreshold = 1_000_000 sat, // 0.01 BTC
+ dailyPaymentVolumeThresholdPercent = 0.1,
+ )
+ ),
offersConfig = OffersConfig(messagePathMinLength = 2, paymentPathCount = 2, paymentPathLength = 4, paymentPathCltvExpiryDelta = CltvExpiryDelta(500)),
)
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala
index 1ee99dbb26..0df41cf417 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/DummyOnChainWallet.scala
@@ -250,4 +250,13 @@ object DummyOnChainWallet {
MakeFundingTxResponse(fundingTx, 0, 420 sat)
}
+}
+
+class DummyBalanceChecker(var confirmedBalance: Satoshi = 0 sat, var unconfirmedBalance: Satoshi = 0 sat) extends OnChainBalanceChecker {
+ def setBalance(confirmed: Satoshi, unconfirmed: Satoshi): Unit = {
+ confirmedBalance = confirmed
+ unconfirmedBalance = unconfirmed
+ }
+
+ override def onChainBalance()(implicit ec: ExecutionContext): Future[OnChainBalance] = Future.successful(OnChainBalance(confirmedBalance, unconfirmedBalance))
}
\ No newline at end of file
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala
new file mode 100644
index 0000000000..d043c9b5c6
--- /dev/null
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerScorerSpec.scala
@@ -0,0 +1,759 @@
+package fr.acinq.eclair.profit
+
+import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
+import akka.actor.typed.ActorRef
+import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
+import com.typesafe.config.ConfigFactory
+import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
+import fr.acinq.bitcoin.scalacompat.{Block, BtcAmount, BtcDouble, ByteVector32, ByteVector64, SatoshiLong}
+import fr.acinq.eclair.blockchain.DummyBalanceChecker
+import fr.acinq.eclair.blockchain.fee.FeeratePerByte
+import fr.acinq.eclair.channel.{CMD_CLOSE, CMD_UPDATE_RELAY_FEE, Register}
+import fr.acinq.eclair.io.Peer.OpenChannel
+import fr.acinq.eclair.payment.relay.Relayer.RelayFees
+import fr.acinq.eclair.profit.PeerScorer._
+import fr.acinq.eclair.profit.PeerStatsTracker._
+import fr.acinq.eclair.wire.protocol.ChannelUpdate
+import fr.acinq.eclair.wire.protocol.ChannelUpdate.{ChannelFlags, MessageFlags}
+import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, RealShortChannelId, TestConstants, TimestampMilli, TimestampSecond, ToMilliSatoshiConversion, randomBytes32}
+import org.scalatest.Inside.inside
+import org.scalatest.funsuite.AnyFunSuiteLike
+import scodec.bits.HexStringSyntax
+
+import scala.concurrent.duration.DurationInt
+import scala.util.Random
+
+class PeerScorerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike {
+
+ // Arbitrary but fixed remote node ids, reused across tests as stable peer identifiers.
+ private val remoteNodeId1 = PublicKey(hex"024c9c77624899672c78d84b551ef1187cbb17618b2d96ef189d0ea36f307be76e")
+ private val remoteNodeId2 = PublicKey(hex"02271ffb6969f6dc4769438637d7d24dc3358098cdef7a772f9ccfd31251470e28")
+ private val remoteNodeId3 = PublicKey(hex"028f5be42aa013f9fd2e5a28a152563ac21acc095ef65cab2e835a789d2a4add96")
+
+ // Number of statistics buckets covering one full week of data.
+ private val weeklyBuckets = Bucket.bucketsPerDay * 7
+ // Default peer-scoring configuration shared by most tests.
+ // NOTE(review): limits are looser than the reference.conf defaults (e.g. max-funding-tx-per-day = 100 vs 6,
+ // max-funding-amount = 1 BTC vs 0.5 BTC) — presumably so rate-limits stay out of the way unless a test
+ // explicitly overrides them; confirm.
+ private val defaultConfig = Config(
+ enabled = true,
+ // We set this to 1 day to more easily match daily rate-limits.
+ scoringFrequency = 1 day,
+ topPeersCount = 10,
+ topPeersWhitelist = Set.empty,
+ liquidity = PeerScorer.LiquidityConfig(
+ autoFund = true,
+ autoClose = true,
+ minFundingAmount = 1_000_000 sat, // 0.01 BTC
+ maxFundingAmount = 100_000_000 sat, // 1 BTC
+ minOnChainBalance = 5_000_000 sat, // 0.05 BTC
+ maxFeerate = FeeratePerByte(100 sat).perKw,
+ maxFundingTxPerDay = 100,
+ fundingCooldown = 72 hours,
+ ),
+ relayFees = PeerScorer.RelayFeesConfig(
+ autoUpdate = true,
+ minRelayFees = RelayFees(1 msat, 500),
+ maxRelayFees = RelayFees(10_000 msat, 5000),
+ dailyPaymentVolumeThreshold = 1_000_000 sat, // 0.01 BTC
+ dailyPaymentVolumeThresholdPercent = 0.1,
+ )
+ )
+
+ private case class Fixture(tracker: TestProbe[GetLatestStats], register: TestProbe[Any], wallet: DummyBalanceChecker, scorer: ActorRef[PeerScorer.Command])
+
+ // Spawns a PeerScorer wired to test probes and a DummyBalanceChecker seeded with the given
+ // confirmed on-chain balance, then runs the test body against that fixture.
+ private def withFixture(config: Config = defaultConfig, onChainBalance: BtcAmount = 0 sat)(testFun: Fixture => Any): Unit = {
+ val tracker = TestProbe[GetLatestStats]()
+ val register = TestProbe[Any]()
+ val wallet = new DummyBalanceChecker(confirmedBalance = onChainBalance.toMilliSatoshi.truncateToSatoshi)
+ val nodeParams = TestConstants.Alice.nodeParams.copy(peerScoringConfig = config)
+ val scorer = testKit.spawn(PeerScorer(nodeParams, wallet, tracker.ref, register.ref.toClassic))
+ testFun(Fixture(tracker, register, wallet, scorer))
+ }
+
+ // Builds a dummy channel_update carrying the given relay fees and timestamp; the scorer uses the
+ // timestamp to rate-limit fee updates and the fees as the current values to adjust from.
+ private def channelUpdate(capacity: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), timestamp: TimestampSecond = TimestampSecond.now(), announceChannel: Boolean = true): ChannelUpdate = {
+ val messageFlags = MessageFlags(dontForward = !announceChannel)
+ val channelFlags = ChannelFlags(isEnabled = true, isNode1 = true)
+ ChannelUpdate(ByteVector64.Zeroes, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), timestamp, messageFlags, channelFlags, CltvExpiryDelta(36), 1 msat, fees.feeBase, fees.feeProportionalMillionths, capacity.toMilliSatoshi)
+ }
+
+ // Builds one PeerStats bucket; accepts any BtcAmount and converts to the field types expected by
+ // PeerStats (msat for payment amounts and relay fees, sat for on-chain and liquidity fees).
+ private def peerStats(totalAmountIn: BtcAmount = 0 sat,
+ totalAmountOut: BtcAmount = 0 sat,
+ relayFeeEarned: BtcAmount = 0 sat,
+ onChainFeePaid: BtcAmount = 0 sat,
+ liquidityFeeEarned: BtcAmount = 0 sat,
+ liquidityFeePaid: BtcAmount = 0 sat): PeerStats = PeerStats(
+ totalAmountIn = totalAmountIn.toMilliSatoshi,
+ totalAmountOut = totalAmountOut.toMilliSatoshi,
+ relayFeeEarned = relayFeeEarned.toMilliSatoshi,
+ onChainFeePaid = onChainFeePaid.toMilliSatoshi.truncateToSatoshi,
+ liquidityFeeEarned = liquidityFeeEarned.toMilliSatoshi.truncateToSatoshi,
+ liquidityFeePaid = liquidityFeePaid.toMilliSatoshi.truncateToSatoshi
+ )
+
+ // Builds a ChannelInfo with a random channel id; capacity is derived as canSend + canReceive
+ // (each truncated to satoshis separately, which is close enough for these tests).
+ private def channelInfo(canSend: BtcAmount, canReceive: BtcAmount, channelId: ByteVector32 = randomBytes32(), isPublic: Boolean = true): ChannelInfo = ChannelInfo(
+ channelId = channelId,
+ capacity = canSend.toMilliSatoshi.truncateToSatoshi + canReceive.toMilliSatoshi.truncateToSatoshi,
+ canSend = canSend.toMilliSatoshi,
+ canReceive = canReceive.toMilliSatoshi,
+ isPublic = isPublic
+ )
+
+ test("simulate large outgoing flows") {
+ withFixture(onChainBalance = 10 btc) { f =>
+ import f._
+
+ // We have channels with 3 peers:
+ // - we have a very large capacity with our first peer
+ // - we have a smaller capacity with our second peer
+ // - we have medium capacity with our third peer
+ val c1a = channelInfo(canSend = 0.5 btc, canReceive = 2.7 btc)
+ val c1b = channelInfo(canSend = 0.3 btc, canReceive = 3.5 btc, isPublic = false)
+ val c2a = channelInfo(canSend = 0.03 btc, canReceive = 0.25 btc)
+ val c2b = channelInfo(canSend = 0.04 btc, canReceive = 0.17 btc)
+ val c3 = channelInfo(canSend = 0.3 btc, canReceive = 0.7 btc)
+
+ // We need to scale the last bucket of test data based on the current timestamp, otherwise we will overestimate
+ // the outgoing payment volume of the current bucket and thus always increase relay fees.
+ // NOTE(review): the scaled entry is the *first* element of each stats Seq below, so ordering is
+ // presumably newest-bucket-first — confirm against PeerStatsTracker.
+ val bucketRatio = Bucket.consumed(TimestampMilli.now())
+
+ // We have a stable, large outgoing flow with our first peer: we should add liquidity and keep our relay fees unchanged.
+ val peerInfo1 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq(
+ peerStats(totalAmountOut = 0.51.btc * bucketRatio, relayFeeEarned = 0.01.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.52 btc, relayFeeEarned = 0.01 btc),
+ peerStats(totalAmountOut = 0.48 btc, relayFeeEarned = 0.01 btc),
+ peerStats(totalAmountOut = 0.47 btc, relayFeeEarned = 0.01 btc),
+ peerStats(totalAmountOut = 0.51 btc, relayFeeEarned = 0.01 btc),
+ ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.5.btc + Random.nextInt(500_000).sat, relayFeeEarned = 0.01 btc)),
+ channels = Seq(c1a, c1b),
+ latestUpdate_opt = Some(channelUpdate(c1a.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 2)),
+ hasPendingChannel = false,
+ )
+
+ // We have an increasing outgoing flow with our second peer: we should add liquidity and increase our routing fees.
+ val peerInfo2 = PeerInfo(
+ remoteNodeId = remoteNodeId2,
+ stats = Seq(
+ peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.0001.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.015 btc, relayFeeEarned = 0.0001 btc),
+ peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc),
+ peerStats(totalAmountOut = 0.011 btc, relayFeeEarned = 0.0001 btc),
+ peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc),
+ ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.0010 btc, relayFeeEarned = 0.0001 btc)),
+ channels = Seq(c2a, c2b),
+ latestUpdate_opt = Some(channelUpdate(c2a.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 2)),
+ hasPendingChannel = false
+ )
+
+ // We have a decreasing outgoing flow with our third peer: we should decrease our routing fees and may add liquidity.
+ val peerInfo3 = PeerInfo(
+ remoteNodeId = remoteNodeId3,
+ stats = Seq(
+ peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.00007.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.03 btc, relayFeeEarned = 0.00009 btc),
+ peerStats(totalAmountOut = 0.07 btc, relayFeeEarned = 0.00012 btc),
+ peerStats(totalAmountOut = 0.07 btc, relayFeeEarned = 0.00011 btc),
+ ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00001 btc)),
+ channels = Seq(c3),
+ latestUpdate_opt = Some(channelUpdate(c3.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 2)),
+ hasPendingChannel = false
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3))
+ }
+
+ // We increase our relay fees with our second peer, on all channels.
+ val feeIncreases = Seq(
+ register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]],
+ register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]],
+ )
+ assert(feeIncreases.map(_.channelId).toSet == Set(c2a.channelId, c2b.channelId))
+ assert(feeIncreases.map(_.message.feeProportionalMillionths).toSet == Set(1500))
+ // We decrease our relay fees with our third peer.
+ inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd =>
+ assert(cmd.channelId == c3.channelId)
+ assert(cmd.message.feeProportionalMillionths == 500)
+ }
+ // We fund channels with all of our peers.
+ val funding = Seq(
+ register.expectMessageType[Register.ForwardNodeId[OpenChannel]],
+ register.expectMessageType[Register.ForwardNodeId[OpenChannel]],
+ register.expectMessageType[Register.ForwardNodeId[OpenChannel]],
+ )
+ assert(funding.map(_.nodeId).toSet == Set(remoteNodeId1, remoteNodeId2, remoteNodeId3))
+ // Note: the lambda parameter `f` below shadows the fixture `f`; harmless here, but easy to trip over.
+ funding.filter(_.nodeId != remoteNodeId1).foreach(f => assert(f.message.fundingAmount < defaultConfig.liquidity.maxFundingAmount))
+ funding.filter(_.nodeId == remoteNodeId1).foreach(f => assert(f.message.fundingAmount == defaultConfig.liquidity.maxFundingAmount))
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("simulate exhausted large outgoing flow") {
+ withFixture(onChainBalance = 10 btc) { f =>
+ import f._
+
+ // Our first peer was very profitable, but it has exhausted its liquidity more than a week ago and doesn't have recent routing activity.
+ // But since it has a large capacity, we still fund a channel to try to revive it.
+ val peerInfo1 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq.fill(weeklyBuckets)(PeerStats.empty),
+ channels = Seq(channelInfo(canSend = 10_000 sat, canReceive = 3 btc)),
+ latestUpdate_opt = None,
+ hasPendingChannel = false,
+ )
+
+ // Our second peer was very profitable as well, but has exhausted its liquidity more recently.
+ // NOTE(review): stats ordering appears to be newest-first (see the bucketRatio scaling in other tests),
+ // i.e. 3 recent idle days followed by 4 active days — confirm.
+ val peerInfo2 = PeerInfo(
+ remoteNodeId = remoteNodeId2,
+ stats = Seq.fill(Bucket.bucketsPerDay * 3)(PeerStats.empty) ++ Seq.fill(Bucket.bucketsPerDay * 4)(peerStats(totalAmountOut = 0.3 btc, relayFeeEarned = 0.0005 btc)),
+ channels = Seq(channelInfo(canSend = 10_000 sat, canReceive = 3 btc)),
+ latestUpdate_opt = None,
+ hasPendingChannel = false,
+ )
+
+ // Our third peer has balanced flows.
+ val peerInfo3 = PeerInfo(
+ remoteNodeId = remoteNodeId3,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 0.2 btc, totalAmountIn = 0.2 btc, relayFeeEarned = 0.001 btc)),
+ channels = Seq(channelInfo(canSend = 0.3 btc, canReceive = 2.5 btc)),
+ latestUpdate_opt = None,
+ hasPendingChannel = false,
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3))
+ }
+ val funding = Seq(
+ register.expectMessageType[Register.ForwardNodeId[OpenChannel]],
+ register.expectMessageType[Register.ForwardNodeId[OpenChannel]],
+ )
+ assert(funding.map(_.nodeId).toSet == Set(remoteNodeId1, remoteNodeId2))
+ // We fund a smaller channel with the first node, because we don't know how it will perform.
+ funding.filter(_.nodeId == remoteNodeId1).foreach(f => assert(f.message.fundingAmount < defaultConfig.liquidity.maxFundingAmount))
+ funding.filter(_.nodeId == remoteNodeId2).foreach(f => assert(f.message.fundingAmount == defaultConfig.liquidity.maxFundingAmount))
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("simulate good small peers that need more capacity") {
+ // We use a configuration that selects only our best peer: we should still select a smaller peer that performs well.
+ // We set a large scoring frequency to remove the random rate-limit (of 1 small peer funding every 3 days).
+ val config = defaultConfig.copy(topPeersCount = 1, scoringFrequency = 7 days)
+ withFixture(onChainBalance = 10 btc, config = config) { f =>
+ import f._
+
+ // Our first peer has the biggest outgoing payment volume and capacity and needs more liquidity.
+ val peerInfo1 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 0.001 btc, totalAmountOut = 0.002 btc, relayFeeEarned = 10_000 sat)),
+ channels = Seq(channelInfo(canSend = 0.3 btc, canReceive = 3.5 btc)),
+ latestUpdate_opt = None,
+ hasPendingChannel = false,
+ )
+
+ // Our second peer, which has a smaller capacity, has a better per-capacity profit ratio.
+ val peerInfo2 = PeerInfo(
+ remoteNodeId = remoteNodeId2,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 0.0005 btc, totalAmountOut = 0.001 btc, relayFeeEarned = 5_000 sat)),
+ channels = Seq(channelInfo(canSend = 0.01 btc, canReceive = 0.07 btc)),
+ latestUpdate_opt = None,
+ hasPendingChannel = false,
+ )
+
+ // Our third peer also has a good per-capacity profit ratio, but doesn't need any liquidity.
+ val peerInfo3 = PeerInfo(
+ remoteNodeId = remoteNodeId3,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 0.0005 btc, totalAmountOut = 0.001 btc, relayFeeEarned = 5_000 sat)),
+ channels = Seq(channelInfo(canSend = 0.04 btc, canReceive = 0.04 btc)),
+ latestUpdate_opt = None,
+ hasPendingChannel = false,
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3))
+ }
+ val funding = Seq(
+ register.expectMessageType[Register.ForwardNodeId[OpenChannel]],
+ register.expectMessageType[Register.ForwardNodeId[OpenChannel]],
+ )
+ assert(funding.map(_.nodeId).toSet == Set(remoteNodeId1, remoteNodeId2))
+ // We increase small peers' capacity by 50%.
+ // Peer 2's total capacity is 0.08 btc, so a 50% increase is a 0.04 btc funding amount.
+ funding.find(_.nodeId == remoteNodeId2).foreach(cmd => assert(cmd.message.fundingAmount == 0.04.btc.toSatoshi))
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("fund channels with whitelisted peers") {
+ val config = defaultConfig.copy(topPeersWhitelist = Set(remoteNodeId3))
+ withFixture(onChainBalance = 10 btc, config = config) { f =>
+ import f._
+
+ // Our first peer isn't very profitable and doesn't need liquidity (small outgoing flow).
+ val peerInfo1 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 250_000 sat, totalAmountOut = 300_000 sat, relayFeeEarned = 20_000 sat)),
+ channels = Seq(channelInfo(canSend = 0.4 btc, canReceive = 0.5 btc)),
+ latestUpdate_opt = None,
+ hasPendingChannel = false,
+ )
+
+ // Our second peer isn't very profitable either and doesn't need liquidity (small incoming flow).
+ val peerInfo2 = PeerInfo(
+ remoteNodeId = remoteNodeId2,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountIn = 300_000 sat, totalAmountOut = 280_000 sat, relayFeeEarned = 15_000 sat)),
+ channels = Seq(channelInfo(canSend = 0.3 btc, canReceive = 0.2 btc)),
+ latestUpdate_opt = None,
+ hasPendingChannel = false,
+ )
+
+ // Our third peer has exhausted its liquidity a while ago and lost most of its capacity: but it is whitelisted,
+ // so we should fund a new channel with them.
+ val peerInfo3 = PeerInfo(
+ remoteNodeId = remoteNodeId3,
+ stats = Seq.fill(weeklyBuckets)(PeerStats.empty),
+ channels = Seq(channelInfo(canSend = 150_000 sat, canReceive = 1_400_000 sat)),
+ latestUpdate_opt = None,
+ hasPendingChannel = false,
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3))
+ }
+ inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd =>
+ assert(cmd.nodeId == remoteNodeId3)
+ // Assert against the config actually used by this fixture (its liquidity settings currently match
+ // defaultConfig, but this keeps the assertion correct if the override above ever changes them).
+ assert(cmd.message.fundingAmount == config.liquidity.minFundingAmount)
+ }
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("don't fund channels if channel is already being created") {
+ withFixture(onChainBalance = 5 btc) { f =>
+ import f._
+
+ // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity.
+ val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc))
+ val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc)
+
+ // If there are no pending channel, we suggest adding liquidity.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false)))
+ }
+ inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd =>
+ assert(cmd.nodeId == remoteNodeId1)
+ assert(cmd.message.fundingAmount > 0.01.btc)
+ }
+
+ // If there is already a pending channel, we don't suggest adding liquidity: the only difference with the
+ // previous cycle is hasPendingChannel = true.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = true)))
+ }
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("don't fund channels if feerate is too high") {
+ // Set the feerate threshold below the node's current feerate so funding must be skipped.
+ val config = defaultConfig.copy(liquidity = defaultConfig.liquidity.copy(maxFeerate = FeeratePerByte(1 sat).perKw))
+ withFixture(onChainBalance = 5 btc, config = config) { f =>
+ import f._
+
+ // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity.
+ val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc))
+ val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc)
+
+ // But the feerate is too high compared to our configured threshold, so we don't do anything.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false)))
+ }
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("don't fund channels if on-chain balance is too low") {
+ // Our balance exactly equals the configured floor: funding anything would take us below it.
+ val config = defaultConfig.copy(liquidity = defaultConfig.liquidity.copy(minOnChainBalance = 1.5 btc))
+ withFixture(onChainBalance = 1.5 btc, config = config) { f =>
+ import f._
+
+ // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity.
+ val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc))
+ val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc)
+
+ // But our balance is too low compared to our configured threshold, so we don't do anything.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false)))
+ }
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("don't update relay fees too frequently") {
+ withFixture(onChainBalance = 0 btc) { f =>
+ import f._
+
+ val channel = channelInfo(canSend = 0.3 btc, canReceive = 0.5 btc)
+ val latestUpdate = channelUpdate(channel.capacity, timestamp = TimestampSecond.now() - Bucket.duration * 3)
+
+ // We have an increasing outgoing flow with our peer: we should increase our routing fees.
+ // NOTE(review): unlike other tests, the first (current) bucket is not scaled by Bucket.consumed; this
+ // over-estimates current volume, which here only reinforces the expected fee increase — confirm intentional.
+ val peerInfo = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq(
+ peerStats(totalAmountOut = 0.02.btc, relayFeeEarned = 0.00015.btc),
+ peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc),
+ peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00007 btc),
+ peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00005 btc),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)),
+ channels = Seq(channel),
+ latestUpdate_opt = Some(latestUpdate),
+ hasPendingChannel = false
+ )
+
+ // We increase our routing fees.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo))
+ }
+ inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd =>
+ assert(cmd.channelId == channel.channelId)
+ assert(cmd.message.feeProportionalMillionths > latestUpdate.feeProportionalMillionths)
+ }
+ register.expectNoMessage(100 millis)
+
+ // However, if our latest update was done recently, we don't update our routing fees.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo.copy(latestUpdate_opt = Some(latestUpdate.copy(timestamp = TimestampSecond.now() - Bucket.duration)))))
+ }
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("close idle channels with liquidity") {
+ withFixture(onChainBalance = 0 btc) { f =>
+ import f._
+
+ // We have several old channels where liquidity is on our side.
+ val c1a = channelInfo(canSend = 0.5 btc, canReceive = 0.01 btc)
+ val c1b = channelInfo(canSend = 0.8 btc, canReceive = 0.1 btc, isPublic = false)
+ val c1c = channelInfo(canSend = 0.4 btc, canReceive = 0.05 btc)
+ val c2a = channelInfo(canSend = 0.3 btc, canReceive = 0.1 btc)
+ val c2b = channelInfo(canSend = 0.2 btc, canReceive = 0.01 btc)
+ val c3 = channelInfo(canSend = 0.8 btc, canReceive = 0.05 btc)
+
+ // The outgoing volume of the corresponding peers is negligible.
+ val peerInfo1 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 10_000 sat, totalAmountIn = 5_000 sat, relayFeeEarned = 100 sat)),
+ channels = Seq(c1a, c1b, c1c),
+ latestUpdate_opt = None,
+ hasPendingChannel = false
+ )
+ val peerInfo2 = PeerInfo(
+ remoteNodeId = remoteNodeId2,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 5_000 sat, totalAmountIn = 8_000 sat, relayFeeEarned = 80 sat)),
+ channels = Seq(c2a, c2b),
+ latestUpdate_opt = None,
+ hasPendingChannel = false
+ )
+ val peerInfo3 = PeerInfo(
+ remoteNodeId = remoteNodeId3,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 5_000 sat, totalAmountIn = 5_000 sat, relayFeeEarned = 50 sat)),
+ channels = Seq(c3),
+ latestUpdate_opt = None,
+ hasPendingChannel = false
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo1, peerInfo2, peerInfo3))
+ }
+ // We want to keep one channel with each peer, and select the one that likely has the best score for path-finding algorithms.
+ // Kept channels: c1a (peer 1), c2a (peer 2), c3 (peer 3, only channel); everything else is closed.
+ assert(Seq(
+ register.expectMessageType[Register.Forward[CMD_CLOSE]],
+ register.expectMessageType[Register.Forward[CMD_CLOSE]],
+ register.expectMessageType[Register.Forward[CMD_CLOSE]]
+ ).map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId, c2b.channelId))
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("don't close idle channels if feerate is too high") {
+ val config = defaultConfig.copy(liquidity = defaultConfig.liquidity.copy(maxFeerate = FeeratePerByte(1 sat).perKw))
+ withFixture(onChainBalance = 0 btc, config = config) { f =>
+ import f._
+
+ // We have several old channels where liquidity is on our side.
+ val c1a = channelInfo(canSend = 0.5 btc, canReceive = 0.01 btc)
+ val c1b = channelInfo(canSend = 0.8 btc, canReceive = 0.1 btc)
+ val c1c = channelInfo(canSend = 0.4 btc, canReceive = 0.05 btc)
+
+ // The outgoing volume of the corresponding peer is negligible.
+ val peerInfo = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 10_000 sat, totalAmountIn = 5_000 sat, relayFeeEarned = 100 sat)),
+ channels = Seq(c1a, c1b, c1c),
+ latestUpdate_opt = None,
+ hasPendingChannel = false
+ )
+
+ // But the feerate is too high compared to our configured threshold, so we don't close channels.
+ // Sanity check that the node's current feerate really exceeds the threshold we configured above.
+ assert(TestConstants.Alice.nodeParams.currentFeeratesForFundingClosing.medium > config.liquidity.maxFeerate)
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo))
+ }
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("don't fund the same peer within cooldown period") {
+ withFixture(onChainBalance = 10 btc) { f =>
+ import f._
+
+ // We have a stable, large outgoing flow with a single peer and not much liquidity left: we should add liquidity.
+ val stats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1 btc, relayFeeEarned = 0.01 btc))
+ val channel = channelInfo(canSend = 0.1 btc, canReceive = 4.9 btc)
+
+ // First scoring cycle: we fund the peer.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false)))
+ }
+ inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd =>
+ assert(cmd.nodeId == remoteNodeId1)
+ }
+ register.expectNoMessage(100 millis)
+
+ // Second scoring cycle with the same volume: we don't fund again because the cooldown is active and volume hasn't improved.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, stats, Seq(channel), None, hasPendingChannel = false)))
+ }
+ register.expectNoMessage(100 millis)
+
+ // Third scoring cycle: volume improved by more than 10%, so we fund again despite cooldown.
+ // NOTE(review): the 10% threshold is inferred from the 1 btc -> 1.2 btc improvement used below; confirm
+ // against the scorer's cooldown-override rule.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ val improvedStats = Seq.fill(weeklyBuckets)(peerStats(totalAmountOut = 1.2 btc, relayFeeEarned = 0.012 btc))
+ msg.replyTo ! LatestStats(Seq(PeerInfo(remoteNodeId1, improvedStats, Seq(channel), None, hasPendingChannel = false)))
+ }
+ inside(register.expectMessageType[Register.ForwardNodeId[OpenChannel]]) { cmd =>
+ assert(cmd.nodeId == remoteNodeId1)
+ }
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("revert fee increase when volume drops") {
+ withFixture(onChainBalance = 0 btc) { f =>
+ import f._
+
+ val channel = channelInfo(canSend = 0.3 btc, canReceive = 0.5 btc)
+ val initialFee = RelayFees(250 msat, 1000)
+ val latestUpdate = channelUpdate(channel.capacity, fees = initialFee, timestamp = TimestampSecond.now() - Bucket.duration * 3)
+
+ // First cycle: increasing volume triggers a fee increase.
+ val bucketRatio = Bucket.consumed(TimestampMilli.now())
+ val peerInfo1 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq(
+ peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.00015.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc),
+ peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00007 btc),
+ peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00005 btc),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)),
+ channels = Seq(channel),
+ latestUpdate_opt = Some(latestUpdate),
+ hasPendingChannel = false
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo1))
+ }
+ inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd =>
+ assert(cmd.message.feeProportionalMillionths == 1500)
+ }
+ register.expectNoMessage(100 millis)
+
+ // Second cycle: the fee increase is now in effect (channel update reflects new fee), but volume has dropped >20%.
+ // The update timestamp is within the evaluation window (6h-24h ago).
+ val newUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1500), timestamp = TimestampSecond.now() - Bucket.duration * 3)
+ val peerInfo2 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq(
+ // Volume has dropped significantly (>20% of what it was when we increased fees).
+ peerStats(totalAmountOut = 0.001.btc * bucketRatio, relayFeeEarned = 0.00001.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)),
+ channels = Seq(channel),
+ latestUpdate_opt = Some(newUpdate),
+ hasPendingChannel = false
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo2))
+ }
+ // The fee should be reverted to the original value.
+ inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd =>
+ assert(cmd.message.feeProportionalMillionths == initialFee.feeProportionalMillionths)
+ }
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("revert fee decrease when volume doesn't recover") {
+ withFixture(onChainBalance = 0 btc) { f =>
+ import f._
+
+ val channel = channelInfo(canSend = 0.5 btc, canReceive = 0.5 btc)
+ val initialFee = RelayFees(250 msat, 2000)
+ val latestUpdate = channelUpdate(channel.capacity, fees = initialFee, timestamp = TimestampSecond.now() - Bucket.duration * 3)
+
+ // First cycle: decreasing volume triggers a fee decrease.
+ val bucketRatio = Bucket.consumed(TimestampMilli.now())
+ val peerInfo1 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq(
+ peerStats(totalAmountOut = 0.003.btc * bucketRatio, relayFeeEarned = 0.00003.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00003 btc),
+ peerStats(totalAmountOut = 0.01 btc, relayFeeEarned = 0.0001 btc),
+ peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.00012 btc),
+ ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.00012 btc)),
+ channels = Seq(channel),
+ latestUpdate_opt = Some(latestUpdate),
+ hasPendingChannel = false
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo1))
+ }
+ // Fees are lowered from 2000 to 1500 ppm.
+ inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd =>
+ assert(cmd.message.feeProportionalMillionths == 1500)
+ }
+ register.expectNoMessage(100 millis)
+
+ // Second cycle: the fee decrease is in effect but volume hasn't recovered (below 90% of what it was).
+ val newUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1500), timestamp = TimestampSecond.now() - Bucket.duration * 3)
+ val peerInfo2 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq(
+ // Volume is still very low (well below 90% of dailyVolumeOutAtChange).
+ peerStats(totalAmountOut = 0.001.btc * bucketRatio, relayFeeEarned = 0.00001.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)),
+ channels = Seq(channel),
+ latestUpdate_opt = Some(newUpdate),
+ hasPendingChannel = false
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo2))
+ }
+ // The fee should be reverted to the original value.
+ inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd =>
+ assert(cmd.message.feeProportionalMillionths == initialFee.feeProportionalMillionths)
+ }
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+ test("fee reverts should not oscillate") {
+ withFixture(onChainBalance = 0 btc) { f =>
+ import f._
+
+ val channel = channelInfo(canSend = 0.3 btc, canReceive = 0.5 btc)
+ val initialFee = RelayFees(250 msat, 1000)
+ val latestUpdate = channelUpdate(channel.capacity, fees = initialFee, timestamp = TimestampSecond.now() - Bucket.duration * 3)
+
+ // Cycle 1: increasing volume triggers a fee increase (1000 -> 1500).
+ // The first (current) bucket is only partially elapsed, so its expected amounts are scaled by the consumed ratio.
+ val bucketRatio = Bucket.consumed(TimestampMilli.now())
+ val peerInfo1 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq(
+ peerStats(totalAmountOut = 0.02.btc * bucketRatio, relayFeeEarned = 0.00015.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.012 btc, relayFeeEarned = 0.0001 btc),
+ peerStats(totalAmountOut = 0.005 btc, relayFeeEarned = 0.00007 btc),
+ peerStats(totalAmountOut = 0.003 btc, relayFeeEarned = 0.00005 btc),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ ) ++ Seq.fill(weeklyBuckets - 5)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)),
+ channels = Seq(channel),
+ latestUpdate_opt = Some(latestUpdate),
+ hasPendingChannel = false
+ )
+
+ // Trigger a scoring cycle: the scorer asks the tracker for stats, and we reply with our fixture.
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo1))
+ }
+ inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd =>
+ assert(cmd.message.feeProportionalMillionths == 1500)
+ }
+ register.expectNoMessage(100 millis)
+
+ // Cycle 2: volume dropped after the increase (below 90% of what it was): we revert back to 1000.
+ val increasedFeeUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1500), timestamp = TimestampSecond.now() - Bucket.duration * 3)
+ val peerInfo2 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq(
+ peerStats(totalAmountOut = 0.001.btc * bucketRatio, relayFeeEarned = 0.00001.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc),
+ ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.001 btc, relayFeeEarned = 0.00001 btc)),
+ channels = Seq(channel),
+ latestUpdate_opt = Some(increasedFeeUpdate),
+ hasPendingChannel = false
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo2))
+ }
+ inside(register.expectMessageType[Register.Forward[CMD_UPDATE_RELAY_FEE]]) { cmd =>
+ assert(cmd.message.feeProportionalMillionths == 1000) // reverted
+ }
+ register.expectNoMessage(100 millis)
+
+ // Cycle 3: the revert is now in effect, but volume continues to decline. The revert recorded a FeeDecrease in
+ // the history (from 1500 back to 1000). If the code treats this revert as a regular fee decrease decision, the
+ // revert logic would trigger again (volume < 90% of what it was at revert time) and flip the fee back to 1500,
+ // creating an oscillation: 1000 -> 1500 -> 1000 -> 1500 -> ...
+ // A revert should be final and not subject to further revert evaluation.
+ val revertedFeeUpdate = channelUpdate(channel.capacity, fees = RelayFees(250 msat, 1000), timestamp = TimestampSecond.now() - Bucket.duration * 3)
+ val peerInfo3 = PeerInfo(
+ remoteNodeId = remoteNodeId1,
+ stats = Seq(
+ peerStats(totalAmountOut = 0.0005.btc * bucketRatio, relayFeeEarned = 0.000005.btc * bucketRatio),
+ peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc),
+ peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc),
+ peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc),
+ ) ++ Seq.fill(weeklyBuckets - 4)(peerStats(totalAmountOut = 0.0005 btc, relayFeeEarned = 0.000005 btc)),
+ channels = Seq(channel),
+ latestUpdate_opt = Some(revertedFeeUpdate),
+ hasPendingChannel = false
+ )
+
+ scorer ! ScorePeers(None)
+ inside(tracker.expectMessageType[GetLatestStats]) { msg =>
+ msg.replyTo ! LatestStats(Seq(peerInfo3))
+ }
+ // The fee should NOT change again: the revert was a correction, not a new decision to evaluate.
+ register.expectNoMessage(100 millis)
+ }
+ }
+
+}
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala
new file mode 100644
index 0000000000..c4dea6f884
--- /dev/null
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/profit/PeerStatsTrackerSpec.scala
@@ -0,0 +1,339 @@
+package fr.acinq.eclair.profit
+
+import akka.actor.ActorRef
+import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
+import com.typesafe.config.ConfigFactory
+import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey}
+import fr.acinq.bitcoin.scalacompat.{Block, BtcAmount, BtcDouble, ByteVector32, ByteVector64, OutPoint, SatoshiLong, Script, Transaction, TxOut}
+import fr.acinq.eclair.TestUtils.randomTxId
+import fr.acinq.eclair.blockchain.fee.FeeratePerKw
+import fr.acinq.eclair.channel._
+import fr.acinq.eclair.payment.PaymentEvent.{IncomingPayment, OutgoingPayment}
+import fr.acinq.eclair.payment.relay.Relayer.RelayFees
+import fr.acinq.eclair.payment.{ChannelPaymentRelayed, TrampolinePaymentRelayed}
+import fr.acinq.eclair.profit.PeerStatsTracker._
+import fr.acinq.eclair.transactions.Transactions.{ClosingTx, InputInfo}
+import fr.acinq.eclair.wire.protocol.ChannelUpdate.{ChannelFlags, MessageFlags}
+import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate}
+import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiryDelta, Features, MilliSatoshiLong, RealShortChannelId, TestDatabases, TimestampMilli, TimestampSecond, ToMilliSatoshiConversion, randomBytes32, randomKey}
+import org.scalatest.Inside.inside
+import org.scalatest.funsuite.AnyFunSuiteLike
+import scodec.bits.{ByteVector, HexStringSyntax}
+
+import scala.concurrent.duration.DurationInt
+
+class PeerStatsTrackerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike {
+
+ // Static fixtures shared by all tests: dummy keys, short-channel-id aliases and a placeholder channel announcement.
+ private val dummyPubKey = PrivateKey(ByteVector32.One).publicKey
+ private val dummyAliases = ShortIdAliases(Alias(42), None)
+ private val dummyChannelAnn = ChannelAnnouncement(ByteVector64.Zeroes, ByteVector64.Zeroes, ByteVector64.Zeroes, ByteVector64.Zeroes, Features.empty, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), dummyPubKey, dummyPubKey, dummyPubKey, dummyPubKey)
+
+ private val localNodeId = PublicKey(hex"03bd04635f1465d9347f3d69edc51f17cdf9548847533e084cd9d153a4abb065cd")
+ private val remoteNodeId1 = PublicKey(hex"024c9c77624899672c78d84b551ef1187cbb17618b2d96ef189d0ea36f307be76e")
+ private val remoteNodeId2 = PublicKey(hex"02271ffb6969f6dc4769438637d7d24dc3358098cdef7a772f9ccfd31251470e28")
+ private val remoteNodeId3 = PublicKey(hex"028f5be42aa013f9fd2e5a28a152563ac21acc095ef65cab2e835a789d2a4add96")
+
+ // Builds test commitments between our local node and the given remote peer, with the provided balances.
+ private def commitments(remoteNodeId: PublicKey, toLocal: BtcAmount, toRemote: BtcAmount, announceChannel: Boolean = true): Commitments = {
+ CommitmentsSpec.makeCommitments(toLocal.toMilliSatoshi, toRemote.toMilliSatoshi, localNodeId, remoteNodeId, if (announceChannel) Some(dummyChannelAnn) else None)
+ }
+
+ // Re-creates the commitments with new balances, while preserving the original channelId.
+ private def updateChannelBalance(c: Commitments, toLocal: BtcAmount, toRemote: BtcAmount): Commitments = {
+ val c1 = commitments(c.remoteNodeId, toLocal, toRemote, c.announceChannel)
+ c1.copy(channelParams = c1.channelParams.copy(channelId = c.channelId))
+ }
+
+ // Builds a channel_update for a channel of the given capacity (enabled, node1 side, dummy signature).
+ private def channelUpdate(capacity: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), timestamp: TimestampSecond = TimestampSecond.now(), announceChannel: Boolean = true): ChannelUpdate = {
+ val messageFlags = MessageFlags(dontForward = !announceChannel)
+ val channelFlags = ChannelFlags(isEnabled = true, isNode1 = true)
+ ChannelUpdate(ByteVector64.Zeroes, Block.RegtestGenesisBlock.hash, RealShortChannelId(42), timestamp, messageFlags, channelFlags, CltvExpiryDelta(36), 1 msat, fees.feeBase, fees.feeProportionalMillionths, capacity.toMilliSatoshi)
+ }
+
+ // Builds an active channel (DATA_NORMAL) with the given peer, balances and relay fees.
+ private def channel(remoteNodeId: PublicKey, toLocal: BtcAmount, toRemote: BtcAmount, fees: RelayFees = RelayFees(250 msat, 1000), announceChannel: Boolean = true): DATA_NORMAL = {
+ val c = commitments(remoteNodeId, toLocal, toRemote, announceChannel)
+ val ann_opt = if (announceChannel) Some(dummyChannelAnn) else None
+ val update = channelUpdate(c.capacity, fees, TimestampSecond.now(), announceChannel)
+ DATA_NORMAL(c, dummyAliases, ann_opt, update, SpliceStatus.NoSplice, None, None, None)
+ }
+
+ test("create buckets") {
+ // February 5th 2026 at 12h00 UTC.
+ val timestamp = TimestampMilli(1770292800000L)
+ // 12h00 UTC falls exactly on the boundary of the middle slot of the day, so subtracting even
+ // 1 millisecond must roll back to the previous slot (and, further back, across the day boundary).
+ assert(Bucket.from(timestamp) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2))
+ assert(Bucket.from(timestamp - 1.millis) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 1))
+ assert(Bucket.from(timestamp - Bucket.duration) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 1))
+ assert(Bucket.from(timestamp - Bucket.duration - 1.millis) == Bucket(day = 5, month = 2, year = 2026, slot = Bucket.bucketsPerDay / 2 - 2))
+ assert(Bucket.from(timestamp - Bucket.duration * Bucket.bucketsPerDay / 2) == Bucket(day = 5, month = 2, year = 2026, slot = 0))
+ assert(Bucket.from(timestamp - Bucket.duration * Bucket.bucketsPerDay / 2 - 1.millis) == Bucket(day = 4, month = 2, year = 2026, slot = Bucket.bucketsPerDay - 1))
+ }
+
+ test("sort buckets") {
+ // Buckets must sort chronologically: by year, then month, then day, then slot.
+ val b1 = Bucket(day = 30, month = 11, year = 2025, slot = 0)
+ val b2 = Bucket(day = 30, month = 11, year = 2025, slot = 7)
+ val b3 = Bucket(day = 30, month = 11, year = 2025, slot = 9)
+ val b4 = Bucket(day = 1, month = 12, year = 2025, slot = 2)
+ val b5 = Bucket(day = 1, month = 12, year = 2025, slot = 3)
+ val b6 = Bucket(day = 15, month = 12, year = 2025, slot = 5)
+ val b7 = Bucket(day = 1, month = 1, year = 2026, slot = 1)
+ assert(b1 < b2 && b2 < b3 && b3 < b4 && b4 < b5 && b5 < b6 && b6 < b7)
+ assert(Seq(b3, b6, b5, b1, b4, b2, b7).sorted == Seq(b1, b2, b3, b4, b5, b6, b7))
+ }
+
+ test("evaluate consumed bucket ratio") {
+ // February 5th 2026 at 12h00 UTC (exactly the start of a bucket).
+ val timestamp = TimestampMilli(1770292800000L)
+ // Bucket.consumed returns the fraction of the current bucket that has already elapsed, in [0, 1).
+ assert(Bucket.consumed(timestamp) == 0.00)
+ assert(Bucket.consumed(timestamp + Bucket.duration / 3) == 1.0 / 3)
+ assert(Bucket.consumed(timestamp + Bucket.duration / 2) == 0.5)
+ assert(Bucket.consumed(timestamp + Bucket.duration * 2 / 3) == 2.0 / 3)
+ // Just before a bucket boundary, the ratio approaches (but never reaches) 1.
+ assert(Bucket.consumed(timestamp - 10.seconds) >= 0.99)
+ assert(Bucket.consumed(timestamp - 10.seconds) < 1.0)
+ }
+
+ test("keep track of channel balances and state") {
+ val now = TimestampMilli.now()
+ val probe = TestProbe[LatestStats]()
+
+ // We have 4 channels with our first peer: 2 of them are active.
+ val c1a = DATA_WAIT_FOR_DUAL_FUNDING_READY(commitments(remoteNodeId1, toLocal = 0.1 btc, toRemote = 0.2 btc), dummyAliases)
+ val c1b = channel(remoteNodeId1, toLocal = 0.15 btc, toRemote = 0.25 btc)
+ val c1c = channel(remoteNodeId1, toLocal = 0.07 btc, toRemote = 0.03 btc)
+ val c1d = DATA_SHUTDOWN(commitments(remoteNodeId1, toLocal = 0.1 btc, toRemote = 0.2 btc), null, null, CloseStatus.Initiator(None))
+ // We have 2 channels with our second peer: none of them are active.
+ val c2a = DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments(remoteNodeId2, toLocal = 0.13 btc, toRemote = 0.24 btc), BlockHeight(750_000), None, null)
+ val c2b = DATA_NEGOTIATING_SIMPLE(commitments(remoteNodeId2, toLocal = 0.5 btc, toRemote = 0.1 btc), FeeratePerKw(2000 sat), ByteVector.empty, ByteVector.empty, Nil, Nil)
+ // We have 2 channels with our third peer: none of them are active.
+ val c3a = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments(remoteNodeId3, toLocal = 0.7 btc, toRemote = 0.2 btc), 0 msat, 0 msat, BlockHeight(750_000), BlockHeight(750_000), DualFundingStatus.WaitingForConfirmations, None)
+ val c3b = DATA_CLOSING(commitments(remoteNodeId3, toLocal = 0.2 btc, toRemote = 0.2 btc), BlockHeight(750_000), ByteVector.empty, Nil, ClosingTx(InputInfo(OutPoint(randomTxId(), 2), TxOut(500_000 sat, Script.pay2wpkh(dummyPubKey))), Transaction(2, Nil, TxOut(500_000 sat, Script.pay2wpkh(dummyPubKey)) :: Nil, 0), None) :: Nil)
+
+ // We initialize our actor with these existing channels.
+ val tracker = testKit.spawn(PeerStatsTracker(TestDatabases.inMemoryDb().audit, Seq(c1a, c1b, c1c, c1d, c2a, c2b, c3a, c3b)))
+ // We have relayed payments through channels that have been closed since then.
+ tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed(
+ paymentHash = randomBytes32(),
+ incoming = Seq(IncomingPayment(c1b.channelId, remoteNodeId1, 58_000_000 msat, now - 5.minutes)),
+ outgoing = Seq(OutgoingPayment(c3b.channelId, remoteNodeId3, 50_000_000 msat, now))
+ ))
+ // We have relayed payments through active channels as well.
+ tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed(
+ paymentHash = randomBytes32(),
+ incoming = Seq(IncomingPayment(c2b.channelId, remoteNodeId2, 15_000_000 msat, now - 5.minutes)),
+ outgoing = Seq(OutgoingPayment(c1b.channelId, remoteNodeId1, 10_000_000 msat, now))
+ ))
+ // Query the tracker: only peers with at least one active channel should be reported.
+ tracker.ref ! GetLatestStats(probe.ref)
+ inside(probe.expectMessageType[LatestStats]) { s =>
+ // We only have active channels with our first peer.
+ assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1))
+ // We only take into account the two active channels with our first peer.
+ val peer1 = s.peers.find(_.remoteNodeId == remoteNodeId1).get
+ assert(peer1.channels.map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId))
+ assert(peer1.latestUpdate_opt.exists(u => Set(c1b.channelUpdate, c1c.channelUpdate).contains(u)))
+ assert(peer1.capacity == 0.5.btc.toSatoshi)
+ assert(peer1.hasPendingChannel)
+ // NOTE(review): balance assertions use ranges — presumably because available balances subtract
+ // commit tx fees/reserve from the raw toLocal/toRemote amounts; confirm against Commitments.
+ assert(0.21.btc.toMilliSatoshi <= peer1.canSend && peer1.canSend <= 0.22.btc.toMilliSatoshi)
+ assert(0.27.btc.toMilliSatoshi <= peer1.canReceive && peer1.canReceive <= 0.28.btc.toMilliSatoshi)
+ }
+
+ // Our pending channel with our second peer becomes ready.
+ val update2a = channelUpdate(c2a.commitments.capacity, RelayFees(500 msat, 500))
+ tracker.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate(
+ channel = ActorRef.noSender,
+ channelId = c2a.channelId,
+ aliases = dummyAliases,
+ remoteNodeId = remoteNodeId2,
+ announcement_opt = None,
+ channelUpdate = update2a,
+ commitments = c2a.commitments,
+ ))
+
+ // Our pending channel with our third peer is aborted.
+ tracker.ref ! WrappedLocalChannelDown(LocalChannelDown(
+ channel = ActorRef.noSender,
+ channelId = c3a.channelId,
+ realScids = Nil,
+ aliases = dummyAliases,
+ remoteNodeId = remoteNodeId3
+ ))
+
+ // We forget about our third peer, with whom we don't have pending or active channels anymore.
+ tracker.ref ! GetLatestStats(probe.ref)
+ inside(probe.expectMessageType[LatestStats]) { s =>
+ assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2))
+ assert(!s.peers.find(_.remoteNodeId == remoteNodeId2).get.hasPendingChannel)
+ assert(s.peers.find(_.remoteNodeId == remoteNodeId2).get.latestUpdate_opt.contains(update2a))
+ }
+
+ // We relay a payment to our second peer.
+ tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed(
+ paymentHash = randomBytes32(),
+ incoming = Seq(IncomingPayment(c1b.channelId, remoteNodeId1, 30_000_000 msat, now - 5.minutes)),
+ outgoing = Seq(OutgoingPayment(c2a.channelId, remoteNodeId2, 15_000_000 msat, now))
+ ))
+ tracker.ref ! GetLatestStats(probe.ref)
+ inside(probe.expectMessageType[LatestStats]) { s =>
+ assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2))
+ val peer2 = s.peers.find(_.remoteNodeId == remoteNodeId2).get
+ assert(peer2.channels.map(_.channelId).toSet == Set(c2a.channelId))
+ assert(peer2.latestUpdate_opt.contains(update2a))
+ }
+
+ // We update our routing fees with our first peer.
+ val update1b = channelUpdate(c1b.commitments.capacity, RelayFees(100 msat, 600), TimestampSecond.now() + 10.seconds)
+ tracker.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate(
+ channel = ActorRef.noSender,
+ channelId = c1b.channelId,
+ aliases = dummyAliases,
+ remoteNodeId = remoteNodeId1,
+ announcement_opt = None,
+ channelUpdate = update1b,
+ commitments = updateChannelBalance(c1b.commitments, toLocal = 0.3 btc, toRemote = 0.1 btc),
+ ))
+
+ // We ignore previous channel updates from another channel (update1c has an older timestamp than update1b).
+ val update1c = channelUpdate(c1c.commitments.capacity, RelayFees(150 msat, 400), update1b.timestamp - 10.seconds)
+ tracker.ref ! WrappedLocalChannelUpdate(LocalChannelUpdate(
+ channel = ActorRef.noSender,
+ channelId = c1c.channelId,
+ aliases = dummyAliases,
+ remoteNodeId = remoteNodeId1,
+ announcement_opt = None,
+ channelUpdate = update1c,
+ commitments = updateChannelBalance(c1c.commitments, toLocal = 0.05 btc, toRemote = 0.05 btc),
+ ))
+
+ // Channels with our second peer are closed.
+ tracker.ref ! WrappedLocalChannelDown(LocalChannelDown(
+ channel = ActorRef.noSender,
+ channelId = c2a.channelId,
+ realScids = Nil,
+ aliases = dummyAliases,
+ remoteNodeId = remoteNodeId2
+ ))
+ tracker.ref ! WrappedLocalChannelDown(LocalChannelDown(
+ channel = ActorRef.noSender,
+ channelId = c2b.channelId,
+ realScids = Nil,
+ aliases = dummyAliases,
+ remoteNodeId = remoteNodeId2
+ ))
+
+ // The only remaining channels are with our first peer, with updated balances and the latest channel update.
+ tracker.ref ! GetLatestStats(probe.ref)
+ inside(probe.expectMessageType[LatestStats]) { s =>
+ assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1))
+ val peer1 = s.peers.find(_.remoteNodeId == remoteNodeId1).get
+ assert(peer1.channels.map(_.channelId).toSet == Set(c1b.channelId, c1c.channelId))
+ assert(peer1.capacity == 0.5.btc.toSatoshi)
+ assert(0.34.btc.toMilliSatoshi <= peer1.canSend && peer1.canSend <= 0.35.btc.toMilliSatoshi)
+ assert(0.14.btc.toMilliSatoshi <= peer1.canReceive && peer1.canReceive <= 0.15.btc.toMilliSatoshi)
+ assert(peer1.latestUpdate_opt.contains(update1b))
+ }
+ }
+
+ test("keep track of peer statistics") {
+ val now = TimestampMilli.now()
+ val probe = TestProbe[LatestStats]()
+ val tracker = testKit.spawn(PeerStatsTracker(TestDatabases.inMemoryDb().audit, Nil))
+
+ // We have channels with 3 peers:
+ val c1a = commitments(remoteNodeId1, toLocal = 0.5 btc, toRemote = 0.3 btc)
+ val c1b = commitments(remoteNodeId1, toLocal = 0.4 btc, toRemote = 0.2 btc, announceChannel = false)
+ val c2 = commitments(remoteNodeId2, toLocal = 0.01 btc, toRemote = 0.9 btc)
+ val c3 = commitments(remoteNodeId3, toLocal = 0.7 btc, toRemote = 0.1 btc)
+ Seq(c1a, c1b, c2, c3).foreach(c => tracker.ref ! WrappedAvailableBalanceChanged(AvailableBalanceChanged(
+ channel = ActorRef.noSender,
+ channelId = c.channelId,
+ aliases = dummyAliases,
+ commitments = c,
+ lastAnnouncement_opt = None,
+ )))
+
+ // We have relayed some payments with all of those peers, spread over the 3 most recent buckets.
+ tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed(
+ paymentHash = randomBytes32(),
+ incoming = Seq(IncomingPayment(c1a.channelId, remoteNodeId1, 30_000_000 msat, now - Bucket.duration * 2)),
+ outgoing = Seq(OutgoingPayment(c2.channelId, remoteNodeId2, 20_000_000 msat, now - Bucket.duration * 2))
+ ))
+ tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed(
+ paymentHash = randomBytes32(),
+ incoming = Seq(IncomingPayment(c2.channelId, remoteNodeId2, 10_000_000 msat, now - Bucket.duration * 2)),
+ outgoing = Seq(OutgoingPayment(c3.channelId, remoteNodeId3, 9_000_000 msat, now - Bucket.duration * 2))
+ ))
+ tracker.ref ! WrappedPaymentRelayed(ChannelPaymentRelayed(
+ paymentHash = randomBytes32(),
+ incoming = Seq(
+ IncomingPayment(c2.channelId, remoteNodeId2, 30_000_000 msat, now - Bucket.duration),
+ IncomingPayment(c3.channelId, remoteNodeId3, 25_000_000 msat, now - Bucket.duration),
+ ),
+ outgoing = Seq(
+ OutgoingPayment(c1a.channelId, remoteNodeId1, 50_000_000 msat, now - Bucket.duration),
+ )
+ ))
+ tracker.ref ! WrappedPaymentRelayed(TrampolinePaymentRelayed(
+ paymentHash = randomBytes32(),
+ incoming = Seq(
+ IncomingPayment(c2.channelId, remoteNodeId2, 21_000_000 msat, now),
+ IncomingPayment(c2.channelId, remoteNodeId2, 34_000_000 msat, now),
+ ),
+ outgoing = Seq(
+ OutgoingPayment(c3.channelId, remoteNodeId3, 22_000_000 msat, now),
+ OutgoingPayment(c3.channelId, remoteNodeId3, 18_000_000 msat, now),
+ OutgoingPayment(c1b.channelId, remoteNodeId1, 10_000_000 msat, now),
+ ),
+ nextTrampolineNodeId = randomKey().publicKey,
+ nextTrampolineAmount = 50_000_000 msat,
+ ))
+
+ // We keep track of aggregated statistics per bucket.
+ // Buckets are ordered most-recent-first: stats.head is the current bucket (payments at `now`),
+ // stats(1) is one bucket ago, stats(2) two buckets ago.
+ tracker.ref ! GetLatestStats(probe.ref)
+ inside(probe.expectMessageType[LatestStats]) { s =>
+ assert(s.peers.map(_.remoteNodeId).toSet == Set(remoteNodeId1, remoteNodeId2, remoteNodeId3))
+ // Total profit is the sum of relay fees: 10M + 1M + 5M + 5M = 21M msat.
+ assert(s.peers.flatMap(_.stats.map(_.profit)).sum == 21_000_000.msat)
+ // We only have routing activity in the past 3 buckets.
+ s.peers.foreach(p => assert(p.stats.drop(3).forall(olderStats => olderStats == PeerStats.empty)))
+ // We verify that routing activity is correctly recorded in the right bucket.
+ val peer1 = s.peers.find(_.remoteNodeId == remoteNodeId1).get
+ assert(peer1.capacity == 1.4.btc.toSatoshi)
+ assert(peer1.canSend == c1a.availableBalanceForSend + c1b.availableBalanceForSend)
+ assert(peer1.canReceive == c1a.availableBalanceForReceive + c1b.availableBalanceForReceive)
+ assert(peer1.stats.head.totalAmountIn == 0.msat)
+ assert(peer1.stats.head.totalAmountOut == 10_000_000.msat)
+ assert(peer1.stats.head.relayFeeEarned == 1_000_000.msat)
+ assert(peer1.stats(1).totalAmountIn == 0.msat)
+ assert(peer1.stats(1).totalAmountOut == 50_000_000.msat)
+ assert(peer1.stats(1).relayFeeEarned == 5_000_000.msat)
+ assert(peer1.stats(2).totalAmountIn == 30_000_000.msat)
+ assert(peer1.stats(2).totalAmountOut == 0.msat)
+ assert(peer1.stats(2).relayFeeEarned == 0.msat)
+ assert(peer1.stats.map(_.outgoingFlow).sum == 30_000_000.msat)
+ val peer2 = s.peers.find(_.remoteNodeId == remoteNodeId2).get
+ assert(peer2.capacity == 0.91.btc.toSatoshi)
+ assert(peer2.canSend == c2.availableBalanceForSend)
+ assert(peer2.canReceive == c2.availableBalanceForReceive)
+ assert(peer2.stats.head.totalAmountIn == 55_000_000.msat)
+ assert(peer2.stats.head.totalAmountOut == 0.msat)
+ assert(peer2.stats.head.relayFeeEarned == 0.msat)
+ assert(peer2.stats(1).totalAmountIn == 30_000_000.msat)
+ assert(peer2.stats(1).totalAmountOut == 0.msat)
+ assert(peer2.stats(1).relayFeeEarned == 0.msat)
+ assert(peer2.stats(2).totalAmountIn == 10_000_000.msat)
+ assert(peer2.stats(2).totalAmountOut == 20_000_000.msat)
+ assert(peer2.stats(2).relayFeeEarned == 10_000_000.msat)
+ assert(peer2.stats.map(_.outgoingFlow).sum == -75_000_000.msat)
+ val peer3 = s.peers.find(_.remoteNodeId == remoteNodeId3).get
+ assert(peer3.capacity == 0.8.btc.toSatoshi)
+ assert(peer3.canSend == c3.availableBalanceForSend)
+ assert(peer3.canReceive == c3.availableBalanceForReceive)
+ assert(peer3.stats.head.totalAmountIn == 0.msat)
+ assert(peer3.stats.head.totalAmountOut == 40_000_000.msat)
+ assert(peer3.stats.head.relayFeeEarned == 4_000_000.msat)
+ assert(peer3.stats(1).totalAmountIn == 25_000_000.msat)
+ assert(peer3.stats(1).totalAmountOut == 0.msat)
+ assert(peer3.stats(1).relayFeeEarned == 0.msat)
+ assert(peer3.stats(2).totalAmountIn == 0.msat)
+ assert(peer3.stats(2).totalAmountOut == 9_000_000.msat)
+ assert(peer3.stats(2).relayFeeEarned == 1_000_000.msat)
+ assert(peer3.stats.map(_.outgoingFlow).sum == 24_000_000.msat)
+ }
+ }
+
+}