Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions chanbackup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
)

// LiveChannelSource is an interface that allows us to query for the set of
Expand All @@ -17,8 +18,9 @@ type LiveChannelSource interface {
FetchAllChannels() ([]*channeldb.OpenChannel, error)

// FetchChannel attempts to locate a live channel identified by the
// passed chanPoint.
FetchChannel(chanPoint wire.OutPoint) (*channeldb.OpenChannel, error)
// passed chanPoint. Optionally an existing db tx can be supplied.
FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error)

// AddrsForNode returns all known addresses for the target node public
// key.
Expand Down Expand Up @@ -55,7 +57,7 @@ func FetchBackupForChan(chanPoint wire.OutPoint,

// First, we'll query the channel source to see if the channel is known
// and open within the database.
targetChan, err := chanSource.FetchChannel(chanPoint)
targetChan, err := chanSource.FetchChannel(nil, chanPoint)
if err != nil {
// If we can't find the channel, then we return with an error,
// as we have nothing to backup.
Expand Down
5 changes: 4 additions & 1 deletion chanbackup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
)

type mockChannelSource struct {
Expand Down Expand Up @@ -38,7 +39,9 @@ func (m *mockChannelSource) FetchAllChannels() ([]*channeldb.OpenChannel, error)
return chans, nil
}

func (m *mockChannelSource) FetchChannel(chanPoint wire.OutPoint) (*channeldb.OpenChannel, error) {
func (m *mockChannelSource) FetchChannel(_ kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error) {

if m.failQuery {
return nil, fmt.Errorf("fail")
}
Expand Down
15 changes: 12 additions & 3 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,11 @@ func (d *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error)

// FetchChannel attempts to locate a channel specified by the passed channel
// point. If the channel cannot be found, then an error will be returned.
func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) {
// Optionally an existing db tx can be supplied. Optionally an existing db tx
// can be supplied.
func (d *DB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (*OpenChannel,
error) {

var (
targetChan *OpenChannel
targetChanPoint bytes.Buffer
Expand Down Expand Up @@ -583,7 +587,12 @@ func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) {
})
}

err := kvdb.View(d, chanScan, func() {})
var err error
if tx == nil {
err = kvdb.View(d, chanScan, func() {})
} else {
err = chanScan(tx)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1102,7 +1111,7 @@ func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
// With the chanPoint constructed, we'll attempt to find the target
// channel in the database. If we can't find the channel, then we'll
// return the error back to the caller.
dbChan, err := d.FetchChannel(*chanPoint)
dbChan, err := d.FetchChannel(nil, *chanPoint)
switch {
// If the channel wasn't found, then it's possible that it was already
// abandoned from the database.
Expand Down
8 changes: 4 additions & 4 deletions channeldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func TestFetchChannel(t *testing.T) {
channelState := createTestChannel(t, cdb, openChannelOption())

// Next, attempt to fetch the channel by its chan point.
dbChannel, err := cdb.FetchChannel(channelState.FundingOutpoint)
dbChannel, err := cdb.FetchChannel(nil, channelState.FundingOutpoint)
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
Expand All @@ -275,7 +275,7 @@ func TestFetchChannel(t *testing.T) {
}
channelState2.FundingOutpoint.Index ^= 1

_, err = cdb.FetchChannel(channelState2.FundingOutpoint)
_, err = cdb.FetchChannel(nil, channelState2.FundingOutpoint)
if err == nil {
t.Fatalf("expected query to fail")
}
Expand Down Expand Up @@ -416,7 +416,7 @@ func TestRestoreChannelShells(t *testing.T) {

// We should also be able to find the channel if we query for it
// directly.
_, err = cdb.FetchChannel(channelShell.Chan.FundingOutpoint)
_, err = cdb.FetchChannel(nil, channelShell.Chan.FundingOutpoint)
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestAbandonChannel(t *testing.T) {

// At this point, the channel should no longer be found in the set of
// open channels.
_, err = cdb.FetchChannel(chanState.FundingOutpoint)
_, err = cdb.FetchChannel(nil, chanState.FundingOutpoint)
if err != ErrChannelNotFound {
t.Fatalf("channel should not have been found: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion channelnotifier/channelnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint,
// channel has gone from pending open to open.
func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) {
// Fetch the relevant channel from the database.
channel, err := c.chanDB.FetchChannel(chanPoint)
channel, err := c.chanDB.FetchChannel(nil, chanPoint)
if err != nil {
log.Warnf("Unable to fetch open channel from the db: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
// same instance that is used by the link.
chanPoint := a.channel.FundingOutpoint

channel, err := a.c.chanSource.FetchChannel(chanPoint)
channel, err := a.c.chanSource.FetchChannel(nil, chanPoint)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -301,7 +301,7 @@ func (a *arbChannel) ForceCloseChan() (*lnwallet.LocalForceCloseSummary, error)
// Now that we know the link can't mutate the channel
// state, we'll read the channel from disk the target
// channel according to its channel point.
channel, err := a.c.chanSource.FetchChannel(chanPoint)
channel, err := a.c.chanSource.FetchChannel(nil, chanPoint)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
Expand Down Expand Up @@ -1210,6 +1211,7 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
edgesToUpdate []updateTuple
)
err := d.cfg.Router.ForAllOutgoingChannels(func(
_ kvdb.RTx,
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {

Expand Down
9 changes: 7 additions & 2 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntest/mock"
"github.com/lightningnetwork/lnd/lntest/wait"
Expand Down Expand Up @@ -200,7 +201,8 @@ func (r *mockGraphSource) ForEachNode(func(node *channeldb.LightningNode) error)
return nil
}

func (r *mockGraphSource) ForAllOutgoingChannels(cb func(i *channeldb.ChannelEdgeInfo,
func (r *mockGraphSource) ForAllOutgoingChannels(cb func(tx kvdb.RTx,
i *channeldb.ChannelEdgeInfo,
c *channeldb.ChannelEdgePolicy) error) error {

r.mu.Lock()
Expand All @@ -223,7 +225,9 @@ func (r *mockGraphSource) ForAllOutgoingChannels(cb func(i *channeldb.ChannelEdg
}

for _, channel := range chans {
cb(channel.Info, channel.Policy1)
if err := cb(nil, channel.Info, channel.Policy1); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -3568,6 +3572,7 @@ out:
const newTimeLockDelta = 100
var edgesToUpdate []EdgeWithInfo
err = ctx.router.ForAllOutgoingChannels(func(
_ kvdb.RTx,
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {

Expand Down
2 changes: 2 additions & 0 deletions docs/release-notes/release-notes-0.14.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ you.
* [Fixed context leak in integration tests, and properly handled context
timeout](https://github.com/lightningnetwork/lnd/pull/5646).

* [Removed nested db tx](https://github.com/lightningnetwork/lnd/pull/5643)

## Database

* [Ensure single writer for legacy
Expand Down
2 changes: 1 addition & 1 deletion pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
ChannelInfo: func(chanPoint wire.OutPoint) (
*autopilot.LocalChannel, error) {

channel, err := svr.chanStateDB.FetchChannel(chanPoint)
channel, err := svr.chanStateDB.FetchChannel(nil, chanPoint)
if err != nil {
return nil, err
}
Expand Down
22 changes: 13 additions & 9 deletions routing/localchans/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)
Expand All @@ -27,12 +28,14 @@ type Manager struct {

// ForAllOutgoingChannels is required to iterate over all our local
// channels.
ForAllOutgoingChannels func(cb func(*channeldb.ChannelEdgeInfo,
ForAllOutgoingChannels func(cb func(kvdb.RTx,
*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error

// FetchChannel is used to query local channel parameters.
FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
error)
// FetchChannel is used to query local channel parameters. Optionally an
// existing db tx can be supplied.
FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error)

// policyUpdateLock ensures that the database and the link do not fall
// out of sync if there are concurrent fee update calls. Without it,
Expand Down Expand Up @@ -66,6 +69,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
// If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all.
err := r.ForAllOutgoingChannels(func(
tx kvdb.RTx,
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {

Expand All @@ -77,7 +81,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
}

// Apply the new policy to the edge.
err := r.updateEdge(info.ChannelPoint, edge, newSchema)
err := r.updateEdge(tx, info.ChannelPoint, edge, newSchema)
if err != nil {
log.Warnf("Cannot update policy for %v: %v\n",
info.ChannelPoint, err,
Expand Down Expand Up @@ -123,7 +127,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
}

// updateEdge updates the given edge with the new schema.
func (r *Manager) updateEdge(chanPoint wire.OutPoint,
func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint,
edge *channeldb.ChannelEdgePolicy,
newSchema routing.ChannelPolicy) error {

Expand All @@ -135,7 +139,7 @@ func (r *Manager) updateEdge(chanPoint wire.OutPoint,
edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)

// Retrieve negotiated channel htlc amt limits.
amtMin, amtMax, err := r.getHtlcAmtLimits(chanPoint)
amtMin, amtMax, err := r.getHtlcAmtLimits(tx, chanPoint)
if err != nil {
return nil
}
Expand Down Expand Up @@ -194,10 +198,10 @@ func (r *Manager) updateEdge(chanPoint wire.OutPoint,

// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
// constraints.
func (r *Manager) getHtlcAmtLimits(chanPoint wire.OutPoint) (
func (r *Manager) getHtlcAmtLimits(tx kvdb.RTx, chanPoint wire.OutPoint) (
lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {

ch, err := r.FetchChannel(chanPoint)
ch, err := r.FetchChannel(tx, chanPoint)
if err != nil {
return 0, 0, err
}
Expand Down
9 changes: 6 additions & 3 deletions routing/localchans/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package localchans
import (
"testing"

"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"

"github.com/btcsuite/btcd/chaincfg/chainhash"
Expand Down Expand Up @@ -88,10 +89,12 @@ func TestManager(t *testing.T) {
return nil
}

forAllOutgoingChannels := func(cb func(*channeldb.ChannelEdgeInfo,
forAllOutgoingChannels := func(cb func(kvdb.RTx,
*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error {

return cb(
nil,
&channeldb.ChannelEdgeInfo{
Capacity: chanCap,
ChannelPoint: chanPoint,
Expand All @@ -100,8 +103,8 @@ func TestManager(t *testing.T) {
)
}

fetchChannel := func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
error) {
fetchChannel := func(tx kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error) {

constraints := channeldb.ChannelConstraints{
MaxPendingAmount: maxPendingAmount,
Expand Down
12 changes: 7 additions & 5 deletions routing/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ type ChannelGraphSource interface {
// ForAllOutgoingChannels is used to iterate over all channels
// emanating from the "source" node which is the center of the
// star-graph.
ForAllOutgoingChannels(cb func(c *channeldb.ChannelEdgeInfo,
ForAllOutgoingChannels(cb func(tx kvdb.RTx,
c *channeldb.ChannelEdgeInfo,
e *channeldb.ChannelEdgePolicy) error) error

// CurrentBlockHeight returns the block height from POV of the router
Expand Down Expand Up @@ -2426,17 +2427,18 @@ func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) err
// the router.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error {
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(kvdb.RTx,
*channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy) error) error {

return r.selfNode.ForEachChannel(nil, func(_ kvdb.RTx, c *channeldb.ChannelEdgeInfo,
return r.selfNode.ForEachChannel(nil, func(tx kvdb.RTx,
c *channeldb.ChannelEdgeInfo,
e, _ *channeldb.ChannelEdgePolicy) error {

if e == nil {
return fmt.Errorf("channel from self node has no policy")
}

return cb(c, e)
return cb(tx, c, e)
})
}

Expand Down
4 changes: 2 additions & 2 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2124,7 +2124,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,

// First, we'll fetch the channel as is, as we'll need to examine it
// regardless of if this is a force close or not.
channel, err := r.server.chanStateDB.FetchChannel(*chanPoint)
channel, err := r.server.chanStateDB.FetchChannel(nil, *chanPoint)
if err != nil {
return err
}
Expand Down Expand Up @@ -2402,7 +2402,7 @@ func (r *rpcServer) AbandonChannel(_ context.Context,
return nil, err
}

dbChan, err := r.server.chanStateDB.FetchChannel(*chanPoint)
dbChan, err := r.server.chanStateDB.FetchChannel(nil, *chanPoint)
switch {
// If the channel isn't found in the set of open channels, then we can
// continue on as it can't be loaded into the link/peer.
Expand Down