Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit b9db2f4

Browse files
committed
[FABG-761] Added prefer-org and prefer-peer resolvers
Implemented two new peer resolvers: Prefer-Org: - Determines which peers are suitable based on block height, although will prefer the peers in the current org (as long as their block height is above a configured threshold). If none of the peers from the current org are suitable then a peer from another org is chosen. Prefer-Peer: - Determines which peers are suitable based on block height, although will prefer the peers in the provided list (as long as their block height is above a configured threshold). If none of the peers in the provided list are suitable then an attempt is made to select a peer from the current org will be selected. If none of the peers from the current org are suitable then a peer from another org is chosen. Change-Id: Ieffee027ba03550cf83399742a7ac9f44e38c96f Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
1 parent c3753ac commit b9db2f4

File tree

10 files changed

+677
-8
lines changed

10 files changed

+677
-8
lines changed

pkg/fab/events/client/client_test.go

Lines changed: 149 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,18 @@ import (
1515
"testing"
1616
"time"
1717

18-
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp"
18+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/preferorg"
1919

2020
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
2121
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
2222
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
2323
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api"
2424
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher"
25+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp"
2526
clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks"
2627
mockconn "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks"
2728
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/minblockheight"
29+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/preferpeer"
2830
esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher"
2931
servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks"
3032
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
@@ -1449,8 +1451,6 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
14491451
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx4", pb.TxValidationCode_VALID))
14501452
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx5", pb.TxValidationCode_VALID))
14511453

1452-
time.Sleep(time.Second)
1453-
14541454
// Set the block height of another peer to be greater than the disconnect threshold
14551455
// so that the event client can reconnect to another peer
14561456
p2.SetBlockHeight(9)
@@ -1463,6 +1463,152 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
14631463
}
14641464
}
14651465

1466+
// TestPreferLocalOrgConnection tests the scenario where an org wishes to connect to it's own peers
1467+
// if they are above the block height lag threshold but, if they fall below the threshold, the
1468+
// connection should be made to another org's peer. Once the local org's peers have caught up in
1469+
// block height, the connection to the local peer should be re-established.
1470+
func TestPreferLocalOrgConnection(t *testing.T) {
1471+
channelID := "testchannel"
1472+
org1MSP := "Org1MSP"
1473+
org2MSP := "Org2MSP"
1474+
blockHeightLagThreshold := 2
1475+
1476+
p1O1 := clientmocks.NewMockStatefulPeer("p1_o1", "peer1.org1.com:7051", clientmocks.WithMSP(org1MSP), clientmocks.WithBlockHeight(4))
1477+
p2O1 := clientmocks.NewMockStatefulPeer("p2_o1", "peer2.org1.com:7051", clientmocks.WithMSP(org1MSP), clientmocks.WithBlockHeight(3))
1478+
p1O2 := clientmocks.NewMockStatefulPeer("p1_o2", "peer1.org2.com:7051", clientmocks.WithMSP(org2MSP), clientmocks.WithBlockHeight(10))
1479+
p2O2 := clientmocks.NewMockStatefulPeer("p2_o2", "peer1.org2.com:7051", clientmocks.WithMSP(org2MSP), clientmocks.WithBlockHeight(11))
1480+
1481+
connectch := make(chan *dispatcher.ConnectionEvent)
1482+
1483+
conn := clientmocks.NewMockConnection(
1484+
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
1485+
)
1486+
connectionProvider := clientmocks.NewProviderFactory().Provider(conn)
1487+
1488+
eventClient, _, err := newClientWithMockConnAndOpts(
1489+
fabmocks.NewMockContext(
1490+
mspmocks.NewMockSigningIdentity("user1", "Org1MSP"),
1491+
),
1492+
fabmocks.NewMockChannelCfg(channelID),
1493+
clientmocks.NewDiscoveryService(p1O1, p2O1, p1O2, p2O2),
1494+
connectionProvider, filteredClientProvider,
1495+
[]options.Opt{
1496+
esdispatcher.WithEventConsumerTimeout(3 * time.Second),
1497+
WithMaxConnectAttempts(1),
1498+
WithTimeBetweenConnectAttempts(time.Millisecond),
1499+
WithConnectionEvent(connectch),
1500+
WithResponseTimeout(2 * time.Second),
1501+
dispatcher.WithPeerResolver(preferorg.NewResolver()),
1502+
dispatcher.WithLoadBalancePolicy(lbp.NewRoundRobin()),
1503+
dispatcher.WithPeerMonitorPeriod(250 * time.Millisecond),
1504+
minblockheight.WithBlockHeightLagThreshold(blockHeightLagThreshold),
1505+
minblockheight.WithReconnectBlockHeightThreshold(3),
1506+
},
1507+
)
1508+
require.NoErrorf(t, err, "error creating channel event client")
1509+
err = eventClient.Connect()
1510+
require.NoErrorf(t, err, "errorconnecting channel event client")
1511+
defer eventClient.Close()
1512+
1513+
connectedPeer := eventClient.Dispatcher().(*dispatcher.Dispatcher).ConnectedPeer()
1514+
assert.Equal(t, org2MSP, connectedPeer.MSPID())
1515+
1516+
outcomech := make(chan mockconn.Outcome)
1517+
go listenConnection(connectch, outcomech)
1518+
1519+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx1", pb.TxValidationCode_VALID))
1520+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx2", pb.TxValidationCode_VALID))
1521+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx3", pb.TxValidationCode_VALID))
1522+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx4", pb.TxValidationCode_VALID))
1523+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx5", pb.TxValidationCode_VALID))
1524+
1525+
// Set the block height of the local peer to be greater than the disconnect threshold
1526+
// so that the event client can reconnect to the local peer
1527+
p2O1.SetBlockHeight(9)
1528+
1529+
select {
1530+
case outcome := <-outcomech:
1531+
assert.Equal(t, mockconn.ReconnectedOutcome, outcome)
1532+
connectedPeer := eventClient.Dispatcher().(*dispatcher.Dispatcher).ConnectedPeer()
1533+
assert.Equal(t, org1MSP, connectedPeer.MSPID())
1534+
case <-time.After(3 * time.Second):
1535+
t.Fatal("Timed out waiting for reconnect")
1536+
}
1537+
}
1538+
1539+
// TestPreferLocalPeersConnection tests the scenario where an org wishes to connect to one of a list of preferred peers
1540+
// if they are above the block height lag threshold but, if they fall below the threshold, the
1541+
// connection should be made to another peer. Once the preferred peers have caught up in
1542+
// block height, the connection to one of the preferred peers should be re-established.
1543+
func TestPreferLocalPeersConnection(t *testing.T) {
1544+
channelID := "testchannel"
1545+
org1MSP := "Org1MSP"
1546+
org2MSP := "Org2MSP"
1547+
blockHeightLagThreshold := 2
1548+
1549+
p1O1 := clientmocks.NewMockStatefulPeer("p1_o1", "peer1.org1.com:7051", clientmocks.WithMSP(org1MSP), clientmocks.WithBlockHeight(4))
1550+
p2O1 := clientmocks.NewMockStatefulPeer("p2_o1", "peer2.org1.com:7051", clientmocks.WithMSP(org1MSP), clientmocks.WithBlockHeight(3))
1551+
p1O2 := clientmocks.NewMockStatefulPeer("p1_o2", "peer1.org2.com:7051", clientmocks.WithMSP(org2MSP), clientmocks.WithBlockHeight(10))
1552+
p2O2 := clientmocks.NewMockStatefulPeer("p2_o2", "peer1.org2.com:7051", clientmocks.WithMSP(org2MSP), clientmocks.WithBlockHeight(11))
1553+
1554+
connectch := make(chan *dispatcher.ConnectionEvent)
1555+
1556+
conn := clientmocks.NewMockConnection(
1557+
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
1558+
)
1559+
connectionProvider := clientmocks.NewProviderFactory().Provider(conn)
1560+
1561+
eventClient, _, err := newClientWithMockConnAndOpts(
1562+
fabmocks.NewMockContext(
1563+
mspmocks.NewMockSigningIdentity("user1", "Org1MSP"),
1564+
),
1565+
fabmocks.NewMockChannelCfg(channelID),
1566+
clientmocks.NewDiscoveryService(p1O1, p2O1, p1O2, p2O2),
1567+
connectionProvider, filteredClientProvider,
1568+
[]options.Opt{
1569+
esdispatcher.WithEventConsumerTimeout(3 * time.Second),
1570+
WithMaxConnectAttempts(1),
1571+
WithTimeBetweenConnectAttempts(time.Millisecond),
1572+
WithConnectionEvent(connectch),
1573+
WithResponseTimeout(2 * time.Second),
1574+
dispatcher.WithPeerResolver(preferpeer.NewResolver(p1O1.URL(), p2O1.URL())),
1575+
dispatcher.WithLoadBalancePolicy(lbp.NewRoundRobin()),
1576+
dispatcher.WithPeerMonitorPeriod(250 * time.Millisecond),
1577+
minblockheight.WithBlockHeightLagThreshold(blockHeightLagThreshold),
1578+
minblockheight.WithReconnectBlockHeightThreshold(3),
1579+
},
1580+
)
1581+
require.NoErrorf(t, err, "error creating channel event client")
1582+
err = eventClient.Connect()
1583+
require.NoErrorf(t, err, "errorconnecting channel event client")
1584+
defer eventClient.Close()
1585+
1586+
connectedPeer := eventClient.Dispatcher().(*dispatcher.Dispatcher).ConnectedPeer()
1587+
assert.Equal(t, org2MSP, connectedPeer.MSPID())
1588+
1589+
outcomech := make(chan mockconn.Outcome)
1590+
go listenConnection(connectch, outcomech)
1591+
1592+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx1", pb.TxValidationCode_VALID))
1593+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx2", pb.TxValidationCode_VALID))
1594+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx3", pb.TxValidationCode_VALID))
1595+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx4", pb.TxValidationCode_VALID))
1596+
conn.Ledger().NewFilteredBlock(channelID, servicemocks.NewFilteredTx("tx5", pb.TxValidationCode_VALID))
1597+
1598+
// Set the block height of the local peer to be greater than the disconnect threshold
1599+
// so that the event client can reconnect to the local peer
1600+
p2O1.SetBlockHeight(9)
1601+
1602+
select {
1603+
case outcome := <-outcomech:
1604+
assert.Equal(t, mockconn.ReconnectedOutcome, outcome)
1605+
connectedPeer := eventClient.Dispatcher().(*dispatcher.Dispatcher).ConnectedPeer()
1606+
assert.Equal(t, org1MSP, connectedPeer.MSPID())
1607+
case <-time.After(3 * time.Second):
1608+
t.Fatal("Timed out waiting for reconnect")
1609+
}
1610+
}
1611+
14661612
func TestTransferRegistrations(t *testing.T) {
14671613
// Tests the scenario where all event registrations are transferred to another event client.
14681614
t.Run("Transfer", func(t *testing.T) {

pkg/fab/events/client/dispatcher/dispatcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func (ed *Dispatcher) clearConnectionRegistration() {
242242
}
243243

244244
func (ed *Dispatcher) monitorPeer(done chan struct{}) {
245-
logger.Infof("Starting peer monitor on channel [%s]", ed.chConfig.ID())
245+
logger.Debugf("Starting peer monitor on channel [%s]", ed.chConfig.ID())
246246

247247
ticker := time.NewTicker(ed.peerMonitorPeriod)
248248
defer ticker.Stop()

pkg/fab/events/client/dispatcher/dispatcher_test.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp"
1515
clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks"
1616
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/minblockheight"
17+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/preferorg"
1718
esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher"
1819
servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks"
1920
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
@@ -293,7 +294,6 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
293294
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
294295
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
295296

296-
time.Sleep(time.Second)
297297
p2.SetBlockHeight(15)
298298

299299
select {
@@ -304,6 +304,98 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
304304
}
305305
}
306306

307+
// TestPreferLocalOrgConnection tests the scenario where an org wishes to connect to it's own peers
308+
// if they are above the block height lag threshold but, if they fall below the threshold, the
309+
// connection should be made to another org's peer. Once the local org's peers have caught up in
310+
// block height, the connection to the other org's peer should be terminated.
311+
func TestPreferLocalOrgConnection(t *testing.T) {
312+
channelID := "testchannel"
313+
org1MSP := "Org1MSP"
314+
org2MSP := "Org2MSP"
315+
316+
p1O1 := clientmocks.NewMockStatefulPeer("p1_o1", "peer1.org1.com:7051", clientmocks.WithMSP(org1MSP), clientmocks.WithBlockHeight(4))
317+
p2O1 := clientmocks.NewMockStatefulPeer("p2_o1", "peer2.org1.com:7051", clientmocks.WithMSP(org1MSP), clientmocks.WithBlockHeight(3))
318+
p1O2 := clientmocks.NewMockStatefulPeer("p1_o2", "peer1.org2.com:7051", clientmocks.WithMSP(org2MSP), clientmocks.WithBlockHeight(10))
319+
p2O2 := clientmocks.NewMockStatefulPeer("p2_o2", "peer1.org2.com:7051", clientmocks.WithMSP(org2MSP), clientmocks.WithBlockHeight(12))
320+
321+
dispatcher := New(
322+
fabmocks.NewMockContext(
323+
mspmocks.NewMockSigningIdentity("user1", org1MSP),
324+
),
325+
fabmocks.NewMockChannelCfg(channelID),
326+
clientmocks.NewDiscoveryService(p1O1, p2O1, p1O2, p2O2),
327+
clientmocks.NewProviderFactory().Provider(
328+
clientmocks.NewMockConnection(
329+
clientmocks.WithLedger(
330+
servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL),
331+
),
332+
),
333+
),
334+
WithPeerResolver(preferorg.NewResolver()),
335+
WithPeerMonitorPeriod(250*time.Millisecond),
336+
minblockheight.WithBlockHeightLagThreshold(2),
337+
minblockheight.WithReconnectBlockHeightThreshold(3),
338+
WithLoadBalancePolicy(lbp.NewRandom()),
339+
)
340+
341+
if err := dispatcher.Start(); err != nil {
342+
t.Fatalf("Error starting dispatcher: %s", err)
343+
}
344+
345+
dispatcherEventch, err := dispatcher.EventCh()
346+
if err != nil {
347+
t.Fatalf("Error getting event channel from dispatcher: %s", err)
348+
}
349+
350+
// Register for connection events
351+
regerrch := make(chan error)
352+
regch := make(chan fab.Registration)
353+
connch := make(chan *ConnectionEvent, 10)
354+
dispatcherEventch <- NewRegisterConnectionEvent(connch, regch, regerrch)
355+
356+
select {
357+
case <-regch:
358+
// No need get the registration to unregister since we're relying on the
359+
// connch channel being closed when the dispatcher is stopped.
360+
case err := <-regerrch:
361+
t.Fatalf("Error registering for connection events: %s", err)
362+
}
363+
364+
// Connect
365+
errch := make(chan error)
366+
dispatcherEventch <- NewConnectEvent(errch)
367+
err = <-errch
368+
if err != nil {
369+
t.Fatalf("Error connecting: %s", err)
370+
}
371+
372+
dispatcherEventch <- NewConnectedEvent()
373+
374+
select {
375+
case e := <-connch:
376+
assert.Truef(t, e.Connected, "expecting connected event")
377+
case <-time.After(time.Second):
378+
t.Fatal("Expecting connected event but got none")
379+
}
380+
381+
// The initial connection should have been to an Org2 peer since their block heights are higher than Org1
382+
blockProducer := servicemocks.NewBlockProducer()
383+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
384+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
385+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
386+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
387+
dispatcherEventch <- esdispatcher.NewBlockEvent(blockProducer.NewBlock(channelID), sourceURL)
388+
389+
p2O1.SetBlockHeight(15)
390+
391+
select {
392+
case e := <-connch:
393+
assert.Falsef(t, e.Connected, "expecting disconnected event")
394+
case <-time.After(time.Second):
395+
t.Fatal("Expecting disconnected event but got none")
396+
}
397+
}
398+
307399
func checkEvent(connch chan *ConnectionEvent, errch chan error, state, expectedDisconnectErr string) {
308400
for {
309401
select {

pkg/fab/events/client/mocks/mockpeer.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,49 @@ package mocks
99
import (
1010
"sync"
1111

12-
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
1312
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
1413
)
1514

1615
// MockPeer contains mock PeerState
1716
type MockPeer struct {
18-
fab.Peer
17+
*fabmocks.MockPeer
1918
blockHeight uint64
2019
lock sync.RWMutex
2120
}
2221

22+
// MockPeerOpt is a mock peer option
23+
type MockPeerOpt func(*MockPeer)
24+
25+
// WithMSP sets the MSP ID of the mock peer
26+
func WithMSP(mspID string) MockPeerOpt {
27+
return func(p *MockPeer) {
28+
p.SetMSPID(mspID)
29+
}
30+
}
31+
32+
// WithBlockHeight sets the block height of the mock peer
33+
func WithBlockHeight(blockHeight uint64) MockPeerOpt {
34+
return func(p *MockPeer) {
35+
p.blockHeight = blockHeight
36+
}
37+
}
38+
39+
// NewMockStatefulPeer returns a new MockPeer with the given options
40+
func NewMockStatefulPeer(name, url string, opts ...MockPeerOpt) *MockPeer {
41+
p := &MockPeer{
42+
MockPeer: fabmocks.NewMockPeer(name, url),
43+
}
44+
for _, opt := range opts {
45+
opt(p)
46+
}
47+
return p
48+
}
49+
2350
// NewMockPeer returns a new MockPeer
51+
// Deprecated: This function will be deprecated in the future. Use NewMockStatefulPeer instead.
2452
func NewMockPeer(name, url string, blockHeight uint64) *MockPeer {
2553
return &MockPeer{
26-
Peer: fabmocks.NewMockPeer(name, url),
54+
MockPeer: fabmocks.NewMockPeer(name, url),
2755
blockHeight: blockHeight,
2856
}
2957
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright SecureKey Technologies Inc. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package preferorg
8+
9+
import (
10+
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
11+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp"
12+
)
13+
14+
type params struct {
15+
loadBalancePolicy lbp.LoadBalancePolicy
16+
}
17+
18+
func defaultParams(context context.Client) *params {
19+
return &params{
20+
loadBalancePolicy: lbp.NewRandom(),
21+
}
22+
}
23+
24+
func (p *params) SetLoadBalancePolicy(value lbp.LoadBalancePolicy) {
25+
logger.Debugf("LoadBalancePolicy: %#v", value)
26+
p.loadBalancePolicy = value
27+
}

0 commit comments

Comments
 (0)