Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 78 additions & 12 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ type DB struct {
graph *ChannelGraph
clock clock.Clock
dryRun bool

chanCache *kvdb.Cache
}

// Open opens or creates channeldb. Any necessary schemas migrations due
Expand Down Expand Up @@ -260,6 +262,16 @@ func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
return db, err
}

func resetChanStateCache(cache *kvdb.Cache) error {
// We don't use cache for Bolt backend.
if cache == nil {
return nil
}

cache.Wipe()
return cache.Init()
}

// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
// Any necessary schemas migrations due to updates will take place as necessary.
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, error) {
Expand All @@ -274,18 +286,64 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,

chanDB := &DB{
Backend: backend,
channelStateDB: &ChannelStateDB{
linkNodeDB: &LinkNodeDB{
backend: backend,
},
backend: backend,
},
clock: opts.clock,
dryRun: opts.dryRun,
clock: opts.clock,
dryRun: opts.dryRun,
}

chanStateBackend := backend

// Override the chan state backend if we require to cache chan state.
if opts.ChanStateCache {
skipped := [][]byte{
// Skip the graph buckets.
nodeBucket,
edgeBucket,
edgeIndexBucket,
graphMetaBucket,

// Skip some non performance critical large buckets.
closedChannelBucket,
closeSummaryBucket,
fwdPackagesKey,
revocationLogBucket,
}

topLevel := [][]byte{
// Read through the graph buckets.
nodeBucket,
edgeBucket,
edgeIndexBucket,
graphMetaBucket,

// Cache important channel state.
openChannelBucket,
outpointBucket,
nodeInfoBucket,

// Channel state buckets to read through.
closedChannelBucket,
closeSummaryBucket,
fwdPackagesKey,
}

cache := kvdb.NewCache(backend, topLevel, skipped)
if err := cache.Init(); err != nil {
return nil, err
}

chanStateBackend = cache
chanDB.chanCache = cache
}

// Set the parent pointer (only used in tests).
chanDB.channelStateDB.parent = chanDB
chanDB.channelStateDB = &ChannelStateDB{
linkNodeDB: &LinkNodeDB{
backend: chanStateBackend,
},
backend: chanStateBackend,

// Set the parent pointer (only used in tests).
parent: chanDB,
}

var err error
chanDB.graph, err = NewChannelGraph(
Expand Down Expand Up @@ -343,7 +401,11 @@ func (d *DB) Wipe() error {
return err
}

return initChannelDB(d.Backend)
if err := initChannelDB(d.Backend); err != nil {
return err
}

return resetChanStateCache(d.chanCache)
}

// initChannelDB creates and initializes a fresh version of channeldb. In the
Expand Down Expand Up @@ -518,7 +580,6 @@ func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
"chan_point=%v: %v", outPoint, err)
}
oChannel.Db = c

channels = append(channels, oChannel)

return nil
Expand Down Expand Up @@ -1440,6 +1501,11 @@ func MakeTestDB(modifiers ...OptionModifier) (*DB, func(), error) {
return nil, nil, err
}

// Use a channel state cache when testing with remote backends.
if kvdb.TestBackend != kvdb.BoltBackendName {
modifiers = append(modifiers, OptionWithChannelStateCache(true))
}

cdb, err := CreateWithBackend(backend, modifiers...)
if err != nil {
backendCleanup()
Expand Down
17 changes: 15 additions & 2 deletions channeldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ func TestOpenWithCreate(t *testing.T) {
}
defer cleanup()

cdb, err := CreateWithBackend(backend)
var modifiers []OptionModifier
// Use a channel state cache when testing with remote backends.
if kvdb.TestBackend != kvdb.BoltBackendName {
modifiers = append(modifiers, OptionWithChannelStateCache(true))
}

cdb, err := CreateWithBackend(backend, modifiers...)

if err != nil {
t.Fatalf("unable to create channeldb: %v", err)
}
Expand Down Expand Up @@ -87,7 +94,13 @@ func TestWipe(t *testing.T) {
}
defer cleanup()

fullDB, err := CreateWithBackend(backend)
var modifiers []OptionModifier
// Use a channel state cache when testing with remote backends.
if kvdb.TestBackend != kvdb.BoltBackendName {
modifiers = append(modifiers, OptionWithChannelStateCache(true))
}

fullDB, err := CreateWithBackend(backend, modifiers...)
if err != nil {
t.Fatalf("unable to create channeldb: %v", err)
}
Expand Down
12 changes: 12 additions & 0 deletions channeldb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ const (
type Options struct {
kvdb.BoltBackendConfig

// ChanStateCache when true turns of in-memory caching of important
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/of/on/

// channel state buckets.
ChanStateCache bool

// RejectCacheSize is the maximum number of rejectCacheEntries to hold
// in the rejection cache.
RejectCacheSize int
Expand Down Expand Up @@ -72,6 +76,14 @@ func DefaultOptions() Options {
// OptionModifier is a function signature for modifying the default Options.
type OptionModifier func(*Options)

// OptionWithChannelStateCache turns on in-memory caching of important channel
// state buckets.
func OptionWithChannelStateCache(cache bool) OptionModifier {
return func(o *Options) {
o.ChanStateCache = cache
}
}

// OptionSetRejectCacheSize sets the RejectCacheSize to n.
func OptionSetRejectCacheSize(n int) OptionModifier {
return func(o *Options) {
Expand Down
3 changes: 3 additions & 0 deletions docs/release-notes/release-notes-0.14.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,9 @@ you.
buffer each time we decrypt an incoming message, as we
recycle these buffers in the peer.

* [Cache the channel state](https://github.com/lightningnetwork/lnd/pull/5595)
to achieve better performance when running LND using a remote DB backend.

## Log system

* [Save compressed log files from logrorate during
Expand Down
44 changes: 43 additions & 1 deletion kvdb/bolt_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package kvdb

import (
"fmt"
"testing"

"github.com/btcsuite/btcwallet/walletdb"
"github.com/stretchr/testify/require"
)

func TestBolt(t *testing.T) {
Expand Down Expand Up @@ -71,14 +73,54 @@ func TestBolt(t *testing.T) {

for _, test := range tests {
test := test
cache := []bool{false, true}
for _, useCache := range cache {
name := fmt.Sprintf("%v/Cache=%v", test.name, useCache)

t.Run(name, func(t *testing.T) {
t.Parallel()

f := NewBoltFixture(t)
defer f.Cleanup()

backend := f.NewBackend()
if useCache {
cache := NewCache(backend, nil, nil)
require.NoError(t, cache.Init())
backend = cache
}

test.test(t, backend)
})
}
}
}

func TestCacheBolt(t *testing.T) {
tests := []struct {
name string
test func(*testing.T, walletdb.DB)
}{
{
name: "cache fill",
test: testCacheFill,
},
{
name: "cache rollback",
test: testCacheRollback,
},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()

f := NewBoltFixture(t)
defer f.Cleanup()

test.test(t, f.NewBackend())
backend := f.NewBackend()
test.test(t, backend)
})
}
}
Loading