Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 6915847

Browse files
committed
[FAB-9554] Dynamic Discovery Provider
Added a Dynamic Discovery Provider that can be plugged into the SDK in order to retrieve peers of a channel from Fabric's discovery service. Change-Id: I2dc12c07276a325e59706ffba175dc99ca663f58 Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
1 parent 8b4777e commit 6915847

File tree

10 files changed

+670
-11
lines changed

10 files changed

+670
-11
lines changed
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
Copyright SecureKey Technologies Inc. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package mocks
8+
9+
import (
10+
reqcontext "context"
11+
"sync"
12+
13+
discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client"
14+
"github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/discovery"
15+
"github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/gossip"
16+
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
17+
fabdiscovery "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
18+
discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks"
19+
)
20+
21+
// MockDiscoveryClient implements a mock Discover service
22+
type MockDiscoveryClient struct {
23+
resp []fabdiscovery.Response
24+
lock sync.RWMutex
25+
}
26+
27+
// MockDiscoverEndpointResponse contains a mock response for the discover client
28+
type MockDiscoverEndpointResponse struct {
29+
Target string
30+
PeerEndpoints []*discmocks.MockDiscoveryPeerEndpoint
31+
Error error
32+
}
33+
34+
// NewMockDiscoveryClient returns a new mock Discover service
35+
func NewMockDiscoveryClient() *MockDiscoveryClient {
36+
return &MockDiscoveryClient{}
37+
}
38+
39+
// Send sends a Discovery request
40+
func (m *MockDiscoveryClient) Send(ctx reqcontext.Context, req *discclient.Request, targets ...fab.PeerConfig) ([]fabdiscovery.Response, error) {
41+
return m.responses(), nil
42+
}
43+
44+
// SetResponses sets the responses that the mock client should return from the Send function
45+
func (m *MockDiscoveryClient) SetResponses(responses ...*MockDiscoverEndpointResponse) {
46+
m.lock.Lock()
47+
defer m.lock.Unlock()
48+
49+
m.resp = nil
50+
51+
for _, resp := range responses {
52+
var peers []*discclient.Peer
53+
for _, endpoint := range resp.PeerEndpoints {
54+
peer := &discclient.Peer{
55+
MSPID: endpoint.MSPID,
56+
AliveMessage: newAliveMessage(endpoint),
57+
StateInfoMessage: newStateInfoMessage(endpoint),
58+
}
59+
peers = append(peers, peer)
60+
}
61+
m.resp = append(m.resp, &mockDiscoverResponse{
62+
Response: &response{peers: peers}, target: resp.Target, err: resp.Error,
63+
})
64+
}
65+
}
66+
67+
func (m *MockDiscoveryClient) responses() []fabdiscovery.Response {
68+
m.lock.RLock()
69+
defer m.lock.RUnlock()
70+
return m.resp
71+
}
72+
73+
type mockDiscoverResponse struct {
74+
discclient.Response
75+
target string
76+
err error
77+
}
78+
79+
func (r *mockDiscoverResponse) Target() string {
80+
return r.target
81+
}
82+
83+
func (r *mockDiscoverResponse) Error() error {
84+
return r.err
85+
}
86+
87+
type response struct {
88+
peers []*discclient.Peer
89+
}
90+
91+
func (r *response) ForChannel(string) discclient.ChannelResponse {
92+
return &channelResponse{
93+
peers: r.peers,
94+
}
95+
}
96+
97+
func (r *response) ForLocal() discclient.LocalResponse {
98+
return &localResponse{
99+
peers: r.peers,
100+
}
101+
}
102+
103+
type channelResponse struct {
104+
peers []*discclient.Peer
105+
}
106+
107+
// Config returns a response for a config query, or error if something went wrong
108+
func (cr *channelResponse) Config() (*discovery.ConfigResult, error) {
109+
panic("not implemented")
110+
}
111+
112+
// Peers returns a response for a peer membership query, or error if something went wrong
113+
func (cr *channelResponse) Peers() ([]*discclient.Peer, error) {
114+
return cr.peers, nil
115+
}
116+
117+
// Endorsers returns the response for an endorser query
118+
func (cr *channelResponse) Endorsers(cc string, ps discclient.PrioritySelector, ef discclient.ExclusionFilter) (discclient.Endorsers, error) {
119+
panic("not implemented")
120+
}
121+
122+
type localResponse struct {
123+
peers []*discclient.Peer
124+
}
125+
126+
// Peers returns a response for a peer membership query, or error if something went wrong
127+
func (cr *localResponse) Peers() ([]*discclient.Peer, error) {
128+
return cr.peers, nil
129+
}
130+
131+
func newAliveMessage(endpoint *discmocks.MockDiscoveryPeerEndpoint) *gossip.SignedGossipMessage {
132+
return &gossip.SignedGossipMessage{
133+
GossipMessage: &gossip.GossipMessage{
134+
Content: &gossip.GossipMessage_AliveMsg{
135+
AliveMsg: &gossip.AliveMessage{
136+
Membership: &gossip.Member{
137+
Endpoint: endpoint.Endpoint,
138+
},
139+
},
140+
},
141+
},
142+
}
143+
}
144+
145+
func newStateInfoMessage(endpoint *discmocks.MockDiscoveryPeerEndpoint) *gossip.SignedGossipMessage {
146+
return &gossip.SignedGossipMessage{
147+
GossipMessage: &gossip.GossipMessage{
148+
Content: &gossip.GossipMessage_StateInfo{
149+
StateInfo: &gossip.StateInfo{
150+
Properties: &gossip.Properties{
151+
LedgerHeight: endpoint.LedgerHeight,
152+
},
153+
},
154+
},
155+
},
156+
}
157+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
Copyright SecureKey Technologies Inc. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package dynamicdiscovery
8+
9+
import (
10+
"time"
11+
12+
"github.com/hyperledger/fabric-sdk-go/pkg/common/logging"
13+
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
14+
"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazycache"
15+
"github.com/pkg/errors"
16+
)
17+
18+
var logger = logging.NewLogger("fabsdk/client")
19+
20+
// Provider implements a dynamic Discovery Provider that queries
21+
// Fabric's Discovery service for information about the peers that
22+
// are currently joined to the given channel.
23+
type Provider struct {
24+
cache *lazycache.Cache
25+
}
26+
27+
// Opt is a provider option
28+
type Opt func(o *options)
29+
30+
// WithRefreshInterval sets the interval in which the
31+
// peer cache is refreshed
32+
func WithRefreshInterval(value time.Duration) Opt {
33+
return func(o *options) {
34+
o.refreshInterval = value
35+
}
36+
}
37+
38+
// WithResponseTimeout sets the Discover service response timeout
39+
func WithResponseTimeout(value time.Duration) Opt {
40+
return func(o *options) {
41+
o.responseTimeout = value
42+
}
43+
}
44+
45+
type options struct {
46+
refreshInterval time.Duration
47+
responseTimeout time.Duration
48+
}
49+
50+
// New creates a new dynamic discovery provider
51+
func New(config fab.EndpointConfig, opts ...Opt) *Provider {
52+
options := options{}
53+
for _, opt := range opts {
54+
opt(&options)
55+
}
56+
57+
if options.refreshInterval == 0 {
58+
options.refreshInterval = config.Timeout(fab.DiscoveryServiceRefresh)
59+
}
60+
if options.responseTimeout == 0 {
61+
options.responseTimeout = config.Timeout(fab.DiscoveryResponse)
62+
}
63+
64+
return &Provider{
65+
cache: lazycache.New("Discovery_Service_Cache", func(key lazycache.Key) (interface{}, error) {
66+
return newService(options), nil
67+
}),
68+
}
69+
}
70+
71+
// CreateDiscoveryService will create a new membership service
72+
func (p *Provider) CreateDiscoveryService(channelID string) (fab.DiscoveryService, error) {
73+
ref, err := p.cache.Get(lazycache.NewStringKey(channelID))
74+
if err != nil {
75+
return nil, errors.WithMessage(err, "failed to get discovery service from cache")
76+
}
77+
return ref.(fab.DiscoveryService), nil
78+
}
79+
80+
// Close will close the cache and all services contained by the cache.
81+
func (p *Provider) Close() {
82+
p.cache.Close()
83+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Copyright SecureKey Technologies Inc. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package dynamicdiscovery
8+
9+
import (
10+
"testing"
11+
"time"
12+
13+
pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
14+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
15+
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
16+
"github.com/stretchr/testify/assert"
17+
)
18+
19+
const (
20+
ch = "orgchannel"
21+
22+
mspID1 = "Org1MSP"
23+
mspID2 = "Org2MSP"
24+
25+
peer1MSP1 = "peer1.org1.com:9999"
26+
)
27+
28+
func TestDiscoveryProvider(t *testing.T) {
29+
ctx := mocks.NewMockContext(mspmocks.NewMockSigningIdentity("test", mspID1))
30+
config := &config{
31+
EndpointConfig: mocks.NewMockEndpointConfig(),
32+
peers: []pfab.ChannelPeer{
33+
{
34+
NetworkPeer: pfab.NetworkPeer{
35+
PeerConfig: pfab.PeerConfig{
36+
URL: peer1MSP1,
37+
},
38+
},
39+
},
40+
},
41+
}
42+
ctx.SetEndpointConfig(config)
43+
44+
p := New(config, WithRefreshInterval(30*time.Second), WithResponseTimeout(10*time.Second))
45+
defer p.Close()
46+
47+
service, err := p.CreateDiscoveryService(ch)
48+
assert.NoError(t, err)
49+
50+
chCtx := mocks.NewMockChannelContext(ctx, ch)
51+
52+
err = service.(*Service).Initialize(chCtx)
53+
assert.NoError(t, err)
54+
}
55+
56+
type config struct {
57+
pfab.EndpointConfig
58+
peers []pfab.ChannelPeer
59+
}
60+
61+
func (c *config) ChannelPeers(name string) ([]pfab.ChannelPeer, error) {
62+
return c.peers, nil
63+
}

0 commit comments

Comments
 (0)