Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit b36fe41

Browse files
committed
[FAB-6809] Dynamic Selection Service
Change-Id: Ibb690a53cafa42caa128c171b07462ac24502705 Signed-off-by: Sandra Vrtikapa <sandra.vrtikapa@securekey.com>
1 parent cfaafc8 commit b36fe41

File tree

4 files changed

+727
-0
lines changed

4 files changed

+727
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ You're good to go, happy coding! Check out the examples for usage demonstrations
2323
### Examples
2424

2525
- [E2E Test](test/integration/end_to_end_test.go) and [Base Test](test/integration/base_test_setup.go): Part of the E2E tests included with the Go SDK.
26+
- [Dynamic Endorser Selection] (test/integration/sdk_provider_test.go) : An example that uses dynamic endorser selection (based on chaincode policy)
2627
- [CLI](https://github.com/securekey/fabric-examples/tree/master/fabric-cli/): An example CLI for Fabric built with the Go SDK.
2728
- More examples needed!
2829

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
Copyright SecureKey Technologies Inc. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package dynamicselection
8+
9+
import (
10+
"fmt"
11+
"sync"
12+
13+
"github.com/hyperledger/fabric-sdk-go/api/apiconfig"
14+
fab "github.com/hyperledger/fabric-sdk-go/api/apifabclient"
15+
"github.com/hyperledger/fabric-sdk-go/def/fabapi"
16+
"github.com/hyperledger/fabric-sdk-go/pkg/errors"
17+
18+
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn/selection/dynamicselection/pgresolver"
19+
)
20+
21+
// ChannelUser contains user(identity) info to be used for specific channel
22+
type ChannelUser struct {
23+
ChannelID string
24+
UserName string
25+
OrgName string
26+
}
27+
28+
// SelectionProvider implements selection provider
29+
type SelectionProvider struct {
30+
config apiconfig.Config
31+
users []ChannelUser
32+
lbp pgresolver.LoadBalancePolicy
33+
sdk *fabapi.FabricSDK
34+
}
35+
36+
// NewSelectionProvider returns dynamic selection provider
37+
func NewSelectionProvider(config apiconfig.Config, users []ChannelUser, lbp pgresolver.LoadBalancePolicy) (*SelectionProvider, error) {
38+
lbPolicy := lbp
39+
if lbPolicy == nil {
40+
lbPolicy = pgresolver.NewRandomLBP()
41+
}
42+
return &SelectionProvider{config: config, users: users, lbp: lbPolicy}, nil
43+
}
44+
45+
type selectionService struct {
46+
channelID string
47+
mutex sync.RWMutex
48+
pgResolvers map[string]pgresolver.PeerGroupResolver
49+
pgLBP pgresolver.LoadBalancePolicy
50+
ccPolicyProvider CCPolicyProvider
51+
}
52+
53+
// Initialize allow for initializing providers
54+
func (p *SelectionProvider) Initialize(sdk *fabapi.FabricSDK) error {
55+
p.sdk = sdk
56+
return nil
57+
}
58+
59+
// NewSelectionService creates a selection service
60+
func (p *SelectionProvider) NewSelectionService(channelID string) (fab.SelectionService, error) {
61+
if channelID == "" {
62+
return nil, errors.New("Must provide channel ID")
63+
}
64+
65+
var channelUser *ChannelUser
66+
for _, p := range p.users {
67+
if p.ChannelID == channelID {
68+
channelUser = &p
69+
break
70+
}
71+
}
72+
73+
if channelUser == nil {
74+
return nil, errors.New("Must provide user for channel")
75+
}
76+
77+
ccPolicyProvider, err := newCCPolicyProvider(p.sdk, channelID, channelUser.UserName, channelUser.OrgName)
78+
if err != nil {
79+
return nil, errors.WithMessage(err, "Failed to create cc policy provider")
80+
}
81+
82+
return &selectionService{
83+
channelID: channelID,
84+
pgResolvers: make(map[string]pgresolver.PeerGroupResolver),
85+
pgLBP: p.lbp,
86+
ccPolicyProvider: ccPolicyProvider,
87+
}, nil
88+
}
89+
90+
func (s *selectionService) GetEndorsersForChaincode(channelPeers []fab.Peer,
91+
chaincodeIDs ...string) ([]fab.Peer, error) {
92+
93+
if len(chaincodeIDs) == 0 {
94+
return nil, errors.New("no chaincode IDs provided")
95+
}
96+
97+
if len(channelPeers) == 0 {
98+
return nil, errors.New("Must provide at least one channel peer")
99+
}
100+
101+
resolver, err := s.getPeerGroupResolver(channelPeers, chaincodeIDs)
102+
if err != nil {
103+
return nil, errors.WithMessage(err, fmt.Sprintf("Error getting peer group resolver for chaincodes [%v] on channel [%s]", chaincodeIDs, s.channelID))
104+
}
105+
return resolver.Resolve().Peers(), nil
106+
}
107+
108+
func (s *selectionService) getPeerGroupResolver(channelPeers []fab.Peer, chaincodeIDs []string) (pgresolver.PeerGroupResolver, error) {
109+
key := newResolverKey(s.channelID, chaincodeIDs...)
110+
111+
s.mutex.RLock()
112+
resolver := s.pgResolvers[key.String()]
113+
s.mutex.RUnlock()
114+
115+
if resolver == nil {
116+
var err error
117+
if resolver, err = s.createPGResolver(channelPeers, key); err != nil {
118+
return nil, errors.WithMessage(err, fmt.Sprintf("unable to create new peer group resolver for chaincode(s) [%v] on channel [%s]", chaincodeIDs, s.channelID))
119+
}
120+
}
121+
return resolver, nil
122+
}
123+
124+
func (s *selectionService) createPGResolver(channelPeers []fab.Peer, key *resolverKey) (pgresolver.PeerGroupResolver, error) {
125+
s.mutex.Lock()
126+
defer s.mutex.Unlock()
127+
128+
resolver := s.pgResolvers[key.String()]
129+
if resolver != nil {
130+
// TODO: Expire cache
131+
return resolver, nil
132+
}
133+
134+
// Retrieve the signature policies for all of the chaincodes
135+
var policyGroups []pgresolver.Group
136+
for _, ccID := range key.chaincodeIDs {
137+
policyGroup, err := s.getPolicyGroupForCC(key.channelID, ccID, channelPeers)
138+
if err != nil {
139+
return nil, errors.WithMessage(err, fmt.Sprintf("error retrieving signature policy for chaincode [%s] on channel [%s]", ccID, key.channelID))
140+
}
141+
policyGroups = append(policyGroups, policyGroup)
142+
}
143+
144+
// Perform an 'and' operation on all of the peer groups
145+
aggregatePolicyGroup, err := pgresolver.NewGroupOfGroups(policyGroups).Nof(int32(len(policyGroups)))
146+
if err != nil {
147+
return nil, errors.WithMessage(err, fmt.Sprintf("error computing signature policy for chaincode(s) [%v] on channel [%s]", key.chaincodeIDs, key.channelID))
148+
}
149+
150+
// Create the resolver
151+
if resolver, err = pgresolver.NewPeerGroupResolver(aggregatePolicyGroup, s.pgLBP); err != nil {
152+
return nil, errors.WithMessage(err, fmt.Sprintf("error creating peer group resolver for chaincodes [%v] on channel [%s]", key.chaincodeIDs, key.channelID))
153+
}
154+
155+
s.pgResolvers[key.String()] = resolver
156+
157+
return resolver, nil
158+
}
159+
160+
func (s *selectionService) getPolicyGroupForCC(channelID string, ccID string, channelPeers []fab.Peer) (pgresolver.Group, error) {
161+
sigPolicyEnv, err := s.ccPolicyProvider.GetChaincodePolicy(ccID)
162+
if err != nil {
163+
return nil, errors.WithMessage(err, fmt.Sprintf("error querying chaincode [%s] on channel [%s]", ccID, channelID))
164+
}
165+
166+
return pgresolver.NewSignaturePolicyCompiler(
167+
func(mspID string) []fab.Peer {
168+
return s.getAvailablePeers(channelPeers, mspID)
169+
}).Compile(sigPolicyEnv)
170+
}
171+
172+
func (s *selectionService) getAvailablePeers(channelPeers []fab.Peer, mspID string) []fab.Peer {
173+
var peers []fab.Peer
174+
for _, peer := range channelPeers {
175+
if string(peer.MSPID()) == mspID {
176+
peers = append(peers, peer)
177+
}
178+
}
179+
180+
str := ""
181+
for i, peer := range peers {
182+
str += peer.URL()
183+
if i+1 < len(peers) {
184+
str += ","
185+
}
186+
}
187+
logger.Debugf("Available peers:\n%s\n", str)
188+
189+
return peers
190+
}

0 commit comments

Comments
 (0)