Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 79b343b

Browse files
committed
[FAB-8995] Event client should read channel config
The event endpoint discovery service now reads the channel peer config. If a channel peer config is present for a given peer URL and eventSource==false then the peer will be excluded. Change-Id: I7b6538d21b79e3a44acafca2306e52c726a540f5 Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
1 parent de1f456 commit 79b343b

File tree

4 files changed

+118
-18
lines changed

4 files changed

+118
-18
lines changed

pkg/fab/events/deliverclient/connection/connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ var (
5959

6060
// New returns a new Deliver Server connection
6161
func New(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamProvider, url string, opts ...options.Opt) (*DeliverConnection, error) {
62+
logger.Debugf("Connecting to %s...", url)
6263
connect, err := comm.NewConnection(
6364
ctx, chConfig,
6465
func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {

pkg/fab/events/endpoint/endpoint_test.go

Lines changed: 82 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package endpoint
88

99
import (
10+
"fmt"
1011
"testing"
1112
"time"
1213

@@ -16,6 +17,23 @@ import (
1617
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
1718
)
1819

20+
const (
21+
url1 = "p1.test.com:9051"
22+
url2 = "p2.test.com:9051"
23+
url3 = "p3.test.com:9051"
24+
)
25+
26+
var p1 = fabmocks.NewMockPeer("p1", url1)
27+
var p2 = fabmocks.NewMockPeer("p2", url2)
28+
var p3 = fabmocks.NewMockPeer("p3", url3)
29+
30+
var pc1 = core.PeerConfig{URL: url1}
31+
var pc2 = core.PeerConfig{URL: url2}
32+
var pc3 = core.PeerConfig{URL: url3}
33+
34+
var peers = []fab.Peer{p1, p2, p3}
35+
var peerConfigs = []core.PeerConfig{pc1, pc2, pc3}
36+
1937
func TestEndpoint(t *testing.T) {
2038
expectedEventURL := "localhost:7053"
2139
expectedAllowInsecure := true
@@ -69,71 +87,119 @@ func TestEndpoint(t *testing.T) {
6987

7088
func TestDiscoveryProvider(t *testing.T) {
7189
ctx := newMockContext()
90+
91+
expectedNumPeers := len(peers)
92+
7293
discoveryProvider := NewDiscoveryProvider(ctx)
7394

7495
discoveryService, err := discoveryProvider.CreateDiscoveryService("testchannel")
7596
if err != nil {
7697
t.Fatalf("error creating discovery service: %s", err)
7798
}
78-
_, err = discoveryService.GetPeers()
99+
peers, err = discoveryService.GetPeers()
79100
if err != nil {
80101
t.Fatalf("error getting peers: %s", err)
81102
}
82-
103+
if len(peers) != expectedNumPeers {
104+
t.Fatalf("expecting %d peers but got %d", expectedNumPeers, len(peers))
105+
}
83106
}
84107

85108
func TestDiscoveryProviderWithTargetFilter(t *testing.T) {
86109
ctx := newMockContext()
87110

88-
var numTimesCalled int
89-
expectedNumTimesCalled := 1
111+
expectedNumPeers := len(peers) - 1
112+
113+
discoveryProvider := NewDiscoveryProvider(ctx, WithTargetFilter(newMockFilter(p3)))
114+
115+
discoveryService, err := discoveryProvider.CreateDiscoveryService("testchannel")
116+
if err != nil {
117+
t.Fatalf("error creating discovery service: %s", err)
118+
}
119+
peers, err = discoveryService.GetPeers()
120+
if err != nil {
121+
t.Fatalf("error getting peers: %s", err)
122+
}
123+
if len(peers) != expectedNumPeers {
124+
t.Fatalf("expecting %d peers but got %d", expectedNumPeers, len(peers))
125+
}
126+
}
127+
128+
func TestDiscoveryProviderWithEventSource(t *testing.T) {
129+
ctx := newMockContext()
130+
131+
chPeer2 := core.ChannelPeer{}
132+
chPeer2.URL = p2.URL()
133+
chPeer2.EventSource = false
134+
ctx.SetConfig(newMockConfig(chPeer2))
90135

91-
discoveryProvider := NewDiscoveryProvider(ctx, WithTargetFilter(newMockFilter(&numTimesCalled)))
136+
expectedNumPeers := len(peers) - 1
137+
138+
discoveryProvider := NewDiscoveryProvider(ctx)
92139

93140
discoveryService, err := discoveryProvider.CreateDiscoveryService("testchannel")
94141
if err != nil {
95142
t.Fatalf("error creating discovery service: %s", err)
96143
}
97-
_, err = discoveryService.GetPeers()
144+
peers, err = discoveryService.GetPeers()
98145
if err != nil {
99146
t.Fatalf("error getting peers: %s", err)
100147
}
101-
if numTimesCalled != expectedNumTimesCalled {
102-
t.Fatalf("expecting target filter to be called %d time(s) but was called %d time(s)", expectedNumTimesCalled, numTimesCalled)
148+
if len(peers) != expectedNumPeers {
149+
t.Fatalf("expecting %d peers but got %d", expectedNumPeers, len(peers))
103150
}
104151
}
105152

106153
type mockConfig struct {
107154
core.Config
155+
channelPeers []core.ChannelPeer
108156
}
109157

110-
func newMockConfig() *mockConfig {
158+
func newMockConfig(channelPeers ...core.ChannelPeer) *mockConfig {
111159
return &mockConfig{
112-
Config: fabmocks.NewMockConfig(),
160+
Config: fabmocks.NewMockConfig(),
161+
channelPeers: channelPeers,
113162
}
114163
}
115164

116165
func (c *mockConfig) PeerConfigByURL(url string) (*core.PeerConfig, error) {
117-
return &core.PeerConfig{}, nil
166+
for _, pc := range peerConfigs {
167+
if pc.URL == url {
168+
return &pc, nil
169+
}
170+
}
171+
return nil, nil
172+
}
173+
174+
func (c *mockConfig) ChannelPeers(name string) ([]core.ChannelPeer, error) {
175+
fmt.Printf("mockConfig.ChannelPeers - returning %#v", c.channelPeers)
176+
return c.channelPeers, nil
118177
}
119178

120179
func newMockContext() *fabmocks.MockContext {
121-
ctx := fabmocks.NewMockContext(
180+
discoveryProvider, _ := fabmocks.NewMockDiscoveryProvider(nil, peers)
181+
182+
ctx := fabmocks.NewMockContextWithCustomDiscovery(
122183
mspmocks.NewMockSigningIdentity("user1", "Org1MSP"),
184+
discoveryProvider,
123185
)
124186
ctx.SetConfig(newMockConfig())
125187
return ctx
126188
}
127189

128190
type mockFilter struct {
129-
numTimesCalled *int
191+
excludePeers []fab.Peer
130192
}
131193

132-
func newMockFilter(numTimesCalled *int) *mockFilter {
133-
return &mockFilter{numTimesCalled: numTimesCalled}
194+
func newMockFilter(excludePeers ...fab.Peer) *mockFilter {
195+
return &mockFilter{excludePeers: excludePeers}
134196
}
135197

136198
func (f *mockFilter) Accept(peer fab.Peer) bool {
137-
*f.numTimesCalled++
199+
for _, p := range f.excludePeers {
200+
if p.URL() == peer.URL() {
201+
return false
202+
}
203+
}
138204
return true
139205
}

pkg/fab/events/endpoint/endpointdiscovery.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@ package endpoint
88

99
import (
1010
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery"
11+
"github.com/hyperledger/fabric-sdk-go/pkg/common/logging"
1112
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
13+
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core"
1214
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
1315
"github.com/pkg/errors"
1416
)
1517

18+
var logger = logging.NewLogger("fabsdk/fab")
19+
1620
// DiscoveryProvider is a wrapper around a discovery provider that
1721
// converts each peer into an EventEndpoint. The EventEndpoint
1822
// provides additional connection options.
@@ -57,15 +61,24 @@ func (p *DiscoveryProvider) CreateDiscoveryService(channelID string) (fab.Discov
5761
target = discovery.NewDiscoveryFilterService(target, p.filter)
5862
}
5963

64+
chpeers, err := p.ctx.Config().ChannelPeers(channelID)
65+
if err != nil {
66+
return nil, errors.Wrapf(err, "unable to get channel peers for channel [%s]", channelID)
67+
}
68+
6069
return &discoveryService{
6170
DiscoveryService: target,
6271
ctx: p.ctx,
72+
channelID: channelID,
73+
chPeers: chpeers,
6374
}, nil
6475
}
6576

6677
type discoveryService struct {
6778
fab.DiscoveryService
68-
ctx context.Client
79+
ctx context.Client
80+
channelID string
81+
chPeers []core.ChannelPeer
6982
}
7083

7184
func (s *discoveryService) GetPeers() ([]fab.Peer, error) {
@@ -85,6 +98,15 @@ func (s *discoveryService) GetPeers() ([]fab.Peer, error) {
8598
return nil, errors.Errorf("unable to get peer config from [%s]", peer.URL())
8699
}
87100

101+
chPeer := s.getChannelPeer(peerConfig)
102+
103+
logger.Debugf("Channel peer config for [%s]: %#v", peer.URL(), chPeer)
104+
105+
if chPeer != nil && !chPeer.EventSource {
106+
logger.Debugf("Excluding peer [%s] since it is not configured as an event source", peer.URL())
107+
continue
108+
}
109+
88110
eventEndpoint, err := FromPeerConfig(s.ctx.Config(), peer, peerConfig)
89111
if err != nil {
90112
return nil, errors.Wrapf(err, "unable to create event endpoint for [%s]", peer.URL())
@@ -94,3 +116,12 @@ func (s *discoveryService) GetPeers() ([]fab.Peer, error) {
94116

95117
return eventEndpoints, nil
96118
}
119+
120+
func (s *discoveryService) getChannelPeer(peerConfig *core.PeerConfig) *core.ChannelPeer {
121+
for _, chpeer := range s.chPeers {
122+
if chpeer.URL == peerConfig.URL {
123+
return &chpeer
124+
}
125+
}
126+
return nil
127+
}

test/fixtures/config/config_revoke_test.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ channels:
170170
endorsingPeer: true
171171
chaincodeQuery: true
172172
ledgerQuery: true
173-
eventSource: true
173+
# Don't use revoked peer as event source or
174+
# else the test will time out waiting for event
175+
eventSource: false
174176

175177

176178
#

0 commit comments

Comments
 (0)