diff --git a/internal/bft/controller.go b/internal/bft/controller.go
index 9ced3bb4..df9b73bb 100644
--- a/internal/bft/controller.go
+++ b/internal/bft/controller.go
@@ -72,6 +72,7 @@ type Proposer interface {
 	Start()
 	Abort()
 	Stopped() bool
+	AbortChan() <-chan struct{}
 	GetLeaderID() uint64
 	GetMetadata() []byte
 	HandleMessage(sender uint64, m *protos.Message)
@@ -171,6 +172,16 @@ func (c *Controller) currentViewStopped() bool {
 	return view.Stopped()
 }
 
+// currentViewAbortChan returns the abort channel of the current view. The view
+// is read under currViewLock; AbortChan is called after the lock is released.
+func (c *Controller) currentViewAbortChan() <-chan struct{} {
+	c.currViewLock.RLock()
+	view := c.currView
+	c.currViewLock.RUnlock()
+
+	return view.AbortChan()
+}
+
 func (c *Controller) currentViewLeader() uint64 {
 	c.currViewLock.RLock()
 	view := c.currView
@@ -785,7 +796,7 @@ func (c *Controller) Start(startViewNumber uint64, startProposalSequence uint64,
 	c.syncChan = make(chan struct{}, 1)
 	c.stopChan = make(chan struct{})
 	c.leaderToken = make(chan struct{}, 1)
-	c.decisionChan = make(chan decision)
+	c.decisionChan = make(chan decision, 1)
 	c.deliverChan = make(chan struct{})
 	c.viewChange = make(chan viewInfo, 1)
 	c.abortViewChan = make(chan uint64, 1)
@@ -886,6 +897,7 @@ func (c *Controller) Decide(proposal types.Proposal, signatures []types.Signatur
 	select {
 	case <-c.deliverChan: // wait for the delivery of the decision to the application
 	case <-c.stopChan: // If we stopped the controller, abort delivery
+	case <-c.currentViewAbortChan(): // If we stopped the view, abort delivery
 	}
 }
 
diff --git a/internal/bft/view.go b/internal/bft/view.go
index 4586e6b4..16878efe 100644
--- a/internal/bft/view.go
+++ b/internal/bft/view.go
@@ -1015,6 +1015,11 @@ func (v *View) Stopped() bool {
 	}
 }
 
+// AbortChan returns the view's abort channel as a receive-only channel.
+func (v *View) AbortChan() <-chan struct{} {
+	return v.abortChan
+}
+
 func (v *View) GetLeaderID() uint64 {
 	return v.LeaderID
 }
diff --git a/test/basic_test.go b/test/basic_test.go
index 2f6ce93b..a22ea0aa 100644
--- a/test/basic_test.go
+++ b/test/basic_test.go
@@ -10,6 +10,7 @@ import (
 	"encoding/hex"
 	"fmt"
 	"os"
+	"runtime"
"strconv" "strings" "sync" @@ -3306,6 +3307,106 @@ ExternalLoop: } } +// Decide and abort view at the same time +func TestDecideAndAbortViewAtSameTime(t *testing.T) { + t.Parallel() + + done := make(chan struct{}) + defer close(done) + + network := NewNetwork() + defer network.Shutdown() + + testDir, err := os.MkdirTemp("", t.Name()) + assert.NoErrorf(t, err, "generate temporary test dir") + defer os.RemoveAll(testDir) + + numberOfNodes := 4 + nodes := make([]*App, 0) + for i := 1; i <= numberOfNodes; i++ { + n := newNode(uint64(i), network, t.Name(), testDir, false, 0) + n.Consensus.Config.LeaderHeartbeatTimeout = 5 * time.Second + n.Consensus.Config.LeaderHeartbeatCount = 2 + n.Consensus.Config.ViewChangeTimeout = 20 * time.Second + nodes = append(nodes, n) + } + + var ( + once sync.Once + once1 sync.Once + ) + leaderComplaneCh := make(chan struct{}) + abortingCh := make(chan struct{}, 1) + nextSeqCh := make(chan struct{}, 1) + nextStep := make(chan struct{}, 1) + + baseLogger := nodes[3].logger.Desugar() + nodes[3].logger = baseLogger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error { + if strings.Contains(entry.Message, "Heartbeat timeout expired") { + once.Do(func() { + leaderComplaneCh <- struct{}{} + }) + } + if strings.Contains(entry.Message, "Aborting current view with number") { + once1.Do(func() { + abortingCh <- struct{}{} + <-nextStep + }) + } + if strings.Contains(entry.Message, "Sequence: 2-->3") { + nextSeqCh <- struct{}{} + <-nextStep + } + return nil + })).Sugar() + + nodes[3].Setup() + + startNodes(nodes, network) + + var counter uint64 + accelerateTime(nodes, done, true, true, &counter) + + nodes[0].Submit(Request{ID: "1", ClientID: "alice"}) // submit to leader + data := make([]*AppRecord, 0) + for i := 0; i < numberOfNodes; i++ { + d := <-nodes[i].Delivered + data = append(data, d) + } + for i := 0; i < numberOfNodes-1; i++ { + assert.Equal(t, data[i], data[i+1]) + } + + nodes[0].Submit(Request{ID: "2", ClientID: "alice"}) + <-nextSeqCh + 
+ // Emulation of node removal from the consensus in the absence of heartbeat + nodes[3].Disconnect() + <-leaderComplaneCh + nodes[3].Connect() + + <-abortingCh + + close(nextStep) + data = make([]*AppRecord, 0, 4) + fail := time.After(15 * time.Second) + for i := 0; i < numberOfNodes; i++ { + select { + case d := <-nodes[i].Delivered: + data = append(data, d) + case <-fail: + buf := make([]byte, 1<<16) + runtime.Stack(buf, true) + fmt.Printf("%s", buf) + t.Fatal("Didn't deliver two msg") + } + } + + for i := 0; i < numberOfNodes-1; i++ { + assert.Equal(t, data[i], data[i+1]) + } +} + func doInBackground(f func(), stop <-chan struct{}) { for { select {