From 3086cb93f65ad3b02a6f9cf81d3977fc5b59f2e4 Mon Sep 17 00:00:00 2001 From: tok-kkk Date: Fri, 3 Sep 2021 10:27:29 +1000 Subject: [PATCH 1/6] allow the replica to reset signatories --- process/process.go | 5 +++++ replica/replica.go | 25 ++++++++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/process/process.go b/process/process.go index b448de4..9de344a 100644 --- a/process/process.go +++ b/process/process.go @@ -8,6 +8,7 @@ package process import ( "fmt" + "sync/atomic" "github.com/renproject/id" "github.com/renproject/surge" @@ -343,6 +344,10 @@ func (p *Process) StartRound(round Round) { } } +func (p *Process) ResetF(f uint64) { + atomic.StoreUint64(&p.f, f) +} + // OnTimeoutPropose is used to notify the Process that a timeout has been // activated. It must only be called after the TimeoutPropose method in the // Timer has been called. diff --git a/replica/replica.go b/replica/replica.go index ab3a5c1..5c25ac4 100644 --- a/replica/replica.go +++ b/replica/replica.go @@ -141,6 +141,17 @@ func (replica *Replica) Run(ctx context.Context) { case process.Height: replica.proc.State = process.DefaultState().WithCurrentHeight(m) replica.mq.DropMessagesBelowHeight(m) + case []id.Signatory: + procAllowed := map[id.Signatory]bool{} + for _, sig := range m{ + procAllowed[sig] = true + } + oldF := len(replica.procsAllowed) / 3 + newF := len(procAllowed) + if newF != oldF{ + replica.proc.ResetF(uint64(newF)) + } + replica.procsAllowed = procAllowed } } @@ -212,7 +223,7 @@ func (replica *Replica) TimeoutPrecommit(ctx context.Context, timeout timer.Time } } -// ResetHeight of the underlying process to a future height. This is should only +// ResetHeight of the underlying process to a future height. This should only // be used when resynchronising the chain. If the given height is less than or // equal to the current height, nothing happens. // @@ -228,6 +239,18 @@ func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Heigh } } +// ResetSignatories of the replica and underlying process with a new set of signatories. +func (replica *Replica) ResetSignatories(ctx context.Context, signatories []id.Signatory) { + if len(signatories) == 0 { + return + } + + select { + case <-ctx.Done(): + case replica.mch <- signatories: + } +} + // State returns the current height, round and step of the underlying process. func (replica Replica) State() (process.Height, process.Round, process.Step) { return replica.proc.CurrentHeight, replica.proc.CurrentRound, replica.proc.CurrentStep From b0abc45b73a6cfd2d14a651cf7ac86bace59d6f5 Mon Sep 17 00:00:00 2001 From: tok-kkk Date: Fri, 3 Sep 2021 10:42:05 +1000 Subject: [PATCH 2/6] reset the scheduler when resetting signatories --- process/process.go | 3 ++- replica/replica.go | 9 ++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/process/process.go b/process/process.go index 9de344a..7c0630f 100644 --- a/process/process.go +++ b/process/process.go @@ -344,8 +344,9 @@ func (p *Process) StartRound(round Round) { } } -func (p *Process) ResetF(f uint64) { +func (p *Process) ResetF(f uint64, scheduler Scheduler) { atomic.StoreUint64(&p.f, f) + p.scheduler = scheduler } // OnTimeoutPropose is used to notify the Process that a timeout has been diff --git a/replica/replica.go b/replica/replica.go index 5c25ac4..f27f66e 100644 --- a/replica/replica.go +++ b/replica/replica.go @@ -146,12 +146,11 @@ func (replica *Replica) Run(ctx context.Context) { for _, sig := range m{ procAllowed[sig] = true } - oldF := len(replica.procsAllowed) / 3 - newF := len(procAllowed) - if newF != oldF{ - replica.proc.ResetF(uint64(newF)) - } replica.procsAllowed = procAllowed + + scheduler := scheduler.NewRoundRobin(m) + f := len(procAllowed) /3 + replica.proc.ResetF(uint64(f), scheduler) } } From f7b55b8e627f406f1c476b417290ca9a41caae67 Mon Sep 17 00:00:00 2001 From: tok-kkk Date: Fri, 3 Sep 2021 18:04:46 +1000 Subject: [PATCH 3/6] combine the ResetSignatories and ResetHeight function --- mq/mq.go | 5 ++++- mq/mq_test.go | 37 ++++++++++++++++++++++++++----------- replica/replica.go | 21 +++------------------ 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/mq/mq.go b/mq/mq.go index 9b0a970..636408b 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -33,8 +33,11 @@ func New(opts Options) MessageQueue { // have heights up to (and including) the given height. The appropriate callback // will be called for every message that is consumed. All consumed messages will // be dropped from the MessageQueue. -func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit)) (n int) { +func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit), procsAllowed map[id.Signatory]bool) (n int) { for from, q := range mq.queuesByPid { + if ok := procsAllowed[from] ; !ok{ + mq.queuesByPid[from] = nil + } for len(q) > 0 { if q[0] == nil || height(q[0]) > h { break diff --git a/mq/mq_test.go b/mq/mq_test.go index 4fdd9e5..73fd571 100644 --- a/mq/mq_test.go +++ b/mq/mq_test.go @@ -108,6 +108,7 @@ var _ = Describe("MQ", func() { proposeCallback, prevoteCallback, precommitCallback, + map[id.Signatory]bool{}, ) Expect(n).To(Equal(0)) @@ -124,6 +125,8 @@ var _ = Describe("MQ", func() { sender := id.NewPrivKey().Signatory() lowerHeight := process.Height(r.Int63()) higherHeight := lowerHeight + 1 + process.Height(r.Intn(100)) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true // send msg1 msg1 := randomMsg(r, sender, lowerHeight, processutil.RandomRound(r)) @@ -199,12 +202,12 @@ var _ = Describe("MQ", func() { // cannot consume msgs of height less than lowerHeight evenLowerHeight := lowerHeight - 1 - process.Height(r.Intn(100)) - n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(0)) Expect(i).To(Equal(0)) // consume all messages - n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback) + n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(2)) Expect(i).To(Equal(2)) @@ -222,6 +225,9 @@ var _ = Describe("MQ", func() { loop := func() bool { sender := id.NewPrivKey().Signatory() height := process.Height(r.Int63()) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true + // at the most 20 rounds rounds := make([]process.Round, 1+r.Intn(20)) for t := 0; t < cap(rounds); t++ { @@ -280,12 +286,12 @@ var _ = Describe("MQ", func() { // cannot consume msgs of height less than lowerHeight lowerHeight := height - 1 - process.Height(r.Intn(100)) - n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(0)) Expect(t).To(Equal(0)) // consume all messages - n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback) + n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(cap(rounds))) Expect(t).To(Equal(cap(rounds))) @@ -303,6 +309,8 @@ var _ = Describe("MQ", func() { loop := func() bool { sender := id.NewPrivKey().Signatory() minHeight, maxHeight, msgsCount := insertRandomMessages(&queue, sender) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true // we should first consume msg1 and then msg2 prevHeight := process.Height(-1) @@ -369,12 +377,12 @@ var _ = Describe("MQ", func() { } // cannot consume msgs of height less than the min height - n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(0)) Expect(i).To(Equal(0)) // consume all messages - n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback) + n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(msgsCount)) Expect(i).To(Equal(msgsCount)) @@ -392,6 +400,8 @@ var _ = Describe("MQ", func() { loop := func() bool { sender := id.NewPrivKey().Signatory() + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true _, maxHeight, _ := insertRandomMessages(&queue, sender) thresholdHeight := process.Height(r.Intn(int(maxHeight))) queue.DropMessagesBelowHeight(thresholdHeight) @@ -406,7 +416,7 @@ var _ = Describe("MQ", func() { Expect(precommit.Height >= thresholdHeight).To(BeTrue()) } - _ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback) + _ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) return true } Expect(quick.Check(loop, nil)).To(Succeed()) @@ -418,6 +428,7 @@ var _ = Describe("MQ", func() { loop := func() bool { opts := mq.DefaultOptions().WithMaxCapacity(1) queue := mq.New(opts) + procsAllowed := map[id.Signatory]bool{} // insert a msg originalSender := id.NewPrivKey().Signatory() @@ -425,6 +436,7 @@ var _ = Describe("MQ", func() { originalMsg.From = originalSender originalMsg.Height = process.Height(1) originalMsg.Round = process.Round(1) + procsAllowed[originalMsg.From] = true queue.InsertPropose(originalMsg) // any message in height > 1 or (height = 1 || round > 1) will be dropped @@ -434,11 +446,12 @@ var _ = Describe("MQ", func() { msg.From = id.NewPrivKey().Signatory() msg.Height = process.Height(1) msg.Round = process.Round(2) + procsAllowed[msg.From] = true queue.InsertPropose(msg) // so consuming will only return the first msg proposeCallback := func(propose process.Propose) {} - n := queue.Consume(process.Height(1), proposeCallback, nil, nil) + n := queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed) Expect(n).To(Equal(2)) // re-insert the original msg @@ -458,7 +471,7 @@ var _ = Describe("MQ", func() { Expect(propose.Round).To(Equal(originalMsg.Round)) Expect(propose.From).To(Equal(originalSender)) } - n = queue.Consume(process.Height(1), proposeCallback, nil, nil) + n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed) Expect(n).To(Equal(1)) // re-insert the original msg @@ -477,7 +490,7 @@ var _ = Describe("MQ", func() { Expect(propose.Round).To(Equal(msg.Round)) Expect(propose.From).To(Equal(originalSender)) } - n = queue.Consume(process.Height(1), proposeCallback, nil, nil) + n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed) Expect(n).To(Equal(1)) return true @@ -497,6 +510,8 @@ var _ = Describe("MQ", func() { // msgsCount > c sender := id.NewPrivKey().Signatory() height := process.Height(1) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true msgsCount := c + 5 + r.Intn(20) rounds := make([]process.Round, msgsCount) msgs := make([]interface{}, msgsCount) @@ -553,7 +568,7 @@ var _ = Describe("MQ", func() { i++ } - n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(c)) Expect(i).To(Equal(c)) diff --git a/replica/replica.go b/replica/replica.go index f27f66e..e252623 100644 --- a/replica/replica.go +++ b/replica/replica.go @@ -118,25 +118,16 @@ func (replica *Replica) Run(ctx context.Context) { if !replica.filterHeight(m.Height) { return } - if !replica.filterFrom(m.From) { - return - } replica.mq.InsertPropose(m) case process.Prevote: if !replica.filterHeight(m.Height) { return } - if !replica.filterFrom(m.From) { - return - } replica.mq.InsertPrevote(m) case process.Precommit: if !replica.filterHeight(m.Height) { return } - if !replica.filterFrom(m.From) { - return - } replica.mq.InsertPrecommit(m) case process.Height: replica.proc.State = process.DefaultState().WithCurrentHeight(m) @@ -228,7 +219,7 @@ func (replica *Replica) TimeoutPrecommit(ctx context.Context, timeout timer.Time // // NOTE: All messages that are currently in the message queue for heights less // than the given height will be dropped. -func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Height) { +func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Height, signatories []id.Signatory) { if newHeight <= replica.proc.State.CurrentHeight { return } @@ -236,11 +227,8 @@ func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Heigh case <-ctx.Done(): case replica.mch <- newHeight: } -} -// ResetSignatories of the replica and underlying process with a new set of signatories. -func (replica *Replica) ResetSignatories(ctx context.Context, signatories []id.Signatory) { - if len(signatories) == 0 { + if len(signatories) == 0 { return } @@ -264,10 +252,6 @@ func (replica *Replica) filterHeight(height process.Height) bool { return height >= replica.proc.CurrentHeight } -func (replica *Replica) filterFrom(from id.Signatory) bool { - return replica.procsAllowed[from] -} - func (replica *Replica) flush() { for { n := replica.mq.Consume( @@ -275,6 +259,7 @@ func (replica *Replica) flush() { replica.proc.Propose, replica.proc.Prevote, replica.proc.Precommit, + replica.procsAllowed, ) if n == 0 { return From b41deb00c26e490be7b7cdb737f79c1f4c494e8c Mon Sep 17 00:00:00 2001 From: tok-kkk Date: Wed, 8 Sep 2021 15:11:52 +1000 Subject: [PATCH 4/6] not filter sender address for future blocks --- mq/mq.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mq/mq.go b/mq/mq.go index 636408b..f57d566 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -35,13 +35,15 @@ func New(opts Options) MessageQueue { // be dropped from the MessageQueue. func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit), procsAllowed map[id.Signatory]bool) (n int) { for from, q := range mq.queuesByPid { - if ok := procsAllowed[from] ; !ok{ - mq.queuesByPid[from] = nil - } for len(q) > 0 { if q[0] == nil || height(q[0]) > h { break } + if ok := procsAllowed[from] ; !ok{ + n++ + q = q[1:] + continue + } switch msg := q[0].(type) { case process.Propose: propose(msg) From 020b982a2ff6e90483f39ace277d4bdcdf621ade Mon Sep 17 00:00:00 2001 From: tok-kkk Date: Wed, 8 Sep 2021 17:15:45 +1000 Subject: [PATCH 5/6] defer the index increament when consuming messages --- mq/mq.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/mq/mq.go b/mq/mq.go index f57d566..7ccf48f 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -39,21 +39,25 @@ func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), if q[0] == nil || height(q[0]) > h { break } - if ok := procsAllowed[from] ; !ok{ - n++ - q = q[1:] - continue - } - switch msg := q[0].(type) { - case process.Propose: - propose(msg) - case process.Prevote: - prevote(msg) - case process.Precommit: - precommit(msg) - } - n++ - q = q[1:] + func() { + defer func() { + n++ + q = q[1:] + }() + + if ok := procsAllowed[from]; !ok { + return + } + + switch msg := q[0].(type) { + case process.Propose: + propose(msg) + case process.Prevote: + prevote(msg) + case process.Precommit: + precommit(msg) + } + }() } mq.queuesByPid[from] = q } From 9230a2c350956e98c1ec63f5a49c58dbc83bf5f8 Mon Sep 17 00:00:00 2001 From: tok-kkk Date: Wed, 8 Sep 2021 17:16:01 +1000 Subject: [PATCH 6/6] gofmt --- replica/replica.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/replica/replica.go b/replica/replica.go index e252623..4513576 100644 --- a/replica/replica.go +++ b/replica/replica.go @@ -134,13 +134,13 @@ func (replica *Replica) Run(ctx context.Context) { replica.mq.DropMessagesBelowHeight(m) case []id.Signatory: procAllowed := map[id.Signatory]bool{} - for _, sig := range m{ + for _, sig := range m { procAllowed[sig] = true } replica.procsAllowed = procAllowed scheduler := scheduler.NewRoundRobin(m) - f := len(procAllowed) /3 + f := len(procAllowed) / 3 replica.proc.ResetF(uint64(f), scheduler) } }