Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 86 additions & 20 deletions lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
Expand Down Expand Up @@ -432,7 +433,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
ltndLog.Infof("Elected as leader (%v)", cfg.Cluster.ID)
}

localChanDB, remoteChanDB, cleanUp, err := initializeDatabases(ctx, cfg)
localChanDB, remoteChanDB, cleanUp, err := initializeChannelDatabases(ctx, cfg)
switch {
case err == channeldb.ErrDryRunMigrationOK:
ltndLog.Infof("%v, exiting", err)
Expand Down Expand Up @@ -799,19 +800,11 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error

var tower *watchtower.Standalone
if cfg.Watchtower.Active {
// Segment the watchtower directory by chain and network.
towerDBDir := filepath.Join(
cfg.Watchtower.TowerDir,
cfg.registeredChains.PrimaryChain().String(),
lncfg.NormalizeNetwork(cfg.ActiveNetParams.Name),
)

towerDB, err := wtdb.OpenTowerDB(
towerDBDir, cfg.DB.Bolt.DBTimeout,
)
// Grab pointer to either local or remote tower database.
towerDB, err := initializeTowerDatabase(ctx, cfg)
if err != nil {
err := fmt.Errorf("unable to open watchtower "+
"database: %v", err)
err := fmt.Errorf("unable to open watchtower (%s) database: %v",
cfg.DB.Backend, err)
ltndLog.Error(err)
return err
}
Expand Down Expand Up @@ -1600,13 +1593,13 @@ func waitForWalletPassword(cfg *Config,
}
}

// initializeDatabases extracts the current databases that we'll use for normal
// operation in the daemon. Two databases are returned: one remote and one
// local. However, only if the replicated database is active will the remote
// database point to a unique database. Otherwise, the local and remote DB will
// both point to the same local database. A function closure that closes all
// opened databases is also returned.
func initializeDatabases(ctx context.Context,
// initializeChannelDatabases extracts the current databases that we'll use for
// normal operation in the daemon. Two databases are returned: one remote and
// one local. However, only if the replicated database is active will the
// remote database point to a unique database. Otherwise, the local and remote
// DB will both point to the same local database. A function closure that
// closes all opened databases is also returned.
func initializeChannelDatabases(ctx context.Context,
cfg *Config) (*channeldb.DB, *channeldb.DB, func(), error) {

ltndLog.Infof("Opening the main database, this might take a few " +
Expand Down Expand Up @@ -1724,6 +1717,79 @@ func initializeDatabases(ctx context.Context,
return localChanDB, remoteChanDB, cleanUp, nil
}

// initializeTowerDatabase configures either a local (boltdb) or remote (etcd)
// database for the watchtower server.
func initializeTowerDatabase(ctx context.Context,
cfg *Config) (*wtdb.TowerDB, error) {

ltndLog.Infof("Opening the watchtower database, this might take a few " +
"minutes...")

if cfg.DB.Backend == lncfg.BoltBackend {
ltndLog.Infof("Opening bbolt database for watchtower, sync_freelist=%v, "+
"auto_compact=%v", cfg.DB.Bolt.SyncFreelist,
cfg.DB.Bolt.AutoCompact)
}

startOpenTime := time.Now()

var (
towerDB *wtdb.TowerDB
err error
)

if cfg.DB.Backend == lncfg.BoltBackend {
// Segment the watchtower directory by chain and network.
towerDBDir := filepath.Join(
cfg.Watchtower.TowerDir,
cfg.registeredChains.PrimaryChain().String(),
lncfg.NormalizeNetwork(cfg.ActiveNetParams.Name),
)

// Open the towerdb, which is dedicated to storing watchtower
// server data.
towerDB, err = wtdb.OpenTowerDB(
towerDBDir, cfg.DB.Bolt.DBTimeout,
)
if err != nil {
err := fmt.Errorf("unable to open local watchtower "+
"database: %v", err)
ltndLog.Error(err)
return nil, err
}

} else if cfg.DB.Backend == lncfg.EtcdBackend {
ltndLog.Infof("Database replication is available! Creating " +
"remote towerdb instance")

// Open connection to remote etcd backend.
remoteDB, err := kvdb.Open(
kvdb.EtcdBackendName, ctx, cfg.DB.Etcd,
)
if err != nil {
return nil, err
}

// Initialize the remote watchtower db.
towerDB, err = wtdb.CreateWithBackend(
remoteDB,
)
if err != nil {
towerDB.Close()

err := fmt.Errorf("unable to open remote watchtower "+
"database: %v", err)
ltndLog.Error(err)
return nil, err
}
}

openTime := time.Since(startOpenTime)
ltndLog.Infof("Tower database now open (time_to_open=%v)!", openTime)

return towerDB, nil
}

// initNeutrinoBackend inits a new instance of the neutrino light client
// backend given a target chain directory to store the chain state.
func initNeutrinoBackend(cfg *Config, chainDir string,
Expand Down
46 changes: 17 additions & 29 deletions watchtower/wtdb/client_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ var (
// ClientDB is single database providing a persistent storage engine for the
// wtclient.
type ClientDB struct {
db kvdb.Backend
kvdb.Backend
dbPath string
}

Expand All @@ -139,8 +139,8 @@ func OpenClientDB(dbPath string, dbTimeout time.Duration) (*ClientDB, error) {
}

clientDB := &ClientDB{
db: bdb,
dbPath: dbPath,
Backend: bdb,
dbPath: dbPath,
}

err = initOrSyncVersions(clientDB, firstInit, clientDBVersions)
Expand All @@ -154,7 +154,7 @@ func OpenClientDB(dbPath string, dbTimeout time.Duration) (*ClientDB, error) {
// initialized. This allows us to assume their presence throughout all
// operations. If an known top-level bucket is expected to exist but is
// missing, this will trigger a ErrUninitializedDB error.
err = kvdb.Update(clientDB.db, initClientDBBuckets, func() {})
err = kvdb.Update(clientDB, initClientDBBuckets, func() {})
if err != nil {
bdb.Close()
return nil, err
Expand Down Expand Up @@ -184,19 +184,12 @@ func initClientDBBuckets(tx kvdb.RwTx) error {
return nil
}

// bdb returns the backing bbolt.DB instance.
//
// NOTE: Part of the versionedDB interface.
func (c *ClientDB) bdb() kvdb.Backend {
return c.db
}

// Version returns the database's current version number.
//
// NOTE: Part of the versionedDB interface.
func (c *ClientDB) Version() (uint32, error) {
var version uint32
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
err := kvdb.View(c, func(tx kvdb.RTx) error {
var err error
version, err = getDBVersion(tx)
return err
Expand All @@ -210,11 +203,6 @@ func (c *ClientDB) Version() (uint32, error) {
return version, nil
}

// Close closes the underlying database.
func (c *ClientDB) Close() error {
return c.db.Close()
}

// CreateTower initialize an address record used to communicate with a
// watchtower. Each Tower is assigned a unique ID, that is used to amortize
// storage costs of the public key when used by multiple sessions. If the tower
Expand All @@ -225,7 +213,7 @@ func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) {
copy(towerPubKey[:], lnAddr.IdentityKey.SerializeCompressed())

var tower *Tower
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
err := kvdb.Update(c, func(tx kvdb.RwTx) error {
towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
if towerIndex == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -320,7 +308,7 @@ func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) {
//
// NOTE: An error is not returned if the tower doesn't exist.
func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error {
return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
return kvdb.Update(c, func(tx kvdb.RwTx) error {
towers := tx.ReadWriteBucket(cTowerBkt)
if towers == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -401,7 +389,7 @@ func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error {
// LoadTowerByID retrieves a tower by its tower ID.
func (c *ClientDB) LoadTowerByID(towerID TowerID) (*Tower, error) {
var tower *Tower
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
err := kvdb.View(c, func(tx kvdb.RTx) error {
towers := tx.ReadBucket(cTowerBkt)
if towers == nil {
return ErrUninitializedDB
Expand All @@ -423,7 +411,7 @@ func (c *ClientDB) LoadTowerByID(towerID TowerID) (*Tower, error) {
// LoadTower retrieves a tower by its public key.
func (c *ClientDB) LoadTower(pubKey *btcec.PublicKey) (*Tower, error) {
var tower *Tower
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
err := kvdb.View(c, func(tx kvdb.RTx) error {
towers := tx.ReadBucket(cTowerBkt)
if towers == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -454,7 +442,7 @@ func (c *ClientDB) LoadTower(pubKey *btcec.PublicKey) (*Tower, error) {
// ListTowers retrieves the list of towers available within the database.
func (c *ClientDB) ListTowers() ([]*Tower, error) {
var towers []*Tower
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
err := kvdb.View(c, func(tx kvdb.RTx) error {
towerBucket := tx.ReadBucket(cTowerBkt)
if towerBucket == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -487,7 +475,7 @@ func (c *ClientDB) NextSessionKeyIndex(towerID TowerID,
blobType blob.Type) (uint32, error) {

var index uint32
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
err := kvdb.Update(c, func(tx kvdb.RwTx) error {
keyIndex := tx.ReadWriteBucket(cSessionKeyIndexBkt)
if keyIndex == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -539,7 +527,7 @@ func (c *ClientDB) NextSessionKeyIndex(towerID TowerID,
// CreateClientSession records a newly negotiated client session in the set of
// active sessions. The session can be identified by its SessionID.
func (c *ClientDB) CreateClientSession(session *ClientSession) error {
return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
return kvdb.Update(c, func(tx kvdb.RwTx) error {
keyIndexes := tx.ReadWriteBucket(cSessionKeyIndexBkt)
if keyIndexes == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -641,7 +629,7 @@ func getSessionKeyIndex(keyIndexes kvdb.RwBucket, towerID TowerID,
// response that do not correspond to this tower.
func (c *ClientDB) ListClientSessions(id *TowerID) (map[SessionID]*ClientSession, error) {
var clientSessions map[SessionID]*ClientSession
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
err := kvdb.View(c, func(tx kvdb.RTx) error {
sessions := tx.ReadBucket(cSessionBkt)
if sessions == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -697,7 +685,7 @@ func listClientSessions(sessions kvdb.RBucket,
// channel summaries.
func (c *ClientDB) FetchChanSummaries() (ChannelSummaries, error) {
var summaries map[lnwire.ChannelID]ClientChanSummary
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
err := kvdb.View(c, func(tx kvdb.RTx) error {
chanSummaries := tx.ReadBucket(cChanSummaryBkt)
if chanSummaries == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -735,7 +723,7 @@ func (c *ClientDB) FetchChanSummaries() (ChannelSummaries, error) {
func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID,
sweepPkScript []byte) error {

return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
return kvdb.Update(c, func(tx kvdb.RwTx) error {
chanSummaries := tx.ReadWriteBucket(cChanSummaryBkt)
if chanSummaries == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -779,7 +767,7 @@ func (c *ClientDB) CommitUpdate(id *SessionID,
update *CommittedUpdate) (uint16, error) {

var lastApplied uint16
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
err := kvdb.Update(c, func(tx kvdb.RwTx) error {
sessions := tx.ReadWriteBucket(cSessionBkt)
if sessions == nil {
return ErrUninitializedDB
Expand Down Expand Up @@ -885,7 +873,7 @@ func (c *ClientDB) CommitUpdate(id *SessionID,
func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
lastApplied uint16) error {

return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
return kvdb.Update(c, func(tx kvdb.RwTx) error {
sessions := tx.ReadWriteBucket(cSessionBkt)
if sessions == nil {
return ErrUninitializedDB
Expand Down
Loading