Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit fe01e6e

Browse files
committed
[FABG-698] Event client reconnect bug fix and enhancements
Fixed a bug where subtracting the reconnect threshold from the max block height wrapped around to a very large uint64 value (unsigned underflow) whenever the threshold was greater than the max height. The block height monitor now checks the height of the currently connected peer as reported by discovery, as well as the number of blocks received, before deciding to disconnect. This prevents the client from disconnecting prematurely. Change-Id: I79c9168bc3447fc6e84b90879d451bf6af65b3bd Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
1 parent fa73e44 commit fe01e6e

File tree

3 files changed

+111
-52
lines changed

3 files changed

+111
-52
lines changed

pkg/fab/events/client/client_test.go

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1396,9 +1396,9 @@ func checkCCEvent(t *testing.T, event *fab.CCEvent, expectedCCID string, expecte
13961396
}
13971397

13981398
func TestDisconnectIfBlockHeightLags(t *testing.T) {
1399-
p1 := clientmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051", 10)
1400-
p2 := clientmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051", 8)
1401-
p3 := clientmocks.NewMockPeer("peer3", "grpcs://peer3.example.com:7051", 8)
1399+
p1 := clientmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051", 4)
1400+
p2 := clientmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051", 1)
1401+
p3 := clientmocks.NewMockPeer("peer3", "grpcs://peer3.example.com:7051", 1)
14021402

14031403
connectch := make(chan *dispatcher.ConnectionEvent)
14041404

@@ -1422,8 +1422,8 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
14221422
WithTimeBetweenConnectAttempts(time.Millisecond),
14231423
WithConnectionEvent(connectch),
14241424
WithResponseTimeout(2 * time.Second),
1425-
dispatcher.WithBlockHeightLagThreshold(5),
1426-
dispatcher.WithReconnectBlockHeightThreshold(10),
1425+
dispatcher.WithBlockHeightLagThreshold(2),
1426+
dispatcher.WithReconnectBlockHeightThreshold(3),
14271427
dispatcher.WithBlockHeightMonitorPeriod(250 * time.Millisecond),
14281428
},
14291429
)
@@ -1439,28 +1439,21 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
14391439
go listenConnection(connectch, outcomech)
14401440

14411441
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx1", pb.TxValidationCode_VALID))
1442+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx2", pb.TxValidationCode_VALID))
1443+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx3", pb.TxValidationCode_VALID))
1444+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx4", pb.TxValidationCode_VALID))
1445+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx5", pb.TxValidationCode_VALID))
14421446

14431447
time.Sleep(time.Second)
14441448

14451449
// Set the block height of another peer to be greater than the disconnect threshold
14461450
// so that the event client can reconnect to another peer
1447-
p2.SetBlockHeight(20)
1448-
time.Sleep(time.Second)
1451+
p2.SetBlockHeight(9)
14491452

14501453
select {
14511454
case outcome := <-outcomech:
14521455
assert.Equal(t, mockconn.ReconnectedOutcome, outcome)
1453-
case <-time.After(5 * time.Second):
1454-
t.Fatal("Timed out waiting for reconnect")
1455-
}
1456-
1457-
p3.SetBlockHeight(30)
1458-
time.Sleep(time.Second)
1459-
1460-
select {
1461-
case outcome := <-outcomech:
1462-
assert.Equal(t, mockconn.ReconnectedOutcome, outcome)
1463-
case <-time.After(5 * time.Second):
1456+
case <-time.After(3 * time.Second):
14641457
t.Fatal("Timed out waiting for reconnect")
14651458
}
14661459
}

pkg/fab/events/client/dispatcher/dispatcher.go

Lines changed: 79 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ package dispatcher
88

99
import (
1010
"fmt"
11-
"math"
11+
"sync"
1212
"time"
1313

1414
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
@@ -37,6 +37,8 @@ type Dispatcher struct {
3737
connectionProvider api.ConnectionProvider
3838
discoveryService fab.DiscoveryService
3939
ticker *time.Ticker
40+
peer fab.Peer
41+
lock sync.RWMutex
4042
}
4143

4244
// New creates a new dispatcher
@@ -61,10 +63,6 @@ func (ed *Dispatcher) Start() error {
6163
if err := ed.Dispatcher.Start(); err != nil {
6264
return errors.WithMessage(err, "error starting client event dispatcher")
6365
}
64-
if ed.reconnectBlockHeightLagThreshold > 0 {
65-
ed.ticker = time.NewTicker(ed.blockHeightMonitorPeriod)
66-
go ed.monitorBlockHeight()
67-
}
6866
return nil
6967
}
7068

@@ -132,6 +130,7 @@ func (ed *Dispatcher) HandleConnectEvent(e esdispatcher.Event) {
132130
}
133131

134132
ed.connection = conn
133+
ed.setConnectedPeer(peer)
135134

136135
go ed.connection.Receive(eventch)
137136

@@ -151,6 +150,7 @@ func (ed *Dispatcher) HandleDisconnectEvent(e esdispatcher.Event) {
151150

152151
ed.connection.Close()
153152
ed.connection = nil
153+
ed.setConnectedPeer(nil)
154154

155155
evt.Errch <- nil
156156
}
@@ -181,6 +181,11 @@ func (ed *Dispatcher) HandleConnectedEvent(e esdispatcher.Event) {
181181
logger.Warn("Unable to send to connection event channel.")
182182
}
183183
}
184+
185+
if ed.reconnectBlockHeightLagThreshold > 0 {
186+
ed.ticker = time.NewTicker(ed.blockHeightMonitorPeriod)
187+
go ed.monitorBlockHeight()
188+
}
184189
}
185190

186191
// HandleDisconnectedEvent sends a 'disconnected' event to any registered listener
@@ -204,6 +209,10 @@ func (ed *Dispatcher) HandleDisconnectedEvent(e esdispatcher.Event) {
204209
} else {
205210
logger.Warnf("Disconnected from event server: %s", evt.Err)
206211
}
212+
213+
if ed.ticker != nil {
214+
ed.ticker.Stop()
215+
}
207216
}
208217

209218
func (ed *Dispatcher) registerHandlers() {
@@ -275,48 +284,80 @@ func getMaxBlockHeight(peers []fab.Peer) uint64 {
275284
}
276285

277286
func (ed *Dispatcher) monitorBlockHeight() {
278-
logger.Debugf("Starting block height monitor. Lag threshold: %d", ed.reconnectBlockHeightLagThreshold)
287+
logger.Debugf("Starting block height monitor on channel [%s]. Lag threshold: %d", ed.chConfig.ID(), ed.reconnectBlockHeightLagThreshold)
288+
279289
for {
280290
if _, ok := <-ed.ticker.C; !ok {
281-
logger.Debugf("Stopping block height monitor")
291+
logger.Debugf("Stopping block height monitor on channel [%s]", ed.chConfig.ID())
292+
return
293+
}
294+
if !ed.checkBlockHeight() {
295+
// Disconnected
296+
logger.Debugf("Client on channel [%s] has disconnected - stopping block height monitor", ed.chConfig.ID())
282297
return
283298
}
284-
ed.checkBlockHeight()
285299
}
286300
}
287301

288-
func (ed *Dispatcher) checkBlockHeight() {
289-
logger.Debugf("Checking block heights...")
302+
// checkBlockHeight checks the current peer's block height relative to the block heights of the
303+
// other peers in the channel and disconnects from the peer if it lags by more than the configured threshold.
304+
// Returns true if the block height is acceptable; false if the client has been disconnected from the peer
305+
func (ed *Dispatcher) checkBlockHeight() bool {
306+
logger.Debugf("Checking block heights on channel [%s]...", ed.chConfig.ID())
290307

291-
lastBlockReceived := ed.LastBlockNum()
292-
if lastBlockReceived == math.MaxUint64 {
293-
logger.Debugf("No blocks have been received yet")
294-
return
308+
connectedPeer := ed.connectedPeer()
309+
if connectedPeer == nil {
310+
logger.Debugf("Not connected yet")
311+
return true
295312
}
296313

314+
peerState, ok := connectedPeer.(fab.PeerState)
315+
if !ok {
316+
logger.Debugf("Peer does not contain state")
317+
return true
318+
}
319+
320+
lastBlockReceived := ed.LastBlockNum()
321+
connectedPeerBlockHeight := peerState.BlockHeight()
322+
297323
peers, err := ed.discoveryService.GetPeers()
298324
if err != nil {
299325
logger.Warnf("Error checking block height on peers: %s", err)
300-
return
326+
return true
301327
}
302328

303-
connectedPeerBlockHeight := lastBlockReceived + 1
304329
maxHeight := getMaxBlockHeight(peers)
305330

306-
logger.Debugf("Block height from blocks received: %d, Max block height from Discovery: %d", connectedPeerBlockHeight, maxHeight)
331+
logger.Debugf("Block height on channel [%s] of connected peer [%s] from Discovery: %d, Last block received: %d, Max block height from Discovery: %d", ed.chConfig.ID(), connectedPeer.URL(), connectedPeerBlockHeight, lastBlockReceived, maxHeight)
332+
333+
if maxHeight <= uint64(ed.reconnectBlockHeightLagThreshold) {
334+
logger.Debugf("Max block height on channel [%s] of peers is %d and reconnect lag threshold is %d so event client will not be disconnected from peer", ed.chConfig.ID(), maxHeight, ed.reconnectBlockHeightLagThreshold)
335+
return true
336+
}
337+
338+
// The last block received may be lagging the actual block height of the peer
339+
if lastBlockReceived+1 < connectedPeerBlockHeight {
340+
// We can still get more blocks from the connected peer. Don't disconnect
341+
logger.Debugf("Block height on channel [%s] of connected peer [%s] from Discovery is %d which is greater than last block received+1: %d. Won't disconnect from this peer since more blocks can still be retrieved from the peer", ed.chConfig.ID(), connectedPeer.URL(), connectedPeerBlockHeight, lastBlockReceived+1)
342+
return true
343+
}
307344

308345
cutoffHeight := maxHeight - uint64(ed.reconnectBlockHeightLagThreshold)
346+
peerBlockHeight := lastBlockReceived + 1
309347

310-
if connectedPeerBlockHeight >= cutoffHeight {
311-
logger.Debugf("Block height from blocks received is %d which is greater than or equal to the cutoff %d", connectedPeerBlockHeight, cutoffHeight)
312-
} else {
313-
logger.Infof("Block height from blocks received is %d which is less than the cutoff %d. Disconnecting from the peer...", connectedPeerBlockHeight, cutoffHeight)
314-
if err := ed.disconnect(); err != nil {
315-
logger.Warnf("Error disconnecting event client: %s", err)
316-
} else {
317-
logger.Info("Successfully disconnected event client")
318-
}
348+
if peerBlockHeight >= cutoffHeight {
349+
logger.Debugf("Block height on channel [%s] from connected peer [%s] is %d which is greater than or equal to the cutoff %d so event client will not be disconnected from peer", ed.chConfig.ID(), connectedPeer.URL(), peerBlockHeight, cutoffHeight)
350+
return true
319351
}
352+
353+
logger.Infof("Block height on channel [%s] from connected peer is %d which is less than the cutoff %d. Disconnecting from the peer...", ed.chConfig.ID(), peerBlockHeight, cutoffHeight)
354+
if err := ed.disconnect(); err != nil {
355+
logger.Warnf("Error disconnecting event client from channel [%s]: %s", ed.chConfig.ID(), err)
356+
return true
357+
}
358+
359+
logger.Info("Successfully disconnected event client from channel [%s]", ed.chConfig.ID())
360+
return false
320361
}
321362

322363
func (ed *Dispatcher) disconnect() error {
@@ -336,3 +377,15 @@ func (ed *Dispatcher) disconnect() error {
336377
eventch <- NewDisconnectedEvent(nil)
337378
return nil
338379
}
380+
381+
func (ed *Dispatcher) setConnectedPeer(peer fab.Peer) {
382+
ed.lock.Lock()
383+
defer ed.lock.Unlock()
384+
ed.peer = peer
385+
}
386+
387+
func (ed *Dispatcher) connectedPeer() fab.Peer {
388+
ed.lock.RLock()
389+
defer ed.lock.RUnlock()
390+
return ed.peer
391+
}

pkg/fab/events/client/dispatcher/dispatcher_test.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,9 @@ func TestFilterByBlockHeight(t *testing.T) {
240240
}
241241

242242
func TestDisconnectIfBlockHeightLags(t *testing.T) {
243-
p1 := clientmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051", 10)
244-
p2 := clientmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051", 8)
245-
p3 := clientmocks.NewMockPeer("peer3", "grpcs://peer3.example.com:7051", 8)
243+
p1 := clientmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051", 4)
244+
p2 := clientmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051", 1)
245+
p3 := clientmocks.NewMockPeer("peer3", "grpcs://peer3.example.com:7051", 1)
246246

247247
channelID := "testchannel"
248248

@@ -259,8 +259,8 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
259259
),
260260
),
261261
),
262-
WithBlockHeightLagThreshold(5),
263-
WithReconnectBlockHeightThreshold(10),
262+
WithBlockHeightLagThreshold(2),
263+
WithReconnectBlockHeightThreshold(3),
264264
WithBlockHeightMonitorPeriod(250*time.Millisecond),
265265
)
266266

@@ -295,16 +295,29 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
295295
t.Fatalf("Error connecting: %s", err)
296296
}
297297

298-
dispatcherEventch <- esdispatcher.NewBlockEvent(servicemocks.NewBlockProducer().NewBlock(channelID), sourceURL)
298+
dispatcherEventch <- NewConnectedEvent()
299+
300+
select {
301+
case e := <-connch:
302+
assert.Truef(t, e.Connected, "expecting connected event")
303+
case <-time.After(time.Second):
304+
t.Fatal("Expecting connected event but got none")
305+
}
306+
307+
blockProducer := servicemocks.NewBlockProducer()
308+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
309+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
310+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
311+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
312+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
299313

300314
time.Sleep(time.Second)
301315
p2.SetBlockHeight(15)
302-
time.Sleep(time.Second)
303316

304317
select {
305318
case e := <-connch:
306319
assert.Falsef(t, e.Connected, "expecting disconnected event")
307-
default:
320+
case <-time.After(time.Second):
308321
t.Fatal("Expecting disconnected event but got none")
309322
}
310323
}

0 commit comments

Comments
 (0)