Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
}
(appKit.router ? Router.GetLocalChannels).mapTo[Iterable[LocalChannel]]
.map(channels => channels.filter(c => nodes.contains(c.remoteNodeId)).map(c => Right(c.shortChannelId)))
.flatMap(channels => sendToChannels[CommandResponse[CMD_UPDATE_RELAY_FEE]](channels.toList, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, feeBaseMsat, feeProportionalMillionths)))
.flatMap(channels => sendToChannels[CommandResponse[CMD_UPDATE_RELAY_FEE]](channels.toList, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, feeBaseMsat, feeProportionalMillionths, cltvExpiryDelta_opt = None)))
}

override def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] = for {
Expand Down
72 changes: 31 additions & 41 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -284,32 +284,20 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
watchFundingTx(data.commitments)
context.system.eventStream.publish(ShortChannelIdAssigned(self, normal.channelId, normal.channelUpdate.shortChannelId, None))

// we rebuild a new channel_update with values from the configuration because they may have changed while eclair was down
// we check the configuration because the values for channel_update may have changed while eclair was down
val fees = getRelayFees(nodeParams, remoteNodeId, data.commitments)
val candidateChannelUpdate = Announcements.makeChannelUpdate(
nodeParams.chainHash,
nodeParams.privateKey,
remoteNodeId,
normal.channelUpdate.shortChannelId,
nodeParams.expiryDelta,
normal.commitments.remoteParams.htlcMinimum,
fees.feeBase,
fees.feeProportionalMillionths,
normal.commitments.capacity.toMilliSatoshi,
enable = Announcements.isEnabled(normal.channelUpdate.channelFlags))
val channelUpdate1 = if (Announcements.areSame(candidateChannelUpdate, normal.channelUpdate)) {
// if there was no configuration change we keep the existing channel update
normal.channelUpdate
} else {
log.info("refreshing channel_update due to configuration changes old={} new={}", normal.channelUpdate, candidateChannelUpdate)
candidateChannelUpdate
if (fees.feeBase != normal.channelUpdate.feeBaseMsat ||
fees.feeProportionalMillionths != normal.channelUpdate.feeProportionalMillionths ||
nodeParams.expiryDelta != normal.channelUpdate.cltvExpiryDelta) {
log.info("refreshing channel_update due to configuration changes")
self ! CMD_UPDATE_RELAY_FEE(ActorRef.noSender, fees.feeBase, fees.feeProportionalMillionths, Some(nodeParams.expiryDelta))
}
// we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
// we take into account the date of the last update so that we don't send superfluous updates when we restart the app
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp)
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(normal.channelUpdate.timestamp)
context.system.scheduler.scheduleWithFixedDelay(initialDelay = periodicRefreshInitialDelay, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))

goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1)
goto(OFFLINE) using normal

case funding: DATA_WAIT_FOR_FUNDING_CONFIRMED =>
watchFundingTx(funding.commitments)
Expand Down Expand Up @@ -1013,12 +1001,12 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

case Event(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) =>
log.info("updating relay fees: prevFeeBaseMsat={} nextFeeBaseMsat={} prevFeeProportionalMillionths={} nextFeeProportionalMillionths={}", d.channelUpdate.feeBaseMsat, c.feeBase, d.channelUpdate.feeProportionalMillionths, c.feeProportionalMillionths)
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments))
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, c.cltvExpiryDelta_opt.getOrElse(d.channelUpdate.cltvExpiryDelta), d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments))
log.info(s"updating relay fees: prev={} next={}", d.channelUpdate.toStringShort, channelUpdate1.toStringShort)
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// we use GOTO instead of stay() because we want to fire transitions
goto(NORMAL) using d.copy(channelUpdate = channelUpdate) storing()
goto(NORMAL) using d.copy(channelUpdate = channelUpdate1) storing()

case Event(BroadcastChannelUpdate(reason), d: DATA_NORMAL) =>
val age = System.currentTimeMillis.milliseconds - d.channelUpdate.timestamp.seconds
Expand Down Expand Up @@ -1542,18 +1530,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway
case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: HasCommitments) =>
handleOfflineFeerate(c, d)
case Event(c: CurrentFeerates, d: HasCommitments) => handleCurrentFeerateDisconnected(c, d)

case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)

case Event(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) =>
log.info(s"updating relay fees: prevFeeBaseMsat={} nextFeeBaseMsat={} prevFeeProportionalMillionths={} nextFeeProportionalMillionths={}", d.channelUpdate.feeBaseMsat, c.feeBase, d.channelUpdate.feeProportionalMillionths, c.feeProportionalMillionths)
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// we're in OFFLINE state, we don't broadcast the new update right away, we will do that when next time we go to NORMAL state
stay() using d.copy(channelUpdate = channelUpdate) storing()
case Event(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) => handleUpdateRelayFeeDisconnected(c, d)

case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSinceBlock, d.fundingTx)

Expand Down Expand Up @@ -1692,6 +1673,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)

case Event(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) => handleUpdateRelayFeeDisconnected(c, d)

case Event(channelReestablish: ChannelReestablish, d: DATA_SHUTDOWN) =>
var sendQueue = Queue.empty[LightningMessage]
val (commitments1, sendQueue1) = handleSync(channelReestablish, d)
Expand Down Expand Up @@ -1734,8 +1717,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: HasCommitments) =>
handleOfflineFeerate(c, d)
case Event(c: CurrentFeerates, d: HasCommitments) => handleCurrentFeerateDisconnected(c, d)

case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSinceBlock, d.fundingTx)

Expand Down Expand Up @@ -1879,24 +1861,23 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => ()
}

val previousChannelUpdate_opt = stateData match {
case data: DATA_NORMAL => Some(data.channelUpdate)
case _ => None
}

(state, nextState, stateData, nextStateData) match {
// ORDER MATTERS!
case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) =>
Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.debug("re-emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, Some(normal.channelUpdate), normal.commitments))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we will emit an additional outdated LocalChannelUpdate in case the config has changed.

case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement =>
// don't do anything if neither the channel_update nor the channel_announcement didn't change
()
case (WAIT_FOR_FUNDING_LOCKED | NORMAL | OFFLINE | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) =>
// when we do WAIT_FOR_FUNDING_LOCKED->NORMAL or NORMAL->NORMAL or SYNCING->NORMAL or NORMAL->OFFLINE, we send out the new channel_update (most of the time it will just be to enable/disable the channel)
log.info("emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
val previousChannelUpdate_opt = stateData match {
case data: DATA_NORMAL => Some(data.channelUpdate)
case _ => None
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
case (_, _, _: DATA_NORMAL, _: DATA_NORMAL) =>
// in any other case (e.g. OFFLINE->SYNCING) we do nothing
Expand Down Expand Up @@ -1995,7 +1976,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
* @param d the channel commtiments
* @return
*/
private def handleOfflineFeerate(c: CurrentFeerates, d: HasCommitments) = {
private def handleCurrentFeerateDisconnected(c: CurrentFeerates, d: HasCommitments) = {
val networkFeeratePerKw = nodeParams.onChainFeeConf.getCommitmentFeerate(remoteNodeId, d.commitments.channelFeatures, d.commitments.capacity, Some(c))
val currentFeeratePerKw = d.commitments.localCommit.spec.feeratePerKw
// if the network fees are too high we risk to not be able to confirm our current commitment
Expand Down Expand Up @@ -2148,6 +2129,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
}

private def handleUpdateRelayFeeDisconnected(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) = {
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, c.cltvExpiryDelta_opt.getOrElse(d.channelUpdate.cltvExpiryDelta), d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably doesn't matter a lot, but I believe this is a bug:

Suggested change
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, c.cltvExpiryDelta_opt.getOrElse(d.channelUpdate.cltvExpiryDelta), d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, c.cltvExpiryDelta_opt.getOrElse(d.channelUpdate.cltvExpiryDelta), d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = Announcements.isEnabled(d.channelUpdate.channelFlags))

log.info(s"updating relay fees: prev={} next={}", d.channelUpdate.toStringShort, channelUpdate1.toStringShort)
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// we're in OFFLINE state, we don't broadcast the new update right away, we will do that when next time we go to NORMAL state
stay() using d.copy(channelUpdate = channelUpdate1) storing()
Comment on lines +2137 to +2138
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... this doesn't work, as evidenced by 2c8db6e.

By using a stay() to defer the sending of the event, we lose the reference to the previous channel_update 🤦‍♂️

}

private def handleNewBlock(c: CurrentBlockCount, d: HasCommitments) = {
val timedOutOutgoing = d.commitments.timedOutOutgoingHtlcs(c.blockCount)
val almostTimedOutIncoming = d.commitments.almostTimedOutIncomingHtlcs(c.blockCount, nodeParams.fulfillSafetyBeforeTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ final case class CMD_SIGN(replyTo_opt: Option[ActorRef] = None) extends HasOptio
sealed trait CloseCommand extends HasReplyToCommand
final case class CMD_CLOSE(replyTo: ActorRef, scriptPubKey: Option[ByteVector]) extends CloseCommand
final case class CMD_FORCECLOSE(replyTo: ActorRef) extends CloseCommand
final case class CMD_UPDATE_RELAY_FEE(replyTo: ActorRef, feeBase: MilliSatoshi, feeProportionalMillionths: Long) extends HasReplyToCommand
final case class CMD_UPDATE_RELAY_FEE(replyTo: ActorRef, feeBase: MilliSatoshi, feeProportionalMillionths: Long, cltvExpiryDelta_opt: Option[CltvExpiryDelta]) extends HasReplyToCommand
final case class CMD_GETSTATE(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GETSTATEDATA(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GETINFO(replyTo: ActorRef)extends HasReplyToCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.payment.Monitoring.{Metrics => PaymentMetrics, Tags => PaymentTags}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.router.Announcements

/**
* This actor sits at the interface between our event stream and the database.
Expand Down Expand Up @@ -119,12 +120,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {

case u: LocalChannelUpdate =>
u.previousChannelUpdate_opt match {
case Some(previous) if
u.channelUpdate.feeBaseMsat == previous.feeBaseMsat &&
u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths &&
u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta &&
u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat &&
u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => ()
case Some(previous) if Announcements.areSameIgnoreFlags(previous, u.channelUpdate) => () // channel update hasn't changed => ()
case _ => auditDb.addChannelUpdate(u)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ object Announcements {
def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0)

def areSameIgnoreFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0)

def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte()

def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ case class ChannelUpdate(signature: ByteVector64,
require(((messageFlags & 1) != 0) == htlcMaximumMsat.isDefined, "htlcMaximumMsat is not consistent with messageFlags")

def isNode1 = Announcements.isNode1(channelFlags)

def toStringShort: String = s"cltvExpiryDelta=$cltvExpiryDelta,feeBase=$feeBaseMsat,feeProportionalMillionths=$feeProportionalMillionths"
}

// @formatter:off
Expand Down
Loading