Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit fa73e44

Browse files
committed
[FABG-699] Add SDK config for reconnecting event client
The options for automatically reconnecting to peers based on block height are now exposed in the SDK config. Change-Id: Idfd4a3a1bc733e086beef3d7b66a75cb9cc625a0 Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
1 parent bf173b4 commit fa73e44

File tree

14 files changed

+283
-66
lines changed

14 files changed

+283
-66
lines changed

pkg/common/providers/fab/provider.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,35 @@ type EndpointConfig interface {
8888
ChannelPeers(name string) ([]ChannelPeer, bool)
8989
ChannelOrderers(name string) ([]OrdererConfig, bool)
9090
TLSCACertPool() CertPool
91-
EventServiceType() EventServiceType
91+
EventServiceConfig() EventServiceConfig
9292
TLSClientCerts() []tls.Certificate
9393
CryptoConfigPath() string
9494
}
9595

96+
// EventServiceConfig specifies configuration options for the event service
97+
type EventServiceConfig interface {
98+
// Type returns the type of event service to use
99+
Type() EventServiceType
100+
101+
// BlockHeightLagThreshold returns the block height lag threshold. This value is used for choosing a peer
102+
// to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of
103+
// blocks then it will be excluded from selection.
104+
// If set to 0 then only the most up-to-date peers are considered.
105+
// If set to -1 then all peers (regardless of block height) are considered for selection.
106+
BlockHeightLagThreshold() int
107+
108+
// ReconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's
109+
// block height falls behind the specified number of blocks and will reconnect to a better performing peer.
110+
// If set to 0 (default) then the peer will not disconnect based on block height.
111+
// NOTE: Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby
112+
// affecting performance.
113+
ReconnectBlockHeightLagThreshold() int
114+
115+
// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
116+
// value is only relevant if reconnectBlockHeightLagThreshold >0.
117+
BlockHeightMonitorPeriod() time.Duration
118+
}
119+
96120
// TimeoutType enumerates the different types of outgoing connections
97121
type TimeoutType int
98122

pkg/common/providers/test/mockfab/mockfab.gen.go

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/core/config/testdata/template/config.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,30 @@ client:
4949
# # Event service type (optional). If not specified then the type is automatically
5050
# # determined from channel capabilities.
5151
# type: (deliver|eventhub)
52+
#
53+
# # blockHeightLagThreshold sets the block height lag threshold. This value is used for choosing a peer
54+
# # to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of
55+
# # blocks then it will be excluded from selection.
56+
# # If set to 0 then only the most up-to-date peers are considered.
57+
# # If set to -1 then all peers (regardless of block height) are considered for selection.
58+
# # Default: 5
59+
# blockHeightLagThreshold: 5
60+
#
61+
# # reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's
62+
# # block height falls behind the specified number of blocks and will reconnect to a better performing peer.
63+
# # If set to 0 then this feature is disabled.
64+
# # Default: 0 (disabled)
65+
# # NOTES:
66+
# # - This feature should only be enabled when using deliver events, otherwise events may be lost
67+
# # - Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby
68+
# # affecting performance.
69+
# reconnectBlockHeightLagThreshold: 0
70+
#
71+
# # blockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
72+
# # value is only relevant if reconnectBlockHeightLagThreshold >0.
73+
# # Default: 5s
74+
# blockHeightMonitorPeriod: 5s
75+
5276
# the below timeouts are commented out to use the default values that are found in
5377
# "pkg/fab/endpointconfig.go"
5478
# the client is free to override the default values by uncommenting and resetting

pkg/fab/endpointconfig.go

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"crypto/x509"
1212
"reflect"
1313
"regexp"
14+
"strconv"
1415
"strings"
1516
"time"
1617

@@ -50,9 +51,12 @@ const (
5051
defaultChannelConfigRefreshInterval = time.Second * 90
5152
defaultChannelMemshpRefreshInterval = time.Second * 60
5253
defaultDiscoveryRefreshInterval = time.Second * 5
53-
defaultSelectionRefreshInterval = time.Minute * 1
54+
defaultSelectionRefreshInterval = time.Second * 5
5455
defaultCacheSweepInterval = time.Second * 15
5556

57+
defaultBlockHeightLagThreshold = 5
58+
defaultBlockHeightMonitorPeriod = 5 * time.Second
59+
5660
//default grpc opts
5761
defaultKeepAliveTime = 0
5862
defaultKeepAliveTimeout = time.Second * 20
@@ -232,17 +236,9 @@ func (c *EndpointConfig) TLSCACertPool() fab.CertPool {
232236
return c.tlsCertPool
233237
}
234238

235-
// EventServiceType returns the type of event service client to use
236-
func (c *EndpointConfig) EventServiceType() fab.EventServiceType {
237-
etype := c.backend.GetString("client.eventService.type")
238-
switch etype {
239-
case "eventhub":
240-
return fab.EventHubEventServiceType
241-
case "deliver":
242-
return fab.DeliverEventServiceType
243-
default:
244-
return fab.AutoDetectEventServiceType
245-
}
239+
// EventServiceConfig returns the event service config
240+
func (c *EndpointConfig) EventServiceConfig() fab.EventServiceConfig {
241+
return &EventServiceConfig{backend: c.backend}
246242
}
247243

248244
// TLSClientCerts loads the client's certs for mutual TLS
@@ -1446,6 +1442,61 @@ func (c *EndpointConfig) regexMatchAndReplace(regex *regexp.Regexp, src, repl st
14461442
return repl
14471443
}
14481444

1445+
// EventServiceConfig contains config options for the event service
1446+
type EventServiceConfig struct {
1447+
backend *lookup.ConfigLookup
1448+
}
1449+
1450+
// Type returns the type of event service to use
1451+
func (c *EventServiceConfig) Type() fab.EventServiceType {
1452+
etype := c.backend.GetString("client.eventService.type")
1453+
switch etype {
1454+
case "eventhub":
1455+
return fab.EventHubEventServiceType
1456+
case "deliver":
1457+
return fab.DeliverEventServiceType
1458+
default:
1459+
return fab.AutoDetectEventServiceType
1460+
}
1461+
}
1462+
1463+
// BlockHeightLagThreshold returns the block height lag threshold. This value is used for choosing a peer
1464+
// to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of
1465+
// blocks then it will be excluded from selection.
1466+
// If set to 0 then only the most up-to-date peers are considered.
1467+
// If set to -1 then all peers (regardless of block height) are considered for selection.
1468+
func (c *EventServiceConfig) BlockHeightLagThreshold() int {
1469+
lagThresholdStr := c.backend.GetString("client.eventService.blockHeightLagThreshold")
1470+
if lagThresholdStr == "" {
1471+
return defaultBlockHeightLagThreshold
1472+
}
1473+
lagThreshold, err := strconv.Atoi(lagThresholdStr)
1474+
if err != nil {
1475+
logger.Warnf("Invalid numeric value for client.eventService.blockHeightLagThreshold. Setting to default value of %d", defaultBlockHeightLagThreshold)
1476+
return defaultBlockHeightLagThreshold
1477+
}
1478+
return lagThreshold
1479+
}
1480+
1481+
// ReconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's
1482+
// block height falls behind the specified number of blocks and will reconnect to a better performing peer.
1483+
// If set to 0 then this feature is disabled.
1484+
// NOTE: Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby
1485+
// affecting performance.
1486+
func (c *EventServiceConfig) ReconnectBlockHeightLagThreshold() int {
1487+
return c.backend.GetInt("client.eventService.reconnectBlockHeightLagThreshold")
1488+
}
1489+
1490+
// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
1491+
// value is only relevant if reconnectBlockHeightLagThreshold >0.
1492+
func (c *EventServiceConfig) BlockHeightMonitorPeriod() time.Duration {
1493+
period := c.backend.GetDuration("client.eventService.blockHeightMonitorPeriod")
1494+
if period == 0 {
1495+
return defaultBlockHeightMonitorPeriod
1496+
}
1497+
return period
1498+
}
1499+
14491500
//peerChannelConfigHookFunc returns hook function for unmarshalling 'fab.PeerChannelConfig'
14501501
// Rule : default set to 'true' if not provided in config
14511502
func peerChannelConfigHookFunc() mapstructure.DecodeHookFunc {

pkg/fab/endpointconfig_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"crypto/x509"
1212
"testing"
1313

14+
"github.com/stretchr/testify/require"
15+
1416
"os"
1517

1618
"fmt"
@@ -200,6 +202,23 @@ func TestTimeouts(t *testing.T) {
200202
checkTimeouts(endpointConfig, t, errStr)
201203
}
202204

205+
func TestEventServiceConfig(t *testing.T) {
206+
customBackend := getCustomBackend()
207+
customBackend.KeyValueMap["client.eventService.type"] = "deliver"
208+
customBackend.KeyValueMap["client.eventService.blockHeightLagThreshold"] = "4"
209+
customBackend.KeyValueMap["client.eventService.reconnectBlockHeightLagThreshold"] = "7"
210+
customBackend.KeyValueMap["client.eventService.blockHeightMonitorPeriod"] = "7s"
211+
212+
endpointConfig, err := ConfigFromBackend(customBackend)
213+
require.NoError(t, err)
214+
215+
eventServiceConfig := endpointConfig.EventServiceConfig()
216+
assert.Equalf(t, fab.DeliverEventServiceType, eventServiceConfig.Type(), "invalid value for type")
217+
assert.Equalf(t, 4, eventServiceConfig.BlockHeightLagThreshold(), "invalid value for blockHeightLagThreshold")
218+
assert.Equalf(t, 7, eventServiceConfig.ReconnectBlockHeightLagThreshold(), "invalid value for reconnectBlockHeightLagThreshold")
219+
assert.Equalf(t, 7*time.Second, eventServiceConfig.BlockHeightMonitorPeriod(), "invalid value for blockHeightMonitorPeriod")
220+
}
221+
203222
func checkTimeouts(endpointConfig fab.EndpointConfig, t *testing.T, errStr string) {
204223
t1 := endpointConfig.Timeout(fab.OrdererResponse)
205224
assert.Equal(t, time.Second*6, t1, "OrdererResponse")

pkg/fab/events/client/dispatcher/dispatcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type Dispatcher struct {
4141

4242
// New creates a new dispatcher
4343
func New(context context.Client, chConfig fab.ChannelCfg, discoveryService fab.DiscoveryService, connectionProvider api.ConnectionProvider, opts ...options.Opt) *Dispatcher {
44-
params := defaultParams()
44+
params := defaultParams(context.EndpointConfig().EventServiceConfig())
4545
options.Apply(params, opts)
4646

4747
return &Dispatcher{

pkg/fab/events/client/dispatcher/opts.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
13+
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
1314
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp"
1415
)
1516

@@ -20,12 +21,12 @@ type params struct {
2021
reconnectBlockHeightLagThreshold int
2122
}
2223

23-
func defaultParams() *params {
24+
func defaultParams(config fab.EventServiceConfig) *params {
2425
return &params{
2526
loadBalancePolicy: lbp.NewRoundRobin(),
26-
blockHeightLagThreshold: 5, // TODO: Use defaults from SDK config
27-
reconnectBlockHeightLagThreshold: 0, // TODO: Use defaults from SDK config
28-
blockHeightMonitorPeriod: 5 * time.Second, // TODO: Use defaults from SDK config
27+
blockHeightMonitorPeriod: config.BlockHeightMonitorPeriod(),
28+
blockHeightLagThreshold: config.BlockHeightLagThreshold(),
29+
reconnectBlockHeightLagThreshold: config.ReconnectBlockHeightLagThreshold(),
2930
}
3031
}
3132

pkg/fab/mocks/mockconfig.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type MockConfig struct {
3030
customPeerCfg *fab.PeerConfig
3131
customOrdererCfg *fab.OrdererConfig
3232
customRandomOrdererCfg *fab.OrdererConfig
33-
EvtServiceType fab.EventServiceType
33+
EvtServiceConfig fab.EventServiceConfig
3434
}
3535

3636
// NewMockCryptoConfig ...
@@ -310,9 +310,12 @@ func (c *MockConfig) TLSClientCerts() []tls.Certificate {
310310
return nil
311311
}
312312

313-
// EventServiceType returns the type of event service client to use
314-
func (c *MockConfig) EventServiceType() fab.EventServiceType {
315-
return c.EvtServiceType
313+
// EventServiceConfig returns the type of event service client to use
314+
func (c *MockConfig) EventServiceConfig() fab.EventServiceConfig {
315+
if c.EvtServiceConfig != nil {
316+
return c.EvtServiceConfig
317+
}
318+
return &MockEventServiceConfig{}
316319
}
317320

318321
// Lookup gets the Value from config file by Key
@@ -326,3 +329,32 @@ func (c *MockConfig) Lookup(key string) (interface{}, bool) {
326329
}
327330
return value, true
328331
}
332+
333+
// MockEventServiceConfig contains configuration options for the event service
334+
type MockEventServiceConfig struct {
335+
EvtType fab.EventServiceType
336+
LagThreshold int
337+
ReconnectLagThreshold int
338+
HeightMonitorPeriod time.Duration
339+
}
340+
341+
// Type returns the type of event service to use
342+
func (c *MockEventServiceConfig) Type() fab.EventServiceType {
343+
return c.EvtType
344+
}
345+
346+
// BlockHeightLagThreshold returns the block height lag threshold.
347+
func (c *MockEventServiceConfig) BlockHeightLagThreshold() int {
348+
return c.LagThreshold
349+
}
350+
351+
// ReconnectBlockHeightLagThreshold sets the ReconnectBlockHeightLagThreshold.
352+
func (c *MockEventServiceConfig) ReconnectBlockHeightLagThreshold() int {
353+
return c.ReconnectLagThreshold
354+
}
355+
356+
// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
357+
// value is only relevant if reconnectBlockHeightLagThreshold >0.
358+
func (c *MockEventServiceConfig) BlockHeightMonitorPeriod() time.Duration {
359+
return c.HeightMonitorPeriod
360+
}

pkg/fab/opts.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type EndpointConfigOptions struct {
2828
channelPeers
2929
channelOrderers
3030
tlsCACertPool
31-
eventServiceType
31+
eventServiceConfig
3232
tlsClientCerts
3333
cryptoConfigPath
3434
}
@@ -92,9 +92,9 @@ type tlsCACertPool interface {
9292
TLSCACertPool() fab.CertPool
9393
}
9494

95-
// eventServiceType interface allows to uniquely override EndpointConfig interface's EventServiceType() function
96-
type eventServiceType interface {
97-
EventServiceType() fab.EventServiceType
95+
// eventServiceType interface allows to uniquely override EndpointConfig interface's EventServiceConfig() function
96+
type eventServiceConfig interface {
97+
EventServiceConfig() fab.EventServiceConfig
9898
}
9999

100100
// tlsClientCerts interface allows to uniquely override EndpointConfig interface's TLSClientCerts() function
@@ -139,7 +139,7 @@ func UpdateMissingOptsWithDefaultConfig(c *EndpointConfigOptions, d fab.Endpoint
139139
s.set(c.channelPeers, nil, func() { c.channelPeers = d })
140140
s.set(c.channelOrderers, nil, func() { c.channelOrderers = d })
141141
s.set(c.tlsCACertPool, nil, func() { c.tlsCACertPool = d })
142-
s.set(c.eventServiceType, nil, func() { c.eventServiceType = d })
142+
s.set(c.eventServiceConfig, nil, func() { c.eventServiceConfig = d })
143143
s.set(c.tlsClientCerts, nil, func() { c.tlsClientCerts = d })
144144
s.set(c.cryptoConfigPath, nil, func() { c.cryptoConfigPath = d })
145145

@@ -150,7 +150,7 @@ func UpdateMissingOptsWithDefaultConfig(c *EndpointConfigOptions, d fab.Endpoint
150150
// (ie EndpointConfig interface not fully overridden)
151151
func IsEndpointConfigFullyOverridden(c *EndpointConfigOptions) bool {
152152
return !anyNil(c.timeout, c.orderersConfig, c.ordererConfig, c.peersConfig, c.peerConfig, c.networkConfig,
153-
c.networkPeers, c.channelConfig, c.channelPeers, c.channelOrderers, c.tlsCACertPool, c.eventServiceType, c.tlsClientCerts, c.cryptoConfigPath)
153+
c.networkPeers, c.channelConfig, c.channelPeers, c.channelOrderers, c.tlsCACertPool, c.eventServiceConfig, c.tlsClientCerts, c.cryptoConfigPath)
154154
}
155155

156156
// will override EndpointConfig interface with functions provided by o (option)
@@ -168,7 +168,7 @@ func setEndpointConfigWithOptionInterface(c *EndpointConfigOptions, o interface{
168168
s.set(c.channelPeers, func() bool { _, ok := o.(channelPeers); return ok }, func() { c.channelPeers = o.(channelPeers) })
169169
s.set(c.channelOrderers, func() bool { _, ok := o.(channelOrderers); return ok }, func() { c.channelOrderers = o.(channelOrderers) })
170170
s.set(c.tlsCACertPool, func() bool { _, ok := o.(tlsCACertPool); return ok }, func() { c.tlsCACertPool = o.(tlsCACertPool) })
171-
s.set(c.eventServiceType, func() bool { _, ok := o.(eventServiceType); return ok }, func() { c.eventServiceType = o.(eventServiceType) })
171+
s.set(c.eventServiceConfig, func() bool { _, ok := o.(eventServiceConfig); return ok }, func() { c.eventServiceConfig = o.(eventServiceConfig) })
172172
s.set(c.tlsClientCerts, func() bool { _, ok := o.(tlsClientCerts); return ok }, func() { c.tlsClientCerts = o.(tlsClientCerts) })
173173
s.set(c.cryptoConfigPath, func() bool { _, ok := o.(cryptoConfigPath); return ok }, func() { c.cryptoConfigPath = o.(cryptoConfigPath) })
174174

0 commit comments

Comments
 (0)