diff --git a/channeldb/kvdb/etcd/commit_queue_test.go b/channeldb/kvdb/etcd/commit_queue_test.go index 16ff71006db..75505854e26 100644 --- a/channeldb/kvdb/etcd/commit_queue_test.go +++ b/channeldb/kvdb/etcd/commit_queue_test.go @@ -7,7 +7,6 @@ import ( "sync" "sync/atomic" "testing" - "time" "github.com/stretchr/testify/require" ) @@ -15,16 +14,18 @@ import ( // TestCommitQueue tests that non-conflicting transactions commit concurrently, // while conflicting transactions are queued up. func TestCommitQueue(t *testing.T) { - // The duration of each commit. - const commitDuration = time.Millisecond * 500 - const numCommits = 4 - + const numCommits = 3 var wg sync.WaitGroup commits := make([]string, numCommits) idx := int32(-1) - commit := func(tag string, sleep bool) func() { + commit := func(tag string, commit chan struct{}, + block *sync.Mutex) func() { + return func() { + if commit != nil { + close(commit) + } defer wg.Done() // Update our log of commit order. Avoid blocking @@ -33,8 +34,8 @@ func TestCommitQueue(t *testing.T) { i := atomic.AddInt32(&idx, 1) commits[i] = tag - if sleep { - time.Sleep(commitDuration) + if block != nil { + block.Lock() } } } @@ -68,48 +69,51 @@ func TestCommitQueue(t *testing.T) { defer cancel() wg.Add(numCommits) - t1 := time.Now() + + var block sync.Mutex // Tx1: reads: key1, key2, writes: key3, conflict: none - q.Add( - commit("free", true), + // Since we simulate that the txn takes a long time, we'll add in a + // new goroutine and wait for the txn commit to start execution. + commitTx1 := make(chan struct{}) + // Block the commit by holding the mutex. + block.Lock() + go q.Add( + commit("free", commitTx1, &block), makeReadSet([]string{"key1", "key2"}), makeWriteSet([]string{"key3"}), ) - // Tx2: reads: key1, key2, writes: key3, conflict: Tx1 - q.Add( - commit("blocked1", false), - makeReadSet([]string{"key1", "key2"}), - makeWriteSet([]string{"key3"}), - ) - // Tx3: reads: key1, writes: key4, conflict: none + <-commitTx1 + + // Tx2: reads: key1, key5, writes: key3, conflict: Tx1 (key3) + // We don't expect queue add to block as this txn will queue up after + // tx1. q.Add( - commit("free", true), - makeReadSet([]string{"key1", "key2"}), - makeWriteSet([]string{"key4"}), + commit("blocked1", nil, nil), + makeReadSet([]string{"key1"}), + makeWriteSet([]string{"key3", "key5"}), ) - // Tx4: reads: key2, writes: key4 conflict: Tx3 + + // Tx3: reads: key2, key5, writes: key4 conflict: Tx2 (key5) + // We don't expect queue add to block as this txn will queue up after + // tx2. q.Add( - commit("blocked2", false), - makeReadSet([]string{"key2"}), + commit("blocked2", nil, nil), + makeReadSet([]string{"key2", "key5"}), makeWriteSet([]string{"key4"}), ) + // Allow Tx1 to continue with the commit. + block.Unlock() + // Wait for all commits. wg.Wait() - t2 := time.Now() - - // Expected total execution time: delta. - // 2 * commitDuration <= delta < 3 * commitDuration - delta := t2.Sub(t1) - require.LessOrEqual(t, int64(commitDuration*2), int64(delta)) - require.Greater(t, int64(commitDuration*3), int64(delta)) // Expect that the non-conflicting "free" transactions are executed - // before the blocking ones, and the blocking ones are executed in + // before the conflicting ones, and the conflicting ones are executed in // the order of addition. require.Equal(t, - []string{"free", "free", "blocked1", "blocked2"}, + []string{"free", "blocked1", "blocked2"}, commits, ) }