From 02aa77261da2a4bf158167727b8ef6be8f71c43c Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Wed, 16 Jun 2021 16:20:54 +0200 Subject: [PATCH 1/3] etcd: fix dereferencing issue in etcd.CommitQueue causing contention This commit fixes an issue where subsequent transaction retries may have changed the read/write sets inside the STM which in turn left junk references to these keys in the CommitQueue. The left keys potentially conflicted with subsequent transactions, queueing them up causing throughput degradation. --- kvdb/etcd/commit_queue.go | 14 ++++++------- kvdb/etcd/commit_queue_test.go | 38 +++++++--------------------------- kvdb/etcd/stm.go | 26 +++++++++++++++++++++-- 3 files changed, 39 insertions(+), 39 deletions(-) diff --git a/kvdb/etcd/commit_queue.go b/kvdb/etcd/commit_queue.go index f03845650ca..54f33c01d30 100644 --- a/kvdb/etcd/commit_queue.go +++ b/kvdb/etcd/commit_queue.go @@ -54,14 +54,14 @@ func (c *commitQueue) Wait() { // Add increases lock counts and queues up tx commit closure for execution. // Transactions that don't have any conflicts are executed immediately by // "downgrading" the count mutex to allow concurrency. -func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) { +func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) { c.mx.Lock() blocked := false // Mark as blocked if there's any writer changing any of the keys in // the read set. Do not increment the reader counts yet as we'll need to // use the original reader counts when scanning through the write set. - for key := range rset { + for _, key := range rset { if c.writerMap[key] > 0 { blocked = true break @@ -70,7 +70,7 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) { // Mark as blocked if there's any writer or reader for any of the keys // in the write set. 
- for key := range wset { + for _, key := range wset { blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0 // Increment the writer count. @@ -78,7 +78,7 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) { } // Finally we can increment the reader counts for keys in the read set. - for key := range rset { + for _, key := range rset { c.readerMap[key] += 1 } @@ -108,18 +108,18 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) { } // Done decreases lock counts of the keys in the read/write sets. -func (c *commitQueue) Done(rset readSet, wset writeSet) { +func (c *commitQueue) Done(rset []string, wset []string) { c.mx.Lock() defer c.mx.Unlock() - for key := range rset { + for _, key := range rset { c.readerMap[key] -= 1 if c.readerMap[key] == 0 { delete(c.readerMap, key) } } - for key := range wset { + for _, key := range wset { c.writerMap[key] -= 1 if c.writerMap[key] == 0 { delete(c.writerMap, key) diff --git a/kvdb/etcd/commit_queue_test.go b/kvdb/etcd/commit_queue_test.go index 16ff71006db..25b226b57ab 100644 --- a/kvdb/etcd/commit_queue_test.go +++ b/kvdb/etcd/commit_queue_test.go @@ -39,28 +39,6 @@ func TestCommitQueue(t *testing.T) { } } - // Helper function to create a read set from the passed keys. - makeReadSet := func(keys []string) readSet { - rs := make(map[string]stmGet) - - for _, key := range keys { - rs[key] = stmGet{} - } - - return rs - } - - // Helper function to create a write set from the passed keys. 
- makeWriteSet := func(keys []string) writeSet { - ws := make(map[string]stmPut) - - for _, key := range keys { - ws[key] = stmPut{} - } - - return ws - } - ctx := context.Background() ctx, cancel := context.WithCancel(ctx) q := NewCommitQueue(ctx) @@ -73,26 +51,26 @@ func TestCommitQueue(t *testing.T) { // Tx1: reads: key1, key2, writes: key3, conflict: none q.Add( commit("free", true), - makeReadSet([]string{"key1", "key2"}), - makeWriteSet([]string{"key3"}), + []string{"key1", "key2"}, + []string{"key3"}, ) // Tx2: reads: key1, key2, writes: key3, conflict: Tx1 q.Add( commit("blocked1", false), - makeReadSet([]string{"key1", "key2"}), - makeWriteSet([]string{"key3"}), + []string{"key1", "key2"}, + []string{"key3"}, ) // Tx3: reads: key1, writes: key4, conflict: none q.Add( commit("free", true), - makeReadSet([]string{"key1", "key2"}), - makeWriteSet([]string{"key4"}), + []string{"key1", "key2"}, + []string{"key4"}, ) // Tx4: reads: key2, writes: key4 conflict: Tx3 q.Add( commit("blocked2", false), - makeReadSet([]string{"key2"}), - makeWriteSet([]string{"key4"}), + []string{"key2"}, + []string{"key4"}, ) // Wait for all commits. diff --git a/kvdb/etcd/stm.go b/kvdb/etcd/stm.go index 0d47fe93791..28a2fc699a0 100644 --- a/kvdb/etcd/stm.go +++ b/kvdb/etcd/stm.go @@ -280,8 +280,30 @@ func runSTM(s *stm, apply func(STM) error) error { return preApplyErr } + // Make a copy of the read/write set keys here. The reason why we need + // to do this is because subsequent applies may change (shrink) these + // sets and so when we decrease reference counts in the commit queue in + // Done(...) we'd potentially miss removing references which would + // result in queueing up transactions and contending DB access. + // Copying these strings is cheap due to Go's immutable string which is + // always a reference. 
+ rkeys := make([]string, len(s.rset)) + wkeys := make([]string, len(s.wset)) + + i := 0 + for key := range s.rset { + rkeys[i] = key + i++ + } + + i = 0 + for key := range s.wset { + wkeys[i] = key + i++ + } + // Queue up the transaction for execution. - s.txQueue.Add(execute, s.rset, s.wset) + s.txQueue.Add(execute, rkeys, wkeys) // Wait for the transaction to execute, or break if aborted. select { @@ -289,7 +311,7 @@ func runSTM(s *stm, apply func(STM) error) error { case <-s.options.ctx.Done(): } - s.txQueue.Done(s.rset, s.wset) + s.txQueue.Done(rkeys, wkeys) if s.options.commitStatsCallback != nil { stats.Retries = retries From b29ae94e103b04076a0511436cd06edc280e7445 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Thu, 24 Jun 2021 22:10:40 +0200 Subject: [PATCH 2/3] etcd: redesign commit queue to make it more robust and scalable This commit builds on the ideas of @cfromknecht in lnd/5153. The addition is that the design is now simpler and more robust by queueing up everything, but allowing maximal parallelism where txns don't block. Furthermore the commit makes CommitQueue.Done() private essentially removing the need to understand the queue externally. --- kvdb/etcd/commit_queue.go | 150 +++++++++++++++++++++++---------- kvdb/etcd/commit_queue_test.go | 22 +++-- kvdb/etcd/db.go | 11 ++- kvdb/etcd/stm.go | 5 +- kvdb/etcd/stm_test.go | 10 +-- 5 files changed, 136 insertions(+), 62 deletions(-) diff --git a/kvdb/etcd/commit_queue.go b/kvdb/etcd/commit_queue.go index 54f33c01d30..138c08e68dc 100644 --- a/kvdb/etcd/commit_queue.go +++ b/kvdb/etcd/commit_queue.go @@ -3,14 +3,11 @@ package etcd import ( + "container/list" "context" "sync" ) -// commitQueueSize is the maximum number of commits we let to queue up. All -// remaining commits will block on commitQueue.Add(). 
-const commitQueueSize = 100 - // commitQueue is a simple execution queue to manage conflicts for transactions // and thereby reduce the number of times conflicting transactions need to be // retried. When a new transaction is added to the queue, we first upgrade the @@ -25,9 +22,18 @@ type commitQueue struct { readerMap map[string]int writerMap map[string]int - commitMutex sync.RWMutex - queue chan (func()) - wg sync.WaitGroup + queue *list.List + queueMx sync.Mutex + queueCond *sync.Cond + + shutdown chan struct{} +} + +type commitQueueTxn struct { + commitLoop func() + blocked bool + rset []string + wset []string } // NewCommitQueue creates a new commit queue, with the passed abort context. @@ -36,19 +42,24 @@ func NewCommitQueue(ctx context.Context) *commitQueue { ctx: ctx, readerMap: make(map[string]int), writerMap: make(map[string]int), - queue: make(chan func(), commitQueueSize), + queue: list.New(), + shutdown: make(chan struct{}), } + q.queueCond = sync.NewCond(&q.queueMx) // Start the queue consumer loop. - q.wg.Add(1) go q.mainLoop() return q } -// Wait waits for the queue to stop (after the queue context has been canceled). -func (c *commitQueue) Wait() { - c.wg.Wait() +// Stop signals the queue to stop after the queue context has been canceled and +// waits until the queue has stopped. +func (c *commitQueue) Stop() { + // Signal the queue's condition variable to ensure the mainLoop reliably + // unblocks to check for the exit condition. + c.queueCond.Signal() + <-c.shutdown } // Add increases lock counts and queues up tx commit closure for execution. @@ -82,33 +93,22 @@ func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) { c.readerMap[key] += 1 } - if blocked { - // Add the transaction to the queue if conflicts with an already - // queued one. 
- c.mx.Unlock() + c.queueCond.L.Lock() + c.queue.PushBack(&commitQueueTxn{ + commitLoop: commitLoop, + blocked: blocked, + rset: rset, + wset: wset, + }) + c.queueCond.L.Unlock() - select { - case c.queue <- commitLoop: - case <-c.ctx.Done(): - } - } else { - // To make sure we don't add a new tx to the queue that depends - // on this "unblocked" tx, grab the commitMutex before lifting - // the mutex guarding the lock maps. - c.commitMutex.RLock() - c.mx.Unlock() - - // At this point we're safe to execute the "unblocked" tx, as - // we cannot execute blocked tx that may have been read from the - // queue until the commitMutex is held. - commitLoop() - - c.commitMutex.RUnlock() - } + c.mx.Unlock() + + c.queueCond.Signal() } -// Done decreases lock counts of the keys in the read/write sets. -func (c *commitQueue) Done(rset []string, wset []string) { +// done decreases lock counts of the keys in the read/write sets. +func (c *commitQueue) done(rset []string, wset []string) { c.mx.Lock() defer c.mx.Unlock() @@ -131,20 +131,82 @@ func (c *commitQueue) Done(rset []string, wset []string) { // dependencies. The queue ensures that the top element doesn't conflict with // any other transactions and therefore can be executed freely. func (c *commitQueue) mainLoop() { - defer c.wg.Done() + defer close(c.shutdown) for { + // Wait until there are no unblocked transactions being + // executed, and for there to be at least one blocked + // transaction in our queue. + c.queueCond.L.Lock() + for c.queue.Front() == nil { + c.queueCond.Wait() + + // Check the exit condition before looping again. + select { + case <-c.ctx.Done(): + c.queueCond.L.Unlock() + return + default: + } + } + + // Now collect all txns until we find the next blocking one. + // These shouldn't conflict (if the precollected read/write + // keys sets don't grow), meaning we can safely commit them + // in parallel. 
+ work := make([]*commitQueueTxn, 1) + e := c.queue.Front() + work[0] = c.queue.Remove(e).(*commitQueueTxn) + + for { + e := c.queue.Front() + if e == nil { + break + } + + next := e.Value.(*commitQueueTxn) + if !next.blocked { + work = append(work, next) + c.queue.Remove(e) + } else { + // We found the next blocking txn which means + // the block of work needs to be cut here. + break + } + } + + c.queueCond.L.Unlock() + + // Check if we need to exit before continuing. select { - case top := <-c.queue: - // Execute the next blocked transaction. As it is - // the top element in the queue it means that it doesn't - // depend on any other transactions anymore. - c.commitMutex.Lock() - top() - c.commitMutex.Unlock() + case <-c.ctx.Done(): + return + default: + } + + var wg sync.WaitGroup + wg.Add(len(work)) + // Fire up N goroutines where each will run its commit loop + // and then clean up the reader/writer maps. + for _, txn := range work { + go func(txn *commitQueueTxn) { + defer wg.Done() + txn.commitLoop() + + // We can safely cleanup here as done only + // holds the main mutex. + c.done(txn.rset, txn.wset) + }(txn) + } + + wg.Wait() + + // Check if we need to exit before continuing. + select { case <-c.ctx.Done(): return + default: } } } diff --git a/kvdb/etcd/commit_queue_test.go b/kvdb/etcd/commit_queue_test.go index 25b226b57ab..a7ebcca2b36 100644 --- a/kvdb/etcd/commit_queue_test.go +++ b/kvdb/etcd/commit_queue_test.go @@ -17,7 +17,7 @@ import ( func TestCommitQueue(t *testing.T) { // The duration of each commit. const commitDuration = time.Millisecond * 500 - const numCommits = 4 + const numCommits = 5 var wg sync.WaitGroup commits := make([]string, numCommits) @@ -30,25 +30,25 @@ func TestCommitQueue(t *testing.T) { // Update our log of commit order. Avoid blocking // by preallocating the commit log and increasing // the log index atomically. 
- i := atomic.AddInt32(&idx, 1) - commits[i] = tag - if sleep { time.Sleep(commitDuration) } + + i := atomic.AddInt32(&idx, 1) + commits[i] = tag } } ctx := context.Background() ctx, cancel := context.WithCancel(ctx) q := NewCommitQueue(ctx) - defer q.Wait() + defer q.Stop() defer cancel() wg.Add(numCommits) t1 := time.Now() - // Tx1: reads: key1, key2, writes: key3, conflict: none + // Tx1 (long): reads: key1, key2, writes: key3, conflict: none q.Add( commit("free", true), []string{"key1", "key2"}, @@ -60,12 +60,18 @@ func TestCommitQueue(t *testing.T) { []string{"key1", "key2"}, []string{"key3"}, ) - // Tx3: reads: key1, writes: key4, conflict: none + // Tx3 (long): reads: key1, writes: key4, conflict: none q.Add( commit("free", true), []string{"key1", "key2"}, []string{"key4"}, ) + // Tx4 (long): reads: key1, writes: none, conflict: none + q.Add( + commit("free", true), + []string{"key1", "key2"}, + []string{}, + ) // Tx4: reads: key2, writes: key4 conflict: Tx3 q.Add( commit("blocked2", false), @@ -87,7 +93,7 @@ func TestCommitQueue(t *testing.T) { // before the blocking ones, and the blocking ones are executed in // the order of addition. require.Equal(t, - []string{"free", "free", "blocked1", "blocked2"}, + []string{"free", "blocked1", "free", "free", "blocked2"}, commits, ) } diff --git a/kvdb/etcd/db.go b/kvdb/etcd/db.go index e3bad3b250d..12b8986884f 100644 --- a/kvdb/etcd/db.go +++ b/kvdb/etcd/db.go @@ -122,6 +122,7 @@ func (c *commitStatsCollector) callback(succ bool, stats CommitStats) { type db struct { cfg Config ctx context.Context + cancel func() cli *clientv3.Client commitStatsCollector *commitStatsCollector txQueue *commitQueue @@ -135,7 +136,6 @@ var _ walletdb.DB = (*db)(nil) // config. If etcd connection cannot be established, then returns error. 
func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) { clientCfg := clientv3.Config{ - Context: ctx, Endpoints: []string{cfg.Host}, DialTimeout: etcdConnectionTimeout, Username: cfg.User, @@ -158,8 +158,11 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) { clientCfg.TLS = tlsConfig } + ctx, cancel := context.WithCancel(ctx) + clientCfg.Context = ctx cli, err := clientv3.New(clientCfg) if err != nil { + cancel() return nil, err } @@ -171,6 +174,7 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) { backend := &db{ cfg: cfg, ctx: ctx, + cancel: cancel, cli: cli, txQueue: NewCommitQueue(ctx), } @@ -296,5 +300,8 @@ func (db *db) Copy(w io.Writer) error { // Close cleanly shuts down the database and syncs all data. // This function is part of the walletdb.Db interface implementation. func (db *db) Close() error { - return db.cli.Close() + err := db.cli.Close() + db.cancel() + db.txQueue.Stop() + return err } diff --git a/kvdb/etcd/stm.go b/kvdb/etcd/stm.go index 28a2fc699a0..9d1e50e97a1 100644 --- a/kvdb/etcd/stm.go +++ b/kvdb/etcd/stm.go @@ -283,7 +283,7 @@ func runSTM(s *stm, apply func(STM) error) error { // Make a copy of the read/write set keys here. The reason why we need // to do this is because subsequent applies may change (shrink) these // sets and so when we decrease reference counts in the commit queue in - // Done(...) we'd potentially miss removing references which would + // done(...) we'd potentially miss removing references which would // result in queueing up transactions and contending DB access. // Copying these strings is cheap due to Go's immutable string which is // always a reference. 
@@ -309,10 +309,9 @@ func runSTM(s *stm, apply func(STM) error) error { select { case <-done: case <-s.options.ctx.Done(): + return context.Canceled } - s.txQueue.Done(rkeys, wkeys) - if s.options.commitStatsCallback != nil { stats.Retries = retries s.options.commitStatsCallback(executeErr == nil, stats) diff --git a/kvdb/etcd/stm_test.go b/kvdb/etcd/stm_test.go index a780f86269c..e4a3810cd94 100644 --- a/kvdb/etcd/stm_test.go +++ b/kvdb/etcd/stm_test.go @@ -28,7 +28,7 @@ func TestPutToEmpty(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() db, err := newEtcdBackend(ctx, f.BackendConfig()) @@ -55,7 +55,7 @@ func TestGetPutDel(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() testKeyValues := []KV{ @@ -141,7 +141,7 @@ func TestFirstLastNextPrev(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() testKeyValues := []KV{ @@ -299,7 +299,7 @@ func TestCommitError(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() db, err := newEtcdBackend(ctx, f.BackendConfig()) @@ -347,7 +347,7 @@ func TestManualTxError(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() db, err := newEtcdBackend(ctx, f.BackendConfig()) From e243be1ba2ef538918994381ebce06701609e9f8 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Fri, 23 Jul 2021 15:49:06 +0200 Subject: [PATCH 3/3] docs: update 0.14.0 release notes --- docs/release-notes/release-notes-0.14.0.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md index 6c45edf8130..a8e2d26b269 100644 --- a/docs/release-notes/release-notes-0.14.0.md +++ b/docs/release-notes/release-notes-0.14.0.md @@ -107,6 +107,10 @@ you. 
* [Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/) to make LNDs payment throughput (and latency) with better when using etcd. +* [More robust commit queue design](https://github.com/lightningnetwork/lnd/pull/5513) + to make it less likely that we retry etcd transactions and make the commit + queue more scalable. + ## Performance improvements * [Update MC store in blocks](https://github.com/lightningnetwork/lnd/pull/5515) @@ -119,6 +123,7 @@ currnet DNS seeds when in SigNet mode](https://github.com/lightningnetwork/lnd/pull/5564). # Contributors (Alphabetical Order) +* Andras Banki-Horvath * ErikEk * Martin Habovstiak * Zero-1729