Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit aabee4d

Browse files
committed
[FABG-812] Selection handling of 'access denied' error
When an 'access denied' error is received from the Discovery server, close the selection service reference so that periodic refresh is halted. Change-Id: Id1156325147932e9fb085f0a76235cd404798b47 Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
1 parent a19e0b0 commit aabee4d

File tree

7 files changed

+180
-40
lines changed

7 files changed

+180
-40
lines changed

pkg/client/common/discovery/dynamicdiscovery/service.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@ func newService(config fab.EndpointConfig, query queryPeers, opts ...coptions.Op
6969
if err != nil {
7070
derr, ok := err.(*discoveryError)
7171
if ok && derr.IsFatal() {
72-
logger.Warnf("Got fatal error [%s]. Closing discovery client.", err)
73-
s.lastErr = err
74-
go func() { s.Close() }()
72+
go s.close(err)
7573
}
7674
}
7775
return peers, err
@@ -110,11 +108,26 @@ func (s *service) Close() {
110108
s.peersRef.Close()
111109
}
112110

111+
func (s *service) close(err error) {
112+
logger.Warnf("Got fatal error [%s]. Closing discovery client.", err)
113+
s.lock.Lock()
114+
defer s.lock.Unlock()
115+
s.lastErr = err
116+
s.peersRef.Close()
117+
}
118+
119+
func (s *service) getLastError() error {
120+
s.lock.RLock()
121+
defer s.lock.RUnlock()
122+
return s.lastErr
123+
}
124+
113125
// GetPeers returns the available peers
114126
func (s *service) GetPeers() ([]fab.Peer, error) {
115127
if s.peersRef.IsClosed() {
116-
if s.lastErr != nil {
117-
return nil, errors.Errorf("Discovery client has been closed due to error: %s", s.lastErr)
128+
lastErr := s.getLastError()
129+
if lastErr != nil {
130+
return nil, errors.Errorf("Discovery client has been closed due to error: %s", lastErr)
118131
}
119132
return nil, errors.Errorf("Discovery client has been closed")
120133
}

pkg/client/common/selection/fabricselection/fabricselection.go

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"encoding/json"
1212
"fmt"
1313
"strings"
14+
"sync"
1415
"time"
1516

1617
discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client"
@@ -32,7 +33,10 @@ import (
3233
grpcCodes "google.golang.org/grpc/codes"
3334
)
3435

35-
const moduleName = "fabsdk/client"
36+
const (
37+
moduleName = "fabsdk/client"
38+
accessDenied = "access denied"
39+
)
3640

3741
var logger = logging.NewLogger(moduleName)
3842

@@ -53,12 +57,13 @@ var defaultRetryOpts = retry.Opts{
5357
RetryableCodes: retryableCodes,
5458
}
5559

56-
type discoveryClient interface {
60+
// DiscoveryClient is the client to the discovery service
61+
type DiscoveryClient interface {
5762
Send(ctx context.Context, req *discclient.Request, targets ...fab.PeerConfig) ([]fabdiscovery.Response, error)
5863
}
5964

6065
// clientProvider is overridden by unit tests
61-
var clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
66+
var clientProvider = func(ctx contextAPI.Client) (DiscoveryClient, error) {
6267
return fabdiscovery.New(ctx)
6368
}
6469

@@ -69,9 +74,11 @@ type Service struct {
6974
responseTimeout time.Duration
7075
ctx contextAPI.Client
7176
discovery fab.DiscoveryService
72-
discClient discoveryClient
77+
discClient DiscoveryClient
7378
chResponseCache *lazycache.Cache
7479
retryOpts retry.Opts
80+
lastErr error
81+
lock sync.RWMutex
7582
}
7683

7784
// New creates a new dynamic selection service using Fabric's Discovery Service
@@ -123,7 +130,14 @@ func New(ctx contextAPI.Client, channelID string, discovery fab.DiscoveryService
123130
logger.Debugf("Overriding retry opts: %#v", ropts)
124131
}
125132

126-
return s.queryEndorsers(invocationChain, ropts)
133+
endorsers, err := s.queryEndorsers(invocationChain, ropts)
134+
if err != nil {
135+
derr, ok := err.(discoveryError)
136+
if ok && derr.isFatal() {
137+
go s.close(err)
138+
}
139+
}
140+
return endorsers, err
127141
},
128142
lazyref.WithRefreshInterval(lazyref.InitImmediately, options.refreshInterval),
129143
)
@@ -133,6 +147,14 @@ func New(ctx contextAPI.Client, channelID string, discovery fab.DiscoveryService
133147

134148
// GetEndorsersForChaincode returns the endorsing peers for the given chaincodes
135149
func (s *Service) GetEndorsersForChaincode(chaincodes []*fab.ChaincodeCall, opts ...coptions.Opt) ([]fab.Peer, error) {
150+
if s.chResponseCache.IsClosed() {
151+
lastErr := s.getLastError()
152+
if lastErr != nil {
153+
return nil, errors.Errorf("Selection service has been closed due to error: %s", lastErr)
154+
}
155+
return nil, errors.Errorf("Selection service has been closed")
156+
}
157+
136158
logger.Debugf("Getting endorsers for chaincodes [%#v]...", chaincodes)
137159
if len(chaincodes) == 0 {
138160
return nil, errors.New("no chaincode IDs provided")
@@ -167,6 +189,20 @@ func (s *Service) Close() {
167189
s.chResponseCache.Close()
168190
}
169191

192+
func (s *Service) close(err error) {
193+
logger.Warnf("Got fatal error [%s]. Closing selection service.", err)
194+
s.lock.Lock()
195+
defer s.lock.Unlock()
196+
s.lastErr = err
197+
s.chResponseCache.Close()
198+
}
199+
200+
func (s *Service) getLastError() error {
201+
s.lock.RLock()
202+
defer s.lock.RUnlock()
203+
return s.lastErr
204+
}
205+
170206
func (s *Service) getEndorsers(chaincodes []*fab.ChaincodeCall, chResponse discclient.ChannelResponse, peerFilter soptions.PeerFilter, sorter soptions.PeerSorter) (discclient.Endorsers, error) {
171207
peers, err := s.discovery.GetPeers()
172208
if err != nil {
@@ -214,9 +250,9 @@ func (s *Service) queryEndorsers(chaincodes []*fab.ChaincodeCall, retryOpts retr
214250
)
215251

216252
if err != nil {
217-
return nil, err
253+
return nil, newDiscoveryError(err)
218254
}
219-
return chResponse.(discclient.ChannelResponse), err
255+
return chResponse.(discclient.ChannelResponse), nil
220256
}
221257

222258
func (s *Service) query(req *discclient.Request, chaincodes []*fab.ChaincodeCall, targets []fab.PeerConfig) (discclient.ChannelResponse, error) {
@@ -355,3 +391,7 @@ func (e discoveryError) isTransient() bool {
355391
return strings.Contains(e.Error(), "failed constructing descriptor for chaincodes") ||
356392
strings.Contains(e.Error(), "no endorsement combination can be satisfied")
357393
}
394+
395+
func (e discoveryError) isFatal() bool {
396+
return e == accessDenied
397+
}

pkg/client/common/selection/fabricselection/selection_test.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// +build testing
2+
13
/*
24
Copyright SecureKey Technologies Inc. All Rights Reserved.
35
@@ -114,15 +116,15 @@ func TestSelection(t *testing.T) {
114116

115117
discClient := clientmocks.NewMockDiscoveryClient()
116118

117-
clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
119+
SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) {
118120
return discClient, nil
119-
}
121+
})
120122

121123
service, err := New(
122124
ctx, channelID,
123125
mocks.NewMockDiscoveryService(nil, peer1Org1, peer2Org1, peer1Org2, peer2Org2, peer1Org3, peer2Org3),
124-
WithRefreshInterval(100*time.Millisecond),
125-
WithResponseTimeout(10*time.Millisecond),
126+
WithRefreshInterval(10*time.Millisecond),
127+
WithResponseTimeout(100*time.Millisecond),
126128
)
127129
require.NoError(t, err)
128130
defer service.Close()
@@ -135,7 +137,7 @@ func TestSelection(t *testing.T) {
135137
Error: fmt.Errorf("simulated response error"),
136138
},
137139
)
138-
testSelectionError(t, service)
140+
testSelectionError(t, service, "error getting channel response for channel [testchannel]: simulated response error")
139141
})
140142

141143
t.Run("CCtoCC", func(t *testing.T) {
@@ -149,7 +151,7 @@ func TestSelection(t *testing.T) {
149151
)
150152

151153
// Wait for cache to refresh
152-
time.Sleep(200 * time.Millisecond)
154+
time.Sleep(20 * time.Millisecond)
153155
testSelectionCCtoCC(t, service)
154156
})
155157

@@ -168,6 +170,18 @@ func TestSelection(t *testing.T) {
168170
t.Run("Priority Selector", func(t *testing.T) {
169171
testSelectionPrioritySelector(t, service)
170172
})
173+
174+
t.Run("Fatal Error", func(t *testing.T) {
175+
discClient.SetResponses(
176+
&clientmocks.MockDiscoverEndpointResponse{
177+
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{},
178+
Error: fmt.Errorf(accessDenied),
179+
},
180+
)
181+
// Wait for cache to refresh
182+
time.Sleep(20 * time.Millisecond)
183+
testSelectionError(t, service, "Selection service has been closed due to error: access denied")
184+
})
171185
}
172186

173187
func TestWithDiscoveryFilter(t *testing.T) {
@@ -179,9 +193,9 @@ func TestWithDiscoveryFilter(t *testing.T) {
179193
ctx.SetEndpointConfig(config)
180194

181195
discClient := clientmocks.NewMockDiscoveryClient()
182-
clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
196+
SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) {
183197
return discClient, nil
184-
}
198+
})
185199

186200
discClient.SetResponses(
187201
&clientmocks.MockDiscoverEndpointResponse{
@@ -241,9 +255,10 @@ func TestWithDiscoveryFilter(t *testing.T) {
241255
})
242256
}
243257

244-
func testSelectionError(t *testing.T, service *Service) {
258+
func testSelectionError(t *testing.T, service *Service, expectedErrMsg string) {
245259
endorsers, err := service.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: cc1}})
246260
assert.Error(t, err)
261+
assert.Equal(t, expectedErrMsg, err.Error())
247262
assert.Equal(t, 0, len(endorsers))
248263
}
249264

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// +build testing
2+
3+
/*
4+
Copyright SecureKey Technologies Inc. All Rights Reserved.
5+
6+
SPDX-License-Identifier: Apache-2.0
7+
*/
8+
9+
package fabricselection
10+
11+
import (
12+
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
13+
)
14+
15+
// SetClientProvider overrides the discovery client provider for unit tests
16+
func SetClientProvider(provider func(ctx contextAPI.Client) (DiscoveryClient, error)) {
17+
clientProvider = provider
18+
}

pkg/fabsdk/provider/chpvdr/chprovider_test.go

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package chpvdr
1111
import (
1212
"errors"
1313
"testing"
14+
"time"
1415

1516
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/dynamicdiscovery"
1617
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/staticdiscovery"
@@ -115,7 +116,69 @@ func TestBasicValidChannel(t *testing.T) {
115116
assert.Truef(t, ok, "Expecting selection to be Fabric for v1_2")
116117
}
117118

118-
func TestAccessDenied(t *testing.T) {
119+
func TestDiscoveryAccessDenied(t *testing.T) {
120+
discClient, channelService := setupDiscovery(t, func(discClient *clientmocks.MockDiscoveryClient) {
121+
dynamicdiscovery.SetClientProvider(func(ctx context.Client) (dynamicdiscovery.DiscoveryClient, error) {
122+
return discClient, nil
123+
})
124+
})
125+
126+
discClient.SetResponses(
127+
&clientmocks.MockDiscoverEndpointResponse{
128+
Error: errors.New("access denied"),
129+
},
130+
)
131+
132+
discovery, err := channelService.Discovery()
133+
require.NoError(t, err)
134+
require.NotNil(t, discovery)
135+
_, ok := discovery.(*dynamicdiscovery.ChannelService)
136+
assert.Truef(t, ok, "Expecting discovery to be Dynamic for v1_2")
137+
138+
_, err = discovery.GetPeers()
139+
require.Error(t, err)
140+
assert.Equal(t, "access denied", err.Error())
141+
142+
time.Sleep(50 * time.Millisecond)
143+
144+
// Subsequent calls should fail since the service is closed
145+
_, err = discovery.GetPeers()
146+
require.Error(t, err)
147+
assert.Equal(t, "Discovery client has been closed due to error: access denied", err.Error())
148+
}
149+
150+
func TestSelectionAccessDenied(t *testing.T) {
151+
discClient, channelService := setupDiscovery(t, func(discClient *clientmocks.MockDiscoveryClient) {
152+
fabricselection.SetClientProvider(func(ctx context.Client) (fabricselection.DiscoveryClient, error) {
153+
return discClient, nil
154+
})
155+
})
156+
157+
discClient.SetResponses(
158+
&clientmocks.MockDiscoverEndpointResponse{
159+
Error: errors.New("access denied"),
160+
},
161+
)
162+
163+
selection, err := channelService.Selection()
164+
require.NoError(t, err)
165+
require.NotNil(t, selection)
166+
_, ok := selection.(*fabricselection.Service)
167+
assert.Truef(t, ok, "Expecting selection to be Fabric for v1_2")
168+
169+
_, err = selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}})
170+
require.Error(t, err)
171+
assert.Equal(t, "error getting channel response for channel [testchannel]: access denied", err.Error())
172+
173+
time.Sleep(50 * time.Millisecond)
174+
175+
// Subsequent calls should fail since the service is closed
176+
_, err = selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}})
177+
require.Error(t, err)
178+
assert.Equal(t, "Selection service has been closed due to error: access denied", err.Error())
179+
}
180+
181+
func setupDiscovery(t *testing.T, preInit func(discClient *clientmocks.MockDiscoveryClient)) (*clientmocks.MockDiscoveryClient, fab.ChannelService) {
119182
ctx := mocks.NewMockProviderContext()
120183

121184
user := mspmocks.NewMockSigningIdentity("user", "user")
@@ -127,15 +190,7 @@ func TestAccessDenied(t *testing.T) {
127190

128191
discClient := clientmocks.NewMockDiscoveryClient()
129192

130-
discClient.SetResponses(
131-
&clientmocks.MockDiscoverEndpointResponse{
132-
Error: errors.New("access denied"),
133-
},
134-
)
135-
136-
dynamicdiscovery.SetClientProvider(func(ctx context.Client) (dynamicdiscovery.DiscoveryClient, error) {
137-
return discClient, nil
138-
})
193+
preInit(discClient)
139194

140195
cp, err := New(clientCtx.EndpointConfig())
141196
require.NoError(t, err)
@@ -152,13 +207,5 @@ func TestAccessDenied(t *testing.T) {
152207
channelService, err := cp.ChannelService(clientCtx, "testchannel")
153208
require.NoError(t, err)
154209

155-
discovery, err := channelService.Discovery()
156-
require.NoError(t, err)
157-
require.NotNil(t, discovery)
158-
_, ok := discovery.(*dynamicdiscovery.ChannelService)
159-
assert.Truef(t, ok, "Expecting discovery to be Dynamic for v1_2")
160-
161-
_, err = discovery.GetPeers()
162-
require.Error(t, err)
163-
assert.Equal(t, "access denied", err.Error())
210+
return discClient, channelService
164211
}

pkg/util/concurrent/lazycache/lazycache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@ func (c *Cache) Close() {
169169
c.DeleteAll()
170170
}
171171

172+
// IsClosed reeturns true if the cache has been closed
173+
func (c *Cache) IsClosed() bool {
174+
return atomic.LoadInt32(&c.closed) == 1
175+
}
176+
172177
// DeleteAll does the following:
173178
// - calls Close on all values that implement a Close() function
174179
// - deletes all entries from the cache

pkg/util/concurrent/lazycache/lazycache_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ func TestClose(t *testing.T) {
261261
// Close the cache
262262
cache.Close()
263263

264+
assert.True(t, cache.IsClosed())
265+
264266
// Close again should be fine
265267
cache.Close()
266268

0 commit comments

Comments
 (0)