Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp)
context.system.scheduler.scheduleWithFixedDelay(initialDelay = periodicRefreshInitialDelay, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))

goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1)
goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1, channelUpdateBeforeRestore_opt = Some(normal.channelUpdate))

case funding: DATA_WAIT_FOR_FUNDING_CONFIRMED =>
watchFundingTx(funding.commitments)
Expand Down Expand Up @@ -666,7 +666,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val initialChannelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, shortChannelId, nodeParams.expiryDelta, d.commitments.remoteParams.htlcMinimum, fees.feeBase, fees.feeProportionalMillionths, commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments))
// we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
context.system.scheduler.scheduleWithFixedDelay(initialDelay = REFRESH_CHANNEL_UPDATE_INTERVAL, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))
goto(NORMAL) using DATA_NORMAL(commitments.copy(remoteNextCommitInfo = Right(nextPerCommitmentPoint)), shortChannelId, buried = false, None, initialChannelUpdate, None, None) storing()
goto(NORMAL) using DATA_NORMAL(commitments.copy(remoteNextCommitInfo = Right(nextPerCommitmentPoint)), shortChannelId, buried = false, None, initialChannelUpdate, None, None, None) storing()

case Event(remoteAnnSigs: AnnouncementSignatures, d: DATA_WAIT_FOR_FUNDING_LOCKED) if d.commitments.announceChannel =>
log.debug("received remote announcement signatures, delaying")
Expand Down Expand Up @@ -1879,24 +1879,23 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => ()
}

val previousChannelUpdate_opt = stateData match {
case data: DATA_NORMAL => Some(data.channelUpdate)
case _ => None
}

(state, nextState, stateData, nextStateData) match {
// ORDER MATTERS!
case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) =>
Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.debug("re-emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, normal.channelUpdateBeforeRestore_opt, normal.commitments))
case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement =>
// don't do anything if neither the channel_update nor the channel_announcement didn't change
()
case (WAIT_FOR_FUNDING_LOCKED | NORMAL | OFFLINE | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) =>
// when we do WAIT_FOR_FUNDING_LOCKED->NORMAL or NORMAL->NORMAL or SYNCING->NORMAL or NORMAL->OFFLINE, we send out the new channel_update (most of the time it will just be to enable/disable the channel)
log.info("emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
val previousChannelUpdate_opt = stateData match {
case data: DATA_NORMAL => Some(data.channelUpdate)
case _ => None
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
case (_, _, _: DATA_NORMAL, _: DATA_NORMAL) =>
// in any other case (e.g. OFFLINE->SYNCING) we do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ final case class DATA_NORMAL(commitments: Commitments,
buried: Boolean,
channelAnnouncement: Option[ChannelAnnouncement],
channelUpdate: ChannelUpdate,
channelUpdateBeforeRestore_opt: Option[ChannelUpdate],
localShutdown: Option[Shutdown],
remoteShutdown: Option[Shutdown]) extends ChannelData with HasCommitments
final case class DATA_SHUTDOWN(commitments: Commitments,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.payment.Monitoring.{Metrics => PaymentMetrics, Tags => PaymentTags}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.router.Announcements

/**
* This actor sits at the interface between our event stream and the database.
Expand Down Expand Up @@ -119,15 +120,9 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {

case u: LocalChannelUpdate =>
u.previousChannelUpdate_opt match {
case Some(previous) if
u.channelUpdate.feeBaseMsat == previous.feeBaseMsat &&
u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths &&
u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta &&
u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat &&
u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => ()
case Some(previous) if Announcements.areSameWithoutFlags(previous, u.channelUpdate) => () // channel update hasn't changed
case _ => auditDb.addChannelUpdate(u)
}

}

override def unhandled(message: Any): Unit = log.warning(s"unhandled msg=$message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ object Announcements {
def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0)

def areSameWithoutFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0)

def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte()

def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.transactions._
import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0.{HtlcTxAndSigs, PublishableTxs}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._
import fr.acinq.eclair.wire.protocol.UpdateMessage
import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage}
import scodec.bits.{BitVector, ByteVector}
import scodec.codecs._
import scodec.{Attempt, Codec}
Expand Down Expand Up @@ -309,6 +309,7 @@ private[channel] object ChannelCodecs0 {
("buried" | bool) ::
("channelAnnouncement" | optional(bool, variableSizeBytes(noUnknownFieldsChannelAnnouncementSizeCodec, channelAnnouncementCodec))) ::
("channelUpdate" | variableSizeBytes(noUnknownFieldsChannelUpdateSizeCodec, channelUpdateCodec)) ::
("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) ::
("localShutdown" | optional(bool, shutdownCodec)) ::
("remoteShutdown" | optional(bool, shutdownCodec))).as[DATA_NORMAL].decodeOnly

Expand All @@ -318,6 +319,7 @@ private[channel] object ChannelCodecs0 {
("buried" | bool) ::
("channelAnnouncement" | optional(bool, variableSizeBytes(uint16, channelAnnouncementCodec))) ::
("channelUpdate" | variableSizeBytes(uint16, channelUpdateCodec)) ::
("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) ::
("localShutdown" | optional(bool, shutdownCodec)) ::
("remoteShutdown" | optional(bool, shutdownCodec))).as[DATA_NORMAL].decodeOnly

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0
import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0.{HtlcTxAndSigs, PublishableTxs}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._
import fr.acinq.eclair.wire.protocol.UpdateMessage
import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage}
import scodec.bits.ByteVector
import scodec.codecs._
import scodec.{Attempt, Codec}
Expand Down Expand Up @@ -247,6 +247,7 @@ private[channel] object ChannelCodecs1 {
("buried" | bool8) ::
("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) ::
("channelUpdate" | lengthDelimited(channelUpdateCodec)) ::
("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) ::
("localShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) ::
("remoteShutdown" | optional(bool8, lengthDelimited(shutdownCodec)))).as[DATA_NORMAL]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0
import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0.{HtlcTxAndSigs, PublishableTxs}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._
import fr.acinq.eclair.wire.protocol.UpdateMessage
import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage}
import scodec.bits.ByteVector
import scodec.codecs._
import scodec.{Attempt, Codec}
Expand Down Expand Up @@ -282,6 +282,7 @@ private[channel] object ChannelCodecs2 {
("buried" | bool8) ::
("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) ::
("channelUpdate" | lengthDelimited(channelUpdateCodec)) ::
("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) ::
("localShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) ::
("remoteShutdown" | optional(bool8, lengthDelimited(shutdownCodec)))).as[DATA_NORMAL]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.transactions.{CommitmentSpec, DirectedHtlc, IncomingHtlc, OutgoingHtlc}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._
import fr.acinq.eclair.wire.protocol.UpdateMessage
import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage}
import fr.acinq.eclair.{FeatureSupport, Features, MilliSatoshi}
import scodec.bits.{BitVector, ByteVector}
import scodec.codecs._
Expand Down Expand Up @@ -302,6 +302,7 @@ private[channel] object ChannelCodecs3 {
("buried" | bool8) ::
("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) ::
("channelUpdate" | lengthDelimited(channelUpdateCodec)) ::
("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) ::
("localShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) ::
("remoteShutdown" | optional(bool8, lengthDelimited(shutdownCodec)))).as[DATA_NORMAL]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package fr.acinq.eclair.channel

import akka.testkit.TestProbe
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair.TestConstants.Alice
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingSpentTriggered
import fr.acinq.eclair.channel.states.ChannelStateTestsBase
import fr.acinq.eclair.channel.states.ChannelStateTestsHelperMethods.FakeTxPublisherFactory
import fr.acinq.eclair.crypto.Generators
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.payment.relay.Relayer.{RelayFees, RelayParams}
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.transactions.Transactions.{ClaimP2WPKHOutputTx, DefaultCommitmentFormat, InputInfo, TxOwner}
import fr.acinq.eclair.wire.protocol.{ChannelReestablish, CommitSig, Error, Init, RevokeAndAck}
Expand Down Expand Up @@ -122,4 +126,66 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha
val tx1 = tx.updateWitness(0, ScriptWitness(Scripts.der(sig) :: ourToRemotePubKey.value :: Nil))
Transaction.correctlySpends(tx1, bobCommitTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
}

test("restore channel without configuration change") { f =>
import f._
val sender = TestProbe()

// we start by storing the current state
assert(alice.stateData.isInstanceOf[DATA_NORMAL])
val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL]

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// we restart Alice
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(TestConstants.Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)
newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)
val u = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(u.previousChannelUpdate_opt.nonEmpty)
assert(Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, u.channelUpdate))
assert(Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, oldStateData.channelUpdate))
}

test("restore channel with configuration change") { f =>
import f._
val sender = TestProbe()

// we start by storing the current state
assert(alice.stateData.isInstanceOf[DATA_NORMAL])
val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL]

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// we restart Alice with a different configuration
val newFees = RelayFees(765 msat, 2345)
val newConfig = TestConstants.Alice.nodeParams.copy(relayParams = RelayParams(newFees, newFees, newFees))
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(newConfig, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)
newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)
val u = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(u.previousChannelUpdate_opt.nonEmpty)
assert(!Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, u.channelUpdate))
assert(Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, oldStateData.channelUpdate))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ object ChannelCodecsSpec {
commitInput = commitmentInput,
remotePerCommitmentSecrets = ShaChain.init)

DATA_NORMAL(commitments, ShortChannelId(42), buried = true, None, channelUpdate, None, None)
DATA_NORMAL(commitments, ShortChannelId(42), buried = true, None, channelUpdate, None, None, None)
}

}