Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
8746a6e
discovery: increase default msg rates to 1MB
yyforyongyu Jul 21, 2025
173dbec
lncfg: update docs about `msg-rate-bytes` and `msg-burst-bytes`
yyforyongyu Jul 21, 2025
f25bfb7
docs: update release notes
yyforyongyu Jul 25, 2025
f6c5cd7
Merge branch '0-19-3-branch-rc1-10096' into 0-19-3-branch-rc1
guggero Aug 6, 2025
57872b9
discovery: make gossip filter semaphore capacity configurable
Roasbeef Jul 21, 2025
80e0ea0
discovery: add async timestamp range queue to prevent blocking
Roasbeef Jul 21, 2025
8eda486
discovery: integrate async queue in ProcessRemoteAnnouncement
Roasbeef Jul 21, 2025
27778cb
multi: wire up gossip filter concurrency config
Roasbeef Jul 21, 2025
a2fcfb0
docs: add comprehensive gossip rate limiting guide
Roasbeef Jul 21, 2025
bb58253
discovery: add tests for for async timestamp range queue
Roasbeef Jul 21, 2025
ce4fdd3
discovery: only permit a single gossip backlog goroutine per peer
Roasbeef Jul 24, 2025
6f09297
docs/release-notes: add release notes entry
Roasbeef Jul 25, 2025
94c0208
Merge branch '0-19-3-branch-rc1-10097' into 0-19-3-branch-rc1
guggero Aug 6, 2025
1345b47
go.mod: bump btclog v2 version
ellemouton Jul 23, 2025
d6523d7
Merge branch '0-19-3-branch-rc1-10107' into 0-19-3-branch-rc1
guggero Aug 6, 2025
91000b9
contractcourt: add a new chan `resolvedChan` to handle resolved channels
yyforyongyu Jul 22, 2025
b0dca5f
contractcourt: replace `MarkChannelClosed` with `NotifyChannelResolved`
yyforyongyu Jul 22, 2025
f12692e
docs: add release notes
yyforyongyu Jul 24, 2025
558ea0f
Merge branch '0-19-3-branch-rc1-10108' into 0-19-3-branch-rc1
guggero Aug 6, 2025
28af39c
contractcourt+sweep: make anchor inputs exclusive
yyforyongyu Jul 30, 2025
3796c99
sweep: only remove the other input from the exclusive group
yyforyongyu Jul 30, 2025
83d33f4
itest: fix tests re the new anchor behavior
yyforyongyu Jul 30, 2025
5a22549
sweep: fix typos
yyforyongyu Jul 8, 2025
b814428
docs: add and update release notes for 0.19.3
yyforyongyu Jul 31, 2025
f275fd6
Merge branch '0-19-3-branch-rc1-10117' into 0-19-3-branch-rc1
guggero Aug 6, 2025
ccaeeb9
itest: run normal FundPsbt test case in remote-signer mode
guggero Jul 31, 2025
f18ae88
itest: run FundPsbt test with imported account
guggero Jul 31, 2025
b525754
mod: update btcwallet to version with fix
guggero Aug 4, 2025
100feb7
docs: add release notes
guggero Aug 5, 2025
9aa70e4
Merge branch '0-19-3-branch-rc1-10119' into 0-19-3-branch-rc1
guggero Aug 6, 2025
97c4012
multi: skip range check in pathfinder and switch for custom htlc paym…
ziggie1984 Aug 2, 2025
4a38f8f
itest: add payment test with max htlc restriction
ziggie1984 Aug 5, 2025
3710bc1
docs: add release-notes
ziggie1984 Aug 5, 2025
d5f451a
Merge branch '0-19-3-branch-rc1-10125' into 0-19-3-branch-rc1
guggero Aug 6, 2025
eccdd3c
build: bump version to v0.19.2-beta.rc1
guggero Aug 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ const (
AppMinor uint = 19

// AppPatch defines the application patch for this binary.
AppPatch uint = 2
AppPatch uint = 3

// AppPreRelease MUST only contain characters from semanticAlphabet per
// the semantic versioning spec.
AppPreRelease = "beta"
AppPreRelease = "beta.rc1"
)

func init() {
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func DefaultConfig() Config {
AnnouncementConf: discovery.DefaultProofMatureDelta,
MsgRateBytes: discovery.DefaultMsgBytesPerSecond,
MsgBurstBytes: discovery.DefaultMsgBytesBurst,
FilterConcurrency: discovery.DefaultFilterConcurrency,
},
Invoices: &lncfg.Invoices{
HoldExpiryDelta: lncfg.DefaultHoldInvoiceExpiryDelta,
Expand Down
11 changes: 10 additions & 1 deletion contractcourt/anchor_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ func (c *anchorResolver) Launch() error {
// an output that we want to sweep only if it is economical to do so.
//
// An exclusive group is not necessary anymore, because we know that
// this is the only anchor that can be swept.
// this is the only anchor that can be swept. However, to avoid this
// anchor input being group with other inputs, we still keep the
// exclusive group here such that the anchor will be swept
// independently.
//
// We also clear the parent tx information for cpfp, because the
// commitment tx is confirmed.
Expand All @@ -222,6 +225,8 @@ func (c *anchorResolver) Launch() error {
c.broadcastHeight, nil,
)

exclusiveGroup := c.ShortChanID.ToUint64()

resultChan, err := c.Sweeper.SweepInput(
&anchorInput,
sweep.Params{
Expand All @@ -233,6 +238,10 @@ func (c *anchorResolver) Launch() error {
// There's no rush to sweep the anchor, so we use a nil
// deadline here.
DeadlineHeight: fn.None[int32](),

// Use the chan id as the exclusive group. This prevents
// any of the anchors from being batched together.
ExclusiveGroup: &exclusiveGroup,
},
)

Expand Down
74 changes: 59 additions & 15 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ type ChainArbitrator struct {
// beat is the current best known blockbeat.
beat chainio.Blockbeat

// resolvedChan is used to signal that the given channel outpoint has
// been resolved onchain. Once received, chain arbitrator will perform
// cleanups.
resolvedChan chan wire.OutPoint

quit chan struct{}

wg sync.WaitGroup
Expand All @@ -286,6 +291,7 @@ func NewChainArbitrator(cfg ChainArbitratorConfig,
activeWatchers: make(map[wire.OutPoint]*chainWatcher),
chanSource: db,
quit: make(chan struct{}),
resolvedChan: make(chan wire.OutPoint),
}

// Mount the block consumer.
Expand Down Expand Up @@ -459,6 +465,9 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
channel.ShortChanID(), htlc,
)
},
NotifyChannelResolved: func() {
c.notifyChannelResolved(chanPoint)
},
}

// The final component needed is an arbitrator log that the arbitrator
Expand All @@ -474,14 +483,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
return nil, err
}

arbCfg.MarkChannelResolved = func() error {
if c.cfg.NotifyFullyResolvedChannel != nil {
c.cfg.NotifyFullyResolvedChannel(chanPoint)
}

return c.ResolveContract(chanPoint)
}

// Finally, we'll need to construct a series of htlc Sets based on all
// currently known valid commitments.
htlcSets := make(map[HtlcSetKey]htlcSet)
Expand Down Expand Up @@ -578,6 +579,17 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
// Set the current beat.
c.beat = beat

// Start the goroutine which listens for signals to mark the channel as
// resolved.
//
// NOTE: We must start this goroutine here we won't block the following
// channel loading.
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.resolveContracts()
}()

// First, we'll fetch all the channels that are still open, in order to
// collect them within our set of active contracts.
if err := c.loadOpenChannels(); err != nil {
Expand Down Expand Up @@ -697,6 +709,32 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
return nil
}

// resolveContracts listens to the `resolvedChan` to mark a given channel as
// fully resolved.
func (c *ChainArbitrator) resolveContracts() {
for {
select {
// The channel arbitrator signals that a given channel has been
// resolved, we now update chain arbitrator's internal state for
// this channel.
case cp := <-c.resolvedChan:
if c.cfg.NotifyFullyResolvedChannel != nil {
c.cfg.NotifyFullyResolvedChannel(cp)
}

err := c.ResolveContract(cp)
if err != nil {
log.Errorf("Failed to resolve contract for "+
"channel %v", cp)
}

// Exit if the chain arbitrator is shutting down.
case <-c.quit:
return
}
}
}

// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
Expand Down Expand Up @@ -762,6 +800,16 @@ func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
c.NotifyBlockProcessed(beat, err)
}

// notifyChannelResolved is used by the channel arbitrator to signal that a
// given channel has been resolved.
func (c *ChainArbitrator) notifyChannelResolved(cp wire.OutPoint) {
select {
case c.resolvedChan <- cp:
case <-c.quit:
return
}
}

// republishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
Expand Down Expand Up @@ -1346,20 +1394,16 @@ func (c *ChainArbitrator) loadPendingCloseChannels() error {
closeChanInfo.ShortChanID, htlc,
)
},
NotifyChannelResolved: func() {
c.notifyChannelResolved(chanPoint)
},
}
chanLog, err := newBoltArbitratorLog(
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
)
if err != nil {
return err
}
arbCfg.MarkChannelResolved = func() error {
if c.cfg.NotifyFullyResolvedChannel != nil {
c.cfg.NotifyFullyResolvedChannel(chanPoint)
}

return c.ResolveContract(chanPoint)
}

// We create an empty map of HTLC's here since it's possible
// that the channel is in StateDefault and updateActiveHTLCs is
Expand Down
15 changes: 4 additions & 11 deletions contractcourt/channel_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,9 @@ type ChannelArbitratorConfig struct {
// true. Otherwise this value is unset.
CloseType channeldb.ClosureType

// MarkChannelResolved is a function closure that serves to mark a
// channel as "fully resolved". A channel itself can be considered
// fully resolved once all active contracts have individually been
// fully resolved.
//
// TODO(roasbeef): need RPC's to combine for pendingchannels RPC
MarkChannelResolved func() error
// NotifyChannelResolved is used by the channel arbitrator to signal
// that a given channel has been resolved.
NotifyChannelResolved func()

// PutResolverReport records a resolver report for the channel. If the
// transaction provided is nil, the function should write the report
Expand Down Expand Up @@ -1397,10 +1393,7 @@ func (c *ChannelArbitrator) stateStep(
log.Infof("ChannelPoint(%v) has been fully resolved "+
"on-chain at height=%v", c.cfg.ChanPoint, triggerHeight)

if err := c.cfg.MarkChannelResolved(); err != nil {
log.Errorf("unable to mark channel resolved: %v", err)
return StateError, closeTx, err
}
c.cfg.NotifyChannelResolved()
}

log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,
Expand Down
7 changes: 3 additions & 4 deletions contractcourt/channel_arbitrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,17 +417,16 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
}

// We'll use the resolvedChan to synchronize on call to
// MarkChannelResolved.
// NotifyChannelResolved.
resolvedChan := make(chan struct{}, 1)

// Next we'll create the matching configuration struct that contains
// all interfaces and methods the arbitrator needs to do its job.
arbCfg := &ChannelArbitratorConfig{
ChanPoint: chanPoint,
ShortChanID: shortChanID,
MarkChannelResolved: func() error {
NotifyChannelResolved: func() {
resolvedChan <- struct{}{}
return nil
},
MarkCommitmentBroadcasted: func(_ *wire.MsgTx,
_ lntypes.ChannelParty) error {
Expand Down Expand Up @@ -547,7 +546,7 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) {
}

// Cooperative close should do trigger a MarkChannelClosed +
// MarkChannelResolved.
// NotifyChannelResolved.
closeInfo := &CooperativeCloseInfo{
&channeldb.ChannelCloseSummary{},
}
Expand Down
20 changes: 14 additions & 6 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,10 @@ type Config struct {
// MsgBurstBytes is the allotted burst amount in bytes. This is the
// number of starting tokens in our token bucket algorithm.
MsgBurstBytes uint64

// FilterConcurrency is the maximum number of concurrent gossip filter
// applications that can be processed.
FilterConcurrency int
}

// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
Expand Down Expand Up @@ -600,6 +604,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
IsStillZombieChannel: cfg.IsStillZombieChannel,
AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
AllotedMsgBytesBurst: cfg.MsgBurstBytes,
FilterConcurrency: cfg.FilterConcurrency,
})

gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
Expand Down Expand Up @@ -907,13 +912,16 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
return errChan
}

// If we've found the message target, then we'll dispatch the
// message directly to it.
if err := syncer.ApplyGossipFilter(ctx, m); err != nil {
log.Warnf("Unable to apply gossip filter for peer=%x: "+
"%v", peer.PubKey(), err)
// Queue the message for asynchronous processing to prevent
// blocking the gossiper when rate limiting is active.
if !syncer.QueueTimestampRange(m) {
log.Warnf("Unable to queue gossip filter for peer=%x: "+
"queue full", peer.PubKey())

errChan <- err
// Return nil to indicate we've handled the message,
// even though it was dropped. This prevents the peer
// from being disconnected.
errChan <- nil
return errChan
}

Expand Down
22 changes: 16 additions & 6 deletions discovery/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,20 @@ const (
// network as possible.
DefaultHistoricalSyncInterval = time.Hour

// filterSemaSize is the capacity of gossipFilterSema.
filterSemaSize = 5
// DefaultFilterConcurrency is the default maximum number of concurrent
// gossip filter applications that can be processed.
DefaultFilterConcurrency = 5

// DefaultMsgBytesBurst is the allotted burst in bytes we'll permit.
// This is the most that can be sent in a given go. Requests beyond
// this, will block indefinitely. Once tokens (bytes are depleted),
// they'll be refilled at the DefaultMsgBytesPerSecond rate.
DefaultMsgBytesBurst = 2 * 100 * 1_024
DefaultMsgBytesBurst = 2 * 1000 * 1_024

// DefaultMsgBytesPerSecond is the max bytes/s we'll permit for outgoing
// messages. Once tokens (bytes) have been taken from the bucket,
// they'll be refilled at this rate.
DefaultMsgBytesPerSecond = 100 * 1_024
DefaultMsgBytesPerSecond = 1000 * 1_024

// assumedMsgSize is the assumed size of a message if we can't compute
// its serialized size. This comes out to 1 KB.
Expand Down Expand Up @@ -136,6 +137,10 @@ type SyncManagerCfg struct {
// AllotedMsgBytesBurst is the amount of burst bytes we'll permit, if
// we've exceeded the hard upper limit.
AllotedMsgBytesBurst uint64

// FilterConcurrency is the maximum number of concurrent gossip filter
// applications that can be processed. If not set, defaults to 5.
FilterConcurrency int
}

// SyncManager is a subsystem of the gossiper that manages the gossip syncers
Expand Down Expand Up @@ -207,8 +212,13 @@ type SyncManager struct {
// newSyncManager constructs a new SyncManager backed by the given config.
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {

filterSema := make(chan struct{}, filterSemaSize)
for i := 0; i < filterSemaSize; i++ {
filterConcurrency := cfg.FilterConcurrency
if filterConcurrency == 0 {
filterConcurrency = DefaultFilterConcurrency
}

filterSema := make(chan struct{}, filterConcurrency)
for i := 0; i < filterConcurrency; i++ {
filterSema <- struct{}{}
}

Expand Down
Loading
Loading