Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/bft/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,8 @@ func (c *Controller) sync() (viewNum uint64, seq uint64, decisions uint64) {
controllerViewNum := c.currViewNumber
newViewNum = controllerViewNum

newDecisionsInView = c.getCurrentDecisionsInView()

if latestDecisionSeq > controllerSequence {
c.Logger.Infof("Synchronizer returned with sequence %d while the controller is at sequence %d", latestDecisionSeq, controllerSequence)
c.Logger.Debugf("Node %d is setting the checkpoint after sync returned with view %d and seq %d", c.ID, latestDecisionViewNum, latestDecisionSeq)
Expand Down
150 changes: 150 additions & 0 deletions internal/bft/controller_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright IBM Corp. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

package bft

import (
"testing"
"time"

"github.com/hyperledger-labs/SmartBFT/pkg/types"
protos "github.com/hyperledger-labs/SmartBFT/smartbftprotos"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

// stubSynchronizer is a test double for the controller's synchronizer:
// every call to Sync hands back the same preconfigured SyncResponse.
type stubSynchronizer struct {
	// response is the canned value returned by Sync.
	response types.SyncResponse
}

// Sync returns the stored response unchanged.
func (s *stubSynchronizer) Sync() types.SyncResponse {
	return s.response
}

// stubComm is a no-op implementation of api.Comm: the send and broadcast
// methods discard their arguments, and Nodes reports a fixed membership.
type stubComm struct{}

// SendConsensus drops the consensus message.
func (s *stubComm) SendConsensus(targetID uint64, m *protos.Message) {
}

// SendTransaction drops the forwarded request.
func (s *stubComm) SendTransaction(targetID uint64, request []byte) {
}

// Nodes returns the fixed node ID list 1 through 4.
func (s *stubComm) Nodes() []uint64 {
	ids := []uint64{1, 2, 3, 4}
	return ids
}

// BroadcastConsensus drops the broadcast message.
func (s *stubComm) BroadcastConsensus(m *protos.Message) {
}

// TestSyncDecisionsInView tests that the sync() method returns the correct
// DecisionsInView value in two scenarios:
// - same height: sync returns the same sequence as the controller, so
// newDecisionsInView must come from the controller's in-memory state
// (getCurrentDecisionsInView), not remain zero-initialized.
// - higher height: sync returns a higher sequence, so newDecisionsInView
// is overridden from the sync response metadata (+1).
func TestSyncDecisionsInView(t *testing.T) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a test in test/basic_test.go.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should fail without the fix you are proposing (and should pass with it).

// Fixture values describing the controller's state before sync() runs.
const (
	// controllerSeq is the latest sequence recorded in the checkpoint metadata.
	controllerSeq uint64 = 2365908
	// controllerView is the view number the controller starts in.
	controllerView uint64 = 782
	// controllerDecisions is the DecisionsInView recorded in the checkpoint metadata.
	controllerDecisions uint64 = 9578
)

// newController creates a minimal Controller with only the fields
// needed by sync(). The checkpoint metadata sets the controller's
// current sequence, and the synchronizer controls what sync returns.
//
// syncResponse is the value the stub synchronizer returns from Sync().
// The StateCollector started here is stopped automatically through
// t.Cleanup when the calling (sub)test finishes.
newController := func(t *testing.T, syncResponse types.SyncResponse) *Controller {
	t.Helper()

	// Fail fast: a non-fatal assertion would let execution continue and
	// call Sugar() on a nil logger when construction fails.
	basicLog, err := zap.NewDevelopment()
	if err != nil {
		t.Fatalf("failed to create logger: %v", err)
	}
	log := basicLog.Sugar()

	// Checkpoint metadata determines latestSeq() return value.
	checkpoint := &types.Checkpoint{}
	checkpoint.Set(types.Proposal{
		Metadata: MarshalOrPanic(&protos.ViewMetadata{
			LatestSequence:  controllerSeq,
			ViewId:          controllerView,
			DecisionsInView: controllerDecisions,
		}),
	}, nil)

	// StateCollector with very short timeout so fetchState() returns nil
	// without needing real consensus broadcast responses.
	collector := &StateCollector{
		SelfID:         1,
		N:              4,
		Logger:         log,
		CollectTimeout: time.Millisecond,
	}
	collector.Start()
	t.Cleanup(collector.Stop)

	c := &Controller{
		ID:                  1,
		N:                   4,
		Logger:              log,
		Comm:                &stubComm{},
		Synchronizer:        &stubSynchronizer{response: syncResponse},
		Checkpoint:          checkpoint,
		Collector:           collector,
		InFlight:            &InFlightData{},
		ViewChanger:         &ViewChanger{},
		currViewNumber:      controllerView,
		currDecisionsInView: controllerDecisions + 1, // checkpoint stores N, decide() increments to N+1
	}
	// sync() uses grabSyncToken/relinquishSyncToken which need syncChan.
	c.syncChan = make(chan struct{}, 1)

	return c
}

t.Run("same_height_must_preserve_decisions", func(t *testing.T) {
	// The stub synchronizer reports exactly the controller's own sequence,
	// view and decisions count: the "already at target height" case in
	// which sync was triggered although the node is not behind.
	sameHeightMetadata := MarshalOrPanic(&protos.ViewMetadata{
		LatestSequence:  controllerSeq,
		ViewId:          controllerView,
		DecisionsInView: controllerDecisions,
	})
	response := types.SyncResponse{
		Latest: types.Decision{
			Proposal: types.Proposal{
				Metadata:             sameHeightMetadata,
				VerificationSequence: 0,
			},
		},
		Reconfig: types.ReconfigSync{InReplicatedDecisions: false},
	}
	ctrl := newController(t, response)

	gotView, gotSeq, gotDecisions := ctrl.sync()

	assert.Equal(t, controllerView, gotView, "view number should be preserved")
	assert.Equal(t, controllerSeq+1, gotSeq, "proposal sequence should be controllerSeq+1")
	// The expected decisions count is the controller's in-memory value
	// (currDecisionsInView was set to controllerDecisions+1), not zero.
	wantDecisions := controllerDecisions + 1
	assert.Equal(t, wantDecisions, gotDecisions,
		"DecisionsInView must be preserved when sync returns same height")
})

t.Run("higher_height_overrides_decisions", func(t *testing.T) {
	// The stub synchronizer reports a sequence one past the controller's,
	// so the returned decisions count is expected to come from the sync
	// response metadata (+1) rather than from the controller's own state.
	reportedDecisions := controllerDecisions + 1
	aheadMetadata := MarshalOrPanic(&protos.ViewMetadata{
		LatestSequence:  controllerSeq + 1,
		ViewId:          controllerView,
		DecisionsInView: reportedDecisions,
	})
	response := types.SyncResponse{
		Latest: types.Decision{
			Proposal: types.Proposal{
				Metadata:             aheadMetadata,
				VerificationSequence: 0,
			},
		},
		Reconfig: types.ReconfigSync{InReplicatedDecisions: false},
	}
	ctrl := newController(t, response)

	gotView, gotSeq, gotDecisions := ctrl.sync()

	assert.Equal(t, controllerView, gotView, "view number should be preserved")
	assert.Equal(t, controllerSeq+2, gotSeq, "proposal sequence should be syncSeq+1")
	assert.Equal(t, reportedDecisions+1, gotDecisions,
		"DecisionsInView should be latestDecisionDecisions+1 when sync returns higher height")
})
}
99 changes: 99 additions & 0 deletions test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,105 @@ func TestCatchingUpWithSyncAutonomous(t *testing.T) {
assert.Equal(t, uint32(0), atomic.LoadUint32(&detectedSequenceGap))
}

// TestSyncSameHeightPreservesDecisionsInView checks that a node which syncs
// while already at the network's height keeps accepting proposals afterwards.
// It fails if node 4 logs an "Expected decisions in view" message or stops
// delivering once the sync has been triggered.
func TestSyncSameHeightPreservesDecisionsInView(t *testing.T) {
	// Scenario:
	// we inject a fake heartbeat with a higher view number to node 4 (index 3).
	// The HeartbeatMonitor sees hb.View > hm.view and calls handler.Sync()
	// directly — no view change involved. Since node 4 already has all blocks,
	// sync() returns the same height.
	t.Parallel()
	network := NewNetwork()
	defer network.Shutdown()

	// Stop immediately if the directory cannot be created; continuing with
	// an empty path would make the nodes write outside a temporary location.
	testDir, err := os.MkdirTemp("", t.Name())
	if err != nil {
		t.Fatalf("generate temporary test dir: %v", err)
	}
	defer os.RemoveAll(testDir)

	const (
		numberOfNodes   = 4
		deliveryTimeout = 30 * time.Second
	)
	nodes := make([]*App, 0, numberOfNodes)
	for i := 1; i <= numberOfNodes; i++ {
		n := newNode(uint64(i), network, t.Name(), testDir, false, 0)
		nodes = append(nodes, n)
	}

	// Hook node 4's logger to detect:
	// 1. When sync is processed (so we know it's safe to submit more proposals)
	// 2. The bug symptom: "Expected decisions in view" validation failure
	// Both checks match on log message text, so they are coupled to the
	// exact wording of those log lines.
	var syncProcessed uint32
	// Buffered with a non-blocking send so the hook never stalls logging.
	bugDetected := make(chan struct{}, 1)
	baseLogger := nodes[3].Consensus.Logger.(*zap.SugaredLogger).Desugar()
	nodes[3].Consensus.Logger = baseLogger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
		if strings.Contains(entry.Message, "get msg from syncChan") {
			atomic.StoreUint32(&syncProcessed, 1)
		}
		if strings.Contains(entry.Message, "Expected decisions in view") {
			select {
			case bugDetected <- struct{}{}:
			default:
			}
		}
		return nil
	})).Sugar()

	startNodes(nodes, network)

	// Phase 1: Submit 5 proposals to build up DecisionsInView on all nodes.
	// Each wait is bounded so a stalled node fails the test with a clear
	// message instead of hanging until the global test timeout.
	for i := 1; i <= 5; i++ {
		nodes[0].Submit(Request{ID: fmt.Sprintf("%d", i), ClientID: "alice"})
		for j := 0; j < numberOfNodes; j++ {
			select {
			case <-nodes[j].Delivered:
			case <-time.After(deliveryTimeout):
				t.Fatalf("Node %d did not deliver proposal %d within timeout", j+1, i)
			}
		}
	}

	// Phase 2:
	// The bug this test validates depended on non-deterministic Go select
	// ordering in the [internal.bft.Controller] run() function:
	//   select {
	//   case newView := <-c.viewChange:
	//   case <-c.syncChan:
	//   }
	// That's why it can't be reliably reproduced through the natural path
	// in tests — hence the fake heartbeat that triggers only sync, guaranteeing
	// the bug path.
	//
	// The test injects a fake heartbeat with View=1 (higher than current view 0)
	// to node 4. The HeartbeatMonitor sees hb.View > hm.view and calls
	// handler.Sync() directly, bypassing any view change.
	// The sender must be the leader (ID 1) to pass the leaderID check.
	fakeHeartbeat := &smartbftprotos.Message{
		Content: &smartbftprotos.Message_HeartBeat{
			HeartBeat: &smartbftprotos.HeartBeat{
				View: 1, // higher than node 4's current view (0)
				Seq:  6,
			},
		},
	}
	nodes[3].Consensus.HandleMessage(1, fakeHeartbeat)

	// Wait for the sync to be processed by the controller's run loop.
	assert.Eventually(t, func() bool {
		return atomic.LoadUint32(&syncProcessed) == 1
	}, 30*time.Second, 50*time.Millisecond,
		"Node 4 should process sync triggered by fake heartbeat")

	// Phase 3: Submit more proposals. Without the bug fix, node 4's view was
	// restarted with DecisionsInView=0, so it rejects proposals from the
	// leader (which has DecisionsInView=5).
	for i := 6; i <= 10; i++ {
		nodes[0].Submit(Request{ID: fmt.Sprintf("%d", i), ClientID: "alice"})
		for j := 0; j < numberOfNodes; j++ {
			select {
			case <-nodes[j].Delivered:
			case <-bugDetected:
				t.Fatal("DecisionsInView validation failed on node: sync at same height reset DecisionsInView to 0")
			case <-time.After(deliveryTimeout):
				t.Fatalf("Node %d did not deliver proposal %d within timeout", j+1, i)
			}
		}
	}
}

func TestFollowerStateTransfer(t *testing.T) {
// Scenario: the leader (n0) is disconnected and so there is a view change
// a follower (n6) is also disconnected and misses the view change
Expand Down