Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 6e67766

Browse files
committed
[FAB-8813] Pass GRPC options to event client
Ensure that all GRPC options are passed to the event clients. Change-Id: I7dcbd9ac6b3f0f2a8cd8e1e487ce33806f47cada Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
1 parent 37cac30 commit 6e67766

File tree

10 files changed

+371
-130
lines changed

10 files changed

+371
-130
lines changed

pkg/core/config/config.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -919,18 +919,28 @@ func (c *Config) PeerConfigByURL(url string) (*core.PeerConfig, error) {
919919
if err != nil {
920920
return nil, err
921921
}
922+
923+
var matchPeerConfig *core.PeerConfig
922924
staticPeers := config.Peers
923925
for _, staticPeerConfig := range staticPeers {
924926
if strings.EqualFold(staticPeerConfig.URL, url) {
925-
return &staticPeerConfig, nil
927+
matchPeerConfig = &staticPeerConfig
928+
break
926929
}
927930
}
928931

929-
// try to match from entity matchers
930-
matchPeerConfig, err := c.tryMatchingPeerConfig(url)
931-
if err != nil {
932-
return nil, errors.WithMessage(err, "No Peer found with the url from config")
932+
if matchPeerConfig == nil {
933+
// try to match from entity matchers
934+
matchPeerConfig, err = c.tryMatchingPeerConfig(url)
935+
if err != nil {
936+
return nil, errors.WithMessage(err, "No Peer found with the url from config")
937+
}
938+
}
939+
940+
if matchPeerConfig != nil && matchPeerConfig.TLSCACerts.Path != "" {
941+
matchPeerConfig.TLSCACerts.Path = SubstPathVars(matchPeerConfig.TLSCACerts.Path)
933942
}
943+
934944
return matchPeerConfig, nil
935945
}
936946

pkg/fab/events/api/endpoint.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package api
88

99
import (
10+
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
1011
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
1112
)
1213

@@ -17,4 +18,7 @@ type EventEndpoint interface {
1718

1819
// EventURL returns the event URL
1920
EventURL() string
21+
22+
// Opts returns additional options for the connection
23+
Opts() []options.Opt
2024
}

pkg/fab/events/deliverclient/deliverclient.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
deliverconn "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/connection"
2020
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/dispatcher"
2121
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
22+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/endpoint"
2223
"github.com/hyperledger/fabric-sdk-go/pkg/logging"
2324
"github.com/pkg/errors"
2425
)
@@ -27,12 +28,20 @@ var logger = logging.NewLogger("fabsdk/fab")
2728

2829
// deliverProvider is the connection provider used for connecting to the Deliver service
2930
var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
30-
return deliverconn.New(context, chConfig, deliverconn.Deliver, peer.URL())
31+
eventEndpoint, ok := peer.(api.EventEndpoint)
32+
if !ok {
33+
panic("peer is not an EventEndpoint")
34+
}
35+
return deliverconn.New(context, chConfig, deliverconn.Deliver, peer.URL(), eventEndpoint.Opts()...)
3136
}
3237

3338
// deliverFilteredProvider is the connection provider used for connecting to the DeliverFiltered service
3439
var deliverFilteredProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
35-
return deliverconn.New(context, chConfig, deliverconn.DeliverFiltered, peer.URL())
40+
eventEndpoint, ok := peer.(api.EventEndpoint)
41+
if !ok {
42+
panic("peer is not an EventEndpoint")
43+
}
44+
return deliverconn.New(context, chConfig, deliverconn.DeliverFiltered, peer.URL(), eventEndpoint.Opts()...)
3645
}
3746

3847
// Client connects to a peer and receives channel events, such as bock, filtered block, chaincode, and transaction status events.
@@ -46,10 +55,14 @@ func New(context fabcontext.Client, chConfig fab.ChannelCfg, opts ...options.Opt
4655
params := defaultParams()
4756
options.Apply(params, opts)
4857

58+
// Use a context that returns a custom Discovery Provider which
59+
// produces event endpoints containing additional GRPC options.
60+
deliverCtx := newDeliverContext(context)
61+
4962
client := &Client{
5063
Client: *client.New(
5164
params.permitBlockEvents,
52-
dispatcher.New(context, chConfig, params.connProvider, opts...),
65+
dispatcher.New(deliverCtx, chConfig, params.connProvider, opts...),
5366
opts...,
5467
),
5568
params: *params,
@@ -121,3 +134,20 @@ func (c *Client) seekInfo() (*ab.SeekInfo, error) {
121134
return nil, errors.Errorf("unsupported seek type:[%s]", c.seekType)
122135
}
123136
}
137+
138+
// deliverContext overrides the DiscoveryProvider
139+
type deliverContext struct {
140+
fabcontext.Client
141+
}
142+
143+
func newDeliverContext(ctx fabcontext.Client) fabcontext.Client {
144+
return &deliverContext{
145+
Client: ctx,
146+
}
147+
}
148+
149+
// DiscoveryProvider returns a custom discovery provider which produces
150+
// event endpoints with additional GRPC options
151+
func (ctx *deliverContext) DiscoveryProvider() fab.DiscoveryProvider {
152+
return endpoint.NewDiscoveryProvider(ctx.Client)
153+
}

pkg/fab/events/deliverclient/deliverclient_test.go

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
14+
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
1415
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
1516
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client"
1617
clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher"
@@ -33,17 +34,14 @@ const (
3334
var (
3435
defaultOpts = []options.Opt{}
3536

36-
peer1 = fabclientmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051")
37-
peer2 = fabclientmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051")
37+
peer1 = fabclientmocks.NewMockPeer("peer1", "peer1.example.com:7051")
38+
peer2 = fabclientmocks.NewMockPeer("peer2", "peer2.example.com:7051")
3839
)
3940

4041
func TestOptionsInNewClient(t *testing.T) {
4142
channelID := "mychannel"
4243
client, err := New(
43-
fabmocks.NewMockContextWithCustomDiscovery(
44-
fabmocks.NewMockUser("user1"),
45-
clientmocks.NewDiscoveryProvider(peer1, peer2),
46-
),
44+
newMockContext(),
4745
fabmocks.NewMockChannelCfg(channelID),
4846
WithBlockEvents(),
4947
)
@@ -56,10 +54,7 @@ func TestOptionsInNewClient(t *testing.T) {
5654
func TestClientConnect(t *testing.T) {
5755
channelID := "mychannel"
5856
eventClient, err := New(
59-
fabmocks.NewMockContextWithCustomDiscovery(
60-
fabmocks.NewMockUser("user1"),
61-
clientmocks.NewDiscoveryProvider(peer1, peer2),
62-
),
57+
newMockContext(),
6358
fabmocks.NewMockChannelCfg(channelID),
6459
withConnectionProvider(
6560
clientmocks.NewProviderFactory().Provider(
@@ -187,10 +182,7 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmo
187182

188183
channelID := "mychannel"
189184
eventClient, err := New(
190-
fabmocks.NewMockContextWithCustomDiscovery(
191-
fabmocks.NewMockUser("user1"),
192-
clientmocks.NewDiscoveryProvider(peer1, peer2),
193-
),
185+
newMockContext(),
194186
fabmocks.NewMockChannelCfg(channelID),
195187
withConnectionProvider(
196188
cp.FlakeyProvider(
@@ -230,10 +222,7 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe
230222

231223
channelID := "mychannel"
232224
eventClient, err := New(
233-
fabmocks.NewMockContextWithCustomDiscovery(
234-
fabmocks.NewMockUser("user1"),
235-
clientmocks.NewDiscoveryProvider(peer1, peer2),
236-
),
225+
newMockContext(),
237226
fabmocks.NewMockChannelCfg(channelID),
238227
withConnectionProvider(
239228
cp.FlakeyProvider(
@@ -303,10 +292,7 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA
303292
cp := clientmocks.NewProviderFactory()
304293

305294
eventClient, err := New(
306-
fabmocks.NewMockContextWithCustomDiscovery(
307-
fabmocks.NewMockUser("user1"),
308-
clientmocks.NewDiscoveryProvider(peer1, peer2),
309-
),
295+
newMockContext(),
310296
fabmocks.NewMockChannelCfg(channelID),
311297
withConnectionProvider(
312298
cp.FlakeyProvider(
@@ -451,3 +437,26 @@ func listenEvents(blockch <-chan *fab.BlockEvent, ccch <-chan *fab.CCEvent, wait
451437
}
452438
}
453439
}
440+
441+
type mockConfig struct {
442+
core.Config
443+
}
444+
445+
func newMockConfig() *mockConfig {
446+
return &mockConfig{
447+
Config: fabmocks.NewMockConfig(),
448+
}
449+
}
450+
451+
func (c *mockConfig) PeerConfigByURL(url string) (*core.PeerConfig, error) {
452+
return &core.PeerConfig{}, nil
453+
}
454+
455+
func newMockContext() *fabmocks.MockContext {
456+
ctx := fabmocks.NewMockContextWithCustomDiscovery(
457+
fabmocks.NewMockUser("user1"),
458+
clientmocks.NewDiscoveryProvider(peer1, peer2),
459+
)
460+
ctx.SetConfig(newMockConfig())
461+
return ctx
462+
}

pkg/fab/events/endpoint/endpoint.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import (
1010
"crypto/x509"
1111
"time"
1212

13+
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
14+
"github.com/hyperledger/fabric-sdk-go/pkg/errors/status"
15+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/comm"
16+
1317
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
1418
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
15-
"github.com/hyperledger/fabric-sdk-go/pkg/errors/status"
16-
"github.com/hyperledger/fabric-sdk-go/pkg/fab/peer"
1719
"github.com/spf13/cast"
1820
"google.golang.org/grpc/keepalive"
1921
)
@@ -29,20 +31,31 @@ type EventEndpoint struct {
2931
KeepAliveParams keepalive.ClientParameters
3032
FailFast bool
3133
ConnectTimeout time.Duration
34+
AllowInsecure bool
3235
}
3336

3437
// EventURL returns the event URL
3538
func (e *EventEndpoint) EventURL() string {
3639
return e.EvtURL
3740
}
3841

39-
// FromPeerConfig creates a new EventEndpoint from the given config
40-
func FromPeerConfig(config core.Config, peerCfg core.NetworkPeer) (*EventEndpoint, error) {
41-
p, err := peer.New(config, peer.FromPeerConfig(&peerCfg))
42-
if err != nil {
43-
return nil, err
42+
// Opts returns additional options for the event connection
43+
func (e *EventEndpoint) Opts() []options.Opt {
44+
opts := []options.Opt{
45+
comm.WithHostOverride(e.HostOverride),
46+
comm.WithFailFast(e.FailFast),
47+
comm.WithKeepAliveParams(e.KeepAliveParams),
48+
comm.WithCertificate(e.Certificate),
49+
comm.WithConnectTimeout(e.ConnectTimeout),
50+
}
51+
if e.AllowInsecure {
52+
opts = append(opts, comm.WithInsecure())
4453
}
54+
return opts
55+
}
4556

57+
// FromPeerConfig creates a new EventEndpoint from the given config
58+
func FromPeerConfig(config core.Config, peer fab.Peer, peerCfg *core.PeerConfig) (*EventEndpoint, error) {
4659
certificate, err := peerCfg.TLSCACerts.TLSCert()
4760
if err != nil {
4861
//Ignore empty cert errors,
@@ -53,31 +66,32 @@ func FromPeerConfig(config core.Config, peerCfg core.NetworkPeer) (*EventEndpoin
5366
}
5467

5568
return &EventEndpoint{
56-
Peer: p,
69+
Peer: peer,
5770
EvtURL: peerCfg.EventURL,
5871
HostOverride: getServerNameOverride(peerCfg),
5972
Certificate: certificate,
6073
KeepAliveParams: getKeepAliveOptions(peerCfg),
6174
FailFast: getFailFast(peerCfg),
6275
ConnectTimeout: config.TimeoutOrDefault(core.EventHubConnection),
76+
AllowInsecure: isInsecureAllowed(peerCfg),
6377
}, nil
6478
}
6579

66-
func getServerNameOverride(peerCfg core.NetworkPeer) string {
80+
func getServerNameOverride(peerCfg *core.PeerConfig) string {
6781
if str, ok := peerCfg.GRPCOptions["ssl-target-name-override"].(string); ok {
6882
return str
6983
}
7084
return ""
7185
}
7286

73-
func getFailFast(peerCfg core.NetworkPeer) bool {
87+
func getFailFast(peerCfg *core.PeerConfig) bool {
7488
if ff, ok := peerCfg.GRPCOptions["fail-fast"].(bool); ok {
7589
return cast.ToBool(ff)
7690
}
7791
return false
7892
}
7993

80-
func getKeepAliveOptions(peerCfg core.NetworkPeer) keepalive.ClientParameters {
94+
func getKeepAliveOptions(peerCfg *core.PeerConfig) keepalive.ClientParameters {
8195
var kap keepalive.ClientParameters
8296
if kaTime, ok := peerCfg.GRPCOptions["keep-alive-time"]; ok {
8397
kap.Time = cast.ToDuration(kaTime)
@@ -90,3 +104,11 @@ func getKeepAliveOptions(peerCfg core.NetworkPeer) keepalive.ClientParameters {
90104
}
91105
return kap
92106
}
107+
108+
func isInsecureAllowed(peerCfg *core.PeerConfig) bool {
109+
allowInsecure, ok := peerCfg.GRPCOptions["allow-insecure"].(bool)
110+
if ok {
111+
return allowInsecure
112+
}
113+
return false
114+
}

0 commit comments

Comments
 (0)