Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 37 additions & 33 deletions channeldb/kvdb/etcd/commit_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,25 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestCommitQueue tests that non-conflicting transactions commit concurrently,
// while conflicting transactions are queued up.
func TestCommitQueue(t *testing.T) {
// The duration of each commit.
const commitDuration = time.Millisecond * 500
const numCommits = 4

const numCommits = 3
var wg sync.WaitGroup
commits := make([]string, numCommits)
idx := int32(-1)

commit := func(tag string, sleep bool) func() {
commit := func(tag string, commit chan struct{},
block *sync.Mutex) func() {

return func() {
if commit != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can names this signalStart or something other that is more descriptive than commit

close(commit)
}
defer wg.Done()

// Update our log of commit order. Avoid blocking
Expand All @@ -33,8 +34,8 @@ func TestCommitQueue(t *testing.T) {
i := atomic.AddInt32(&idx, 1)
commits[i] = tag

if sleep {
time.Sleep(commitDuration)
if block != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs a comment, same for the channel close above

block.Lock()
}
}
}
Expand Down Expand Up @@ -68,48 +69,51 @@ func TestCommitQueue(t *testing.T) {
defer cancel()

wg.Add(numCommits)
t1 := time.Now()

var block sync.Mutex

// Tx1: reads: key1, key2, writes: key3, conflict: none
q.Add(
commit("free", true),
// Since we simulate that the txn takes a long time, we'll add in a
// new goroutine and wait for the txn commit to start execution.
commitTx1 := make(chan struct{})
// Block the commit by holding the mutex.
block.Lock()
go q.Add(
commit("free", commitTx1, &block),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
)
// Tx2: reads: key1, key2, writes: key3, conflict: Tx1
q.Add(
commit("blocked1", false),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
)
// Tx3: reads: key1, writes: key4, conflict: none
<-commitTx1

// Tx2: reads: key1, key5, writes: key3, conflict: Tx1 (key3)
// We don't expect queue add to block as this txn will queue up after
// tx1.
q.Add(
commit("free", true),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test seems a bit weaker now, in that it doesn't actually test for transaction reordering. If the commit queue simply executed the transactions in the order they are added, the test would pass. The new version doesn't test that concurrent non-conflicting transactions are executed immediately.

I think it makes sense to keep the current transactions, possibly even making the test stronger by naming the transactions free1 and free2 to assert their order.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the sleeps it's not really possible to do this deterministically without submitting non-conflicting txns first since then the non-conflicting txn that should be executed inside the commit queue's read mutex will be behind the conflicting txn which waits on that mutex as a writer. See a simplified example: https://play.golang.org/p/66zi9waGZj-

makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key4"}),
commit("blocked1", nil, nil),
makeReadSet([]string{"key1"}),
makeWriteSet([]string{"key3", "key5"}),
)
// Tx4: reads: key2, writes: key4 conflict: Tx3

// Tx3: reads: key2, key5, writes: key4 conflict: Tx2 (key5)
// We don't expect queue add to block as this txn will queue up after
// tx2.
q.Add(
commit("blocked2", false),
makeReadSet([]string{"key2"}),
commit("blocked2", nil, nil),
makeReadSet([]string{"key2", "key5"}),
makeWriteSet([]string{"key4"}),
)

// Allow Tx1 to continue with the commit.
block.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: might be simpler to use a ready channel, e.g.:

commit := func(...) {
        ...
        if ready != nil {
              <-ready
        }
}
...
ready := make(chan struct{})
...
commit(_, _, ready)
...
close(ready)

This approach will also work with more than one "free" transaction

Copy link
Collaborator Author

@bhandras bhandras Mar 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comment. We can use a channel, but in the test example it's not possible to submit overlapping conflicting/non-conflicting txns without introducing a deadlock in the test (even if we resolve it later).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that using a channel to signal that the commit can continue is better than a mutex


// Wait for all commits.
wg.Wait()
t2 := time.Now()

// Expected total execution time: delta.
// 2 * commitDuration <= delta < 3 * commitDuration
delta := t2.Sub(t1)
require.LessOrEqual(t, int64(commitDuration*2), int64(delta))
require.Greater(t, int64(commitDuration*3), int64(delta))

// Expect that the non-conflicting "free" transactions are executed
// before the blocking ones, and the blocking ones are executed in
// before the conflicting ones, and the conflicting ones are executed in
// the order of addition.
require.Equal(t,
[]string{"free", "free", "blocked1", "blocked2"},
[]string{"free", "blocked1", "blocked2"},
commits,
)
}