-
Notifications
You must be signed in to change notification settings - Fork 2.2k
etcd: make commit queue test deterministic #5117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,24 +7,25 @@ import ( | |
| "sync" | ||
| "sync/atomic" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| // TestCommitQueue tests that non-conflicting transactions commit concurrently, | ||
| // while conflicting transactions are queued up. | ||
| func TestCommitQueue(t *testing.T) { | ||
| // The duration of each commit. | ||
| const commitDuration = time.Millisecond * 500 | ||
| const numCommits = 4 | ||
|
|
||
| const numCommits = 3 | ||
| var wg sync.WaitGroup | ||
| commits := make([]string, numCommits) | ||
| idx := int32(-1) | ||
|
|
||
| commit := func(tag string, sleep bool) func() { | ||
| commit := func(tag string, commit chan struct{}, | ||
| block *sync.Mutex) func() { | ||
|
|
||
| return func() { | ||
| if commit != nil { | ||
| close(commit) | ||
| } | ||
| defer wg.Done() | ||
|
|
||
| // Update our log of commit order. Avoid blocking | ||
|
|
@@ -33,8 +34,8 @@ func TestCommitQueue(t *testing.T) { | |
| i := atomic.AddInt32(&idx, 1) | ||
| commits[i] = tag | ||
|
|
||
| if sleep { | ||
| time.Sleep(commitDuration) | ||
| if block != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this needs a comment, same for the channel close above |
||
| block.Lock() | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -68,48 +69,51 @@ func TestCommitQueue(t *testing.T) { | |
| defer cancel() | ||
|
|
||
| wg.Add(numCommits) | ||
| t1 := time.Now() | ||
|
|
||
| var block sync.Mutex | ||
|
|
||
| // Tx1: reads: key1, key2, writes: key3, conflict: none | ||
| q.Add( | ||
| commit("free", true), | ||
| // Since we simulate that the txn takes a long time, we'll add in a | ||
| // new goroutine and wait for the txn commit to start execution. | ||
| commitTx1 := make(chan struct{}) | ||
| // Block the commit by holding the mutex. | ||
| block.Lock() | ||
| go q.Add( | ||
| commit("free", commitTx1, &block), | ||
| makeReadSet([]string{"key1", "key2"}), | ||
| makeWriteSet([]string{"key3"}), | ||
| ) | ||
| // Tx2: reads: key1, key2, writes: key3, conflict: Tx1 | ||
| q.Add( | ||
| commit("blocked1", false), | ||
| makeReadSet([]string{"key1", "key2"}), | ||
| makeWriteSet([]string{"key3"}), | ||
| ) | ||
| // Tx3: reads: key1, writes: key4, conflict: none | ||
| <-commitTx1 | ||
|
|
||
| // Tx2: reads: key1, key5, writes: key3, conflict: Tx1 (key3) | ||
| // We don't expect queue add to block as this txn will queue up after | ||
| // tx1. | ||
| q.Add( | ||
| commit("free", true), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test seems a bit weaker now, in that it doesn't actually test for transaction reordering. If the commit queue simply executed the transactions in the order they are added, the test would pass. The new version doesn't test that concurrent non-conflicting transactions are executed immediately. I think it makes sense to keep the current transactions, possibly even making the test stronger by naming the transactions
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without the sleeps it's not really possible to do this deterministically without submitting non-conflicting txns first since then the non-conflicting txn that should be executed inside the commit queue's read mutex will be behind the conflicting txn which waits on that mutex as a writer. See a simplified example: https://play.golang.org/p/66zi9waGZj- |
||
| makeReadSet([]string{"key1", "key2"}), | ||
| makeWriteSet([]string{"key4"}), | ||
| commit("blocked1", nil, nil), | ||
| makeReadSet([]string{"key1"}), | ||
| makeWriteSet([]string{"key3", "key5"}), | ||
| ) | ||
| // Tx4: reads: key2, writes: key4 conflict: Tx3 | ||
|
|
||
| // Tx3: reads: key2, key5, writes: key4 conflict: Tx2 (key5) | ||
| // We don't expect queue add to block as this txn will queue up after | ||
| // tx2. | ||
| q.Add( | ||
| commit("blocked2", false), | ||
| makeReadSet([]string{"key2"}), | ||
| commit("blocked2", nil, nil), | ||
| makeReadSet([]string{"key2", "key5"}), | ||
| makeWriteSet([]string{"key4"}), | ||
| ) | ||
|
|
||
| // Allow Tx1 to continue with the commit. | ||
| block.Unlock() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: might be simpler to use a commit := func(...) {
...
if ready != nil {
<-ready
}
}
...
ready := make(chan struct{})
...
commit(_, _, ready)
...
close(ready)This approach will also work with more than one "free" transaction
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my other comment. We can use a channel, but in the test example it's not possible to submit overlapping conflicting/non-conflicting txns without introducing a deadlock in the test (even if we resolve it later).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that using a channel to signal that the commit can continue is better than a mutex |
||
|
|
||
| // Wait for all commits. | ||
| wg.Wait() | ||
| t2 := time.Now() | ||
|
|
||
| // Expected total execution time: delta. | ||
| // 2 * commitDuration <= delta < 3 * commitDuration | ||
| delta := t2.Sub(t1) | ||
| require.LessOrEqual(t, int64(commitDuration*2), int64(delta)) | ||
| require.Greater(t, int64(commitDuration*3), int64(delta)) | ||
|
|
||
| // Expect that the non-conflicting "free" transactions are executed | ||
| // before the blocking ones, and the blocking ones are executed in | ||
| // before the conflicting ones, and the conflicting ones are executed in | ||
| // the order of addition. | ||
| require.Equal(t, | ||
| []string{"free", "free", "blocked1", "blocked2"}, | ||
| []string{"free", "blocked1", "blocked2"}, | ||
| commits, | ||
| ) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can names this
signalStartor something other that is more descriptive thancommit