Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chainntnfs/bitcoindnotify/bitcoind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache {
testCfg := chainntnfs.CacheConfig{
QueryDisable: false,
}
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db)
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db.Backend)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion chainntnfs/btcdnotify/btcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache {
testCfg := chainntnfs.CacheConfig{
QueryDisable: false,
}
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db)
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db.Backend)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
Expand Down
16 changes: 9 additions & 7 deletions chainntnfs/height_hint_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type ConfirmHintCache interface {
// will be stored.
type HeightHintCache struct {
cfg CacheConfig
db *channeldb.DB
db kvdb.Backend
}

// Compile-time checks to ensure HeightHintCache satisfies the SpendHintCache
Expand All @@ -93,7 +93,9 @@ var _ SpendHintCache = (*HeightHintCache)(nil)
var _ ConfirmHintCache = (*HeightHintCache)(nil)

// NewHeightHintCache returns a new height hint cache backed by a database.
func NewHeightHintCache(cfg CacheConfig, db *channeldb.DB) (*HeightHintCache, error) {
func NewHeightHintCache(cfg CacheConfig, db kvdb.Backend) (*HeightHintCache,
error) {

cache := &HeightHintCache{cfg, db}
if err := cache.initBuckets(); err != nil {
return nil, err
Expand All @@ -105,7 +107,7 @@ func NewHeightHintCache(cfg CacheConfig, db *channeldb.DB) (*HeightHintCache, er
// initBuckets ensures that the primary buckets used by the circuit are
// initialized so that we can assume their existence after startup.
func (c *HeightHintCache) initBuckets() error {
return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
_, err := tx.CreateTopLevelBucket(spendHintBucket)
if err != nil {
return err
Expand All @@ -127,7 +129,7 @@ func (c *HeightHintCache) CommitSpendHint(height uint32,
Log.Tracef("Updating spend hint to height %d for %v", height,
spendRequests)

return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
spendHints := tx.ReadWriteBucket(spendHintBucket)
if spendHints == nil {
return ErrCorruptedHeightHintCache
Expand Down Expand Up @@ -197,7 +199,7 @@ func (c *HeightHintCache) PurgeSpendHint(spendRequests ...SpendRequest) error {

Log.Tracef("Removing spend hints for %v", spendRequests)

return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
spendHints := tx.ReadWriteBucket(spendHintBucket)
if spendHints == nil {
return ErrCorruptedHeightHintCache
Expand Down Expand Up @@ -228,7 +230,7 @@ func (c *HeightHintCache) CommitConfirmHint(height uint32,
Log.Tracef("Updating confirm hints to height %d for %v", height,
confRequests)

return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
confirmHints := tx.ReadWriteBucket(confirmHintBucket)
if confirmHints == nil {
return ErrCorruptedHeightHintCache
Expand Down Expand Up @@ -299,7 +301,7 @@ func (c *HeightHintCache) PurgeConfirmHint(confRequests ...ConfRequest) error {

Log.Tracef("Removing confirm hints for %v", confRequests)

return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
confirmHints := tx.ReadWriteBucket(confirmHintBucket)
if confirmHints == nil {
return ErrCorruptedHeightHintCache
Expand Down
2 changes: 1 addition & 1 deletion chainntnfs/height_hint_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func initHintCacheWithConfig(t *testing.T, cfg CacheConfig) *HeightHintCache {
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := NewHeightHintCache(cfg, db)
hintCache, err := NewHeightHintCache(cfg, db.Backend)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion chainntnfs/test/test_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,9 @@ func TestInterfaces(t *testing.T, targetBackEnd string) {
testCfg := chainntnfs.CacheConfig{
QueryDisable: false,
}
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db)
hintCache, err := chainntnfs.NewHeightHintCache(
testCfg, db.Backend,
)
if err != nil {
t.Fatalf("unable to create height hint cache: %v", err)
}
Expand Down
15 changes: 9 additions & 6 deletions chainreg/chainregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
Expand Down Expand Up @@ -68,11 +69,13 @@ type Config struct {
// LtcdMode defines settings for connecting to an ltcd node.
LtcdMode *lncfg.Btcd

// LocalChanDB is a pointer to the local backing channel database.
LocalChanDB *channeldb.DB
// HeightHintDB is a pointer to the database that stores the height
// hints.
HeightHintDB kvdb.Backend

// RemoteChanDB is a pointer to the remote backing channel database.
RemoteChanDB *channeldb.DB
// ChanStateDB is a pointer to the database that stores the channel
// state.
ChanStateDB *channeldb.DB

// BlockCacheSize is the size (in bytes) of blocks kept in memory.
BlockCacheSize uint64
Expand Down Expand Up @@ -304,7 +307,7 @@ func NewChainControl(cfg *Config, blockCache *blockcache.BlockCache) (

// Initialize the height hint cache within the chain directory.
hintCache, err := chainntnfs.NewHeightHintCache(
heightHintCacheConfig, cfg.LocalChanDB,
heightHintCacheConfig, cfg.HeightHintDB,
)
if err != nil {
return nil, nil, fmt.Errorf("unable to initialize height hint "+
Expand Down Expand Up @@ -684,7 +687,7 @@ func NewChainControl(cfg *Config, blockCache *blockcache.BlockCache) (
// Create, and start the lnwallet, which handles the core payment
// channel logic, and exposes control via proxy state machines.
walletCfg := lnwallet.Config{
Database: cfg.RemoteChanDB,
Database: cfg.ChanStateDB,
Notifier: cc.ChainNotifier,
WalletController: wc,
Signer: cc.Signer,
Expand Down
68 changes: 13 additions & 55 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/go-errors/errors"
mig "github.com/lightningnetwork/lnd/channeldb/migration"
"github.com/lightningnetwork/lnd/channeldb/migration12"
Expand All @@ -25,8 +24,7 @@ import (
)

const (
dbName = "channel.db"
dbFilePermission = 0600
dbName = "channel.db"
)

var (
Expand Down Expand Up @@ -214,46 +212,6 @@ type DB struct {
dryRun bool
}

// Update is a wrapper around walletdb.Update which calls into the extended
// backend when available. This call is needed to be able to cast DB to
// ExtendedBackend. The passed reset function is called before the start of the
// transaction and can be used to reset intermediate state. As callers may
// expect retries of the f closure (depending on the database backend used), the
// reset function will be called before each retry respectively.
func (db *DB) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error {
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
return v.Update(f, reset)
}

reset()
return walletdb.Update(db, f)
}

// View is a wrapper around walletdb.View which calls into the extended
// backend when available. This call is needed to be able to cast DB to
// ExtendedBackend. The passed reset function is called before the start of the
// transaction and can be used to reset intermediate state. As callers may
// expect retries of the f closure (depending on the database backend used), the
// reset function will be called before each retry respectively.
func (db *DB) View(f func(tx walletdb.ReadTx) error, reset func()) error {
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
return v.View(f, reset)
}

reset()
return walletdb.View(db, f)
}

// PrintStats calls into the extended backend if available. This call is needed
// to be able to cast DB to ExtendedBackend.
func (db *DB) PrintStats() string {
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
return v.PrintStats()
}

return "unimplemented"
}

// Open opens or creates channeldb. Any necessary schemas migrations due
// to updates will take place as necessary.
// TODO(bhandras): deprecate this function.
Expand Down Expand Up @@ -449,7 +407,7 @@ func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error)
// stored currently active/open channels associated with the target nodeID. In
// the case that no active channels are known to have been created with this
// node, then a zero-length slice is returned.
func (db *DB) fetchOpenChannels(tx kvdb.RTx,
func (d *DB) fetchOpenChannels(tx kvdb.RTx,
nodeID *btcec.PublicKey) ([]*OpenChannel, error) {

// Get the bucket dedicated to storing the metadata for open channels.
Expand Down Expand Up @@ -485,7 +443,7 @@ func (db *DB) fetchOpenChannels(tx kvdb.RTx,

// Finally, we both of the necessary buckets retrieved, fetch
// all the active channels related to this node.
nodeChannels, err := db.fetchNodeChannels(chainBucket)
nodeChannels, err := d.fetchNodeChannels(chainBucket)
if err != nil {
return fmt.Errorf("unable to read channel for "+
"chain_hash=%x, node_key=%x: %v",
Expand All @@ -502,7 +460,7 @@ func (db *DB) fetchOpenChannels(tx kvdb.RTx,
// fetchNodeChannels retrieves all active channels from the target chainBucket
// which is under a node's dedicated channel bucket. This function is typically
// used to fetch all the active channels related to a particular node.
func (db *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error) {
func (d *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error) {

var channels []*OpenChannel

Expand All @@ -528,7 +486,7 @@ func (db *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error
return fmt.Errorf("unable to read channel data for "+
"chan_point=%v: %v", outPoint, err)
}
oChannel.Db = db
oChannel.Db = d

channels = append(channels, oChannel)

Expand Down Expand Up @@ -990,8 +948,8 @@ func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
// pruneLinkNode determines whether we should garbage collect a link node from
// the database due to no longer having any open channels with it. If there are
// any left, then this acts as a no-op.
func (db *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
openChannels, err := db.fetchOpenChannels(tx, remotePub)
func (d *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
openChannels, err := d.fetchOpenChannels(tx, remotePub)
if err != nil {
return fmt.Errorf("unable to fetch open channels for peer %x: "+
"%v", remotePub.SerializeCompressed(), err)
Expand All @@ -1004,7 +962,7 @@ func (db *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
log.Infof("Pruning link node %x with zero open channels from database",
remotePub.SerializeCompressed())

return db.deleteLinkNode(tx, remotePub)
return d.deleteLinkNode(tx, remotePub)
}

// PruneLinkNodes attempts to prune all link nodes found within the databse with
Expand Down Expand Up @@ -1140,16 +1098,16 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
// database. If the channel was already removed (has a closed channel entry),
// then we'll return a nil error. Otherwise, we'll insert a new close summary
// into the database.
func (db *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
// With the chanPoint constructed, we'll attempt to find the target
// channel in the database. If we can't find the channel, then we'll
// return the error back to the caller.
dbChan, err := db.FetchChannel(*chanPoint)
dbChan, err := d.FetchChannel(*chanPoint)
switch {
// If the channel wasn't found, then it's possible that it was already
// abandoned from the database.
case err == ErrChannelNotFound:
_, closedErr := db.FetchClosedChannel(chanPoint)
_, closedErr := d.FetchClosedChannel(chanPoint)
if closedErr != nil {
return closedErr
}
Expand Down Expand Up @@ -1312,9 +1270,9 @@ func fetchHistoricalChanBucket(tx kvdb.RTx,

// FetchHistoricalChannel fetches open channel data from the historical channel
// bucket.
func (db *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, error) {
func (d *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, error) {
var channel *OpenChannel
err := kvdb.View(db, func(tx kvdb.RTx) error {
err := kvdb.View(d, func(tx kvdb.RTx) error {
chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -3406,7 +3406,7 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
err := kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
edges := tx.ReadWriteBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
Expand Down
24 changes: 12 additions & 12 deletions channeldb/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ type LinkNode struct {

// NewLinkNode creates a new LinkNode from the provided parameters, which is
// backed by an instance of channeldb.
func (db *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey,
func (d *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey,
addrs ...net.Addr) *LinkNode {

return &LinkNode{
Network: bitNet,
IdentityPub: pub,
LastSeen: time.Now(),
Addresses: addrs,
db: db,
db: d,
}
}

Expand Down Expand Up @@ -129,13 +129,13 @@ func putLinkNode(nodeMetaBucket kvdb.RwBucket, l *LinkNode) error {

// DeleteLinkNode removes the link node with the given identity from the
// database.
func (db *DB) DeleteLinkNode(identity *btcec.PublicKey) error {
return kvdb.Update(db, func(tx kvdb.RwTx) error {
return db.deleteLinkNode(tx, identity)
func (d *DB) DeleteLinkNode(identity *btcec.PublicKey) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
return d.deleteLinkNode(tx, identity)
}, func() {})
}

func (db *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
func (d *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
nodeMetaBucket := tx.ReadWriteBucket(nodeInfoBucket)
if nodeMetaBucket == nil {
return ErrLinkNodesNotFound
Expand All @@ -148,9 +148,9 @@ func (db *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
// FetchLinkNode attempts to lookup the data for a LinkNode based on a target
// identity public key. If a particular LinkNode for the passed identity public
// key cannot be found, then ErrNodeNotFound if returned.
func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
func (d *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
var linkNode *LinkNode
err := kvdb.View(db, func(tx kvdb.RTx) error {
err := kvdb.View(d, func(tx kvdb.RTx) error {
node, err := fetchLinkNode(tx, identity)
if err != nil {
return err
Expand Down Expand Up @@ -191,10 +191,10 @@ func fetchLinkNode(tx kvdb.RTx, targetPub *btcec.PublicKey) (*LinkNode, error) {

// FetchAllLinkNodes starts a new database transaction to fetch all nodes with
// whom we have active channels with.
func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) {
func (d *DB) FetchAllLinkNodes() ([]*LinkNode, error) {
var linkNodes []*LinkNode
err := kvdb.View(db, func(tx kvdb.RTx) error {
nodes, err := db.fetchAllLinkNodes(tx)
err := kvdb.View(d, func(tx kvdb.RTx) error {
nodes, err := d.fetchAllLinkNodes(tx)
if err != nil {
return err
}
Expand All @@ -213,7 +213,7 @@ func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) {

// fetchAllLinkNodes uses an existing database transaction to fetch all nodes
// with whom we have active channels with.
func (db *DB) fetchAllLinkNodes(tx kvdb.RTx) ([]*LinkNode, error) {
func (d *DB) fetchAllLinkNodes(tx kvdb.RTx) ([]*LinkNode, error) {
nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
if nodeMetaBucket == nil {
return nil, ErrLinkNodesNotFound
Expand Down
Loading