Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 37c1b5a

Browse files
committed
[FAB-10543] gracefully complete orderer calls
Change-Id: I5f2980cf5dbc01c99548dbe438dd1fa8f614b5b8 Signed-off-by: Troy Ronda <troy@troyronda.com>
1 parent 478d097 commit 37c1b5a

File tree

7 files changed

+128
-62
lines changed

7 files changed

+128
-62
lines changed

pkg/client/resmgmt/example_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,10 +387,9 @@ func mockClientProvider() context.ClientProvider {
387387

388388
// Create mock orderer with simple mock block
389389
orderer := mocks.NewMockOrderer("", nil)
390-
defer orderer.Close()
391-
392390
orderer.EnqueueForSendDeliver(mocks.NewSimpleMockBlock())
393391
orderer.EnqueueForSendDeliver(common.Status_SUCCESS)
392+
orderer.CloseQueue()
394393

395394
setupCustomOrderer(ctx, orderer)
396395

pkg/client/resmgmt/resmgmt_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ func TestJoinChannelFail(t *testing.T) {
6363

6464
// Create mock orderer with simple mock block
6565
orderer := fcmocks.NewMockOrderer("", nil)
66-
defer orderer.Close()
6766
orderer.EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
6867
orderer.EnqueueForSendDeliver(common.Status_SUCCESS)
68+
orderer.CloseQueue()
6969

7070
setupCustomOrderer(ctx, orderer)
7171

@@ -99,9 +99,9 @@ func TestJoinChannelSuccess(t *testing.T) {
9999

100100
// Create mock orderer with simple mock block
101101
orderer := fcmocks.NewMockOrderer("", nil)
102-
defer orderer.Close()
103102
orderer.EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
104103
orderer.EnqueueForSendDeliver(common.Status_SUCCESS)
104+
orderer.CloseQueue()
105105

106106
setupCustomOrderer(ctx, orderer)
107107

@@ -135,9 +135,9 @@ func TestJoinChannelWithFilter(t *testing.T) {
135135

136136
// Create mock orderer with simple mock block
137137
orderer := fcmocks.NewMockOrderer("", nil)
138-
defer orderer.Close()
139138
orderer.EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
140139
orderer.EnqueueForSendDeliver(common.Status_SUCCESS)
140+
orderer.CloseQueue()
141141
setupCustomOrderer(ctx, orderer)
142142

143143
//the target filter ( client option) will be set
@@ -224,9 +224,9 @@ func TestJoinChannelWithOptsRequiredParameters(t *testing.T) {
224224

225225
// Create mock orderer with simple mock block
226226
orderer := fcmocks.NewMockOrderer("", nil)
227-
defer orderer.Close()
228227
orderer.EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
229228
orderer.EnqueueForSendDeliver(common.Status_SUCCESS)
229+
orderer.CloseQueue()
230230
setupCustomOrderer(ctx, orderer)
231231

232232
rc := setupResMgmtClient(t, ctx, getDefaultTargetFilterOption())
@@ -262,12 +262,12 @@ func TestJoinChannelWithOptsRequiredParameters(t *testing.T) {
262262

263263
//Some cleanup before further test
264264
orderer = fcmocks.NewMockOrderer("", nil)
265-
defer orderer.Close()
265+
orderer.EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
266+
orderer.EnqueueForSendDeliver(common.Status_SUCCESS)
267+
orderer.CloseQueue()
266268

267269
ctx = setupTestContext("test", "Org1MSP")
268270
setupCustomOrderer(ctx, orderer)
269-
orderer.EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
270-
orderer.EnqueueForSendDeliver(common.Status_SUCCESS)
271271

272272
rc = setupResMgmtClientWithLocalPeers(t, ctx, peers, getDefaultTargetFilterOption())
273273

pkg/fab/mocks/mockorderer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ func delivery(o *MockOrderer) {
6868
}
6969
switch value.(type) {
7070
case common.Status:
71-
close(o.Deliveries)
72-
return
7371
case *common.Block:
7472
o.Deliveries <- value.(*common.Block)
7573
case error:
@@ -105,8 +103,8 @@ func (o *MockOrderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEn
105103
return o.Deliveries, o.DeliveryErrors
106104
}
107105

108-
// Close cleans up the instance and ends goroutines
109-
func (o *MockOrderer) Close() {
106+
// CloseQueue ends the mock broadcast and delivery queues
107+
func (o *MockOrderer) CloseQueue() {
110108
close(o.BroadcastQueue)
111109
close(o.DeliveryQueue)
112110
}

pkg/fab/orderer/orderer.go

Lines changed: 67 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ package orderer
99
import (
1010
reqContext "context"
1111
"crypto/x509"
12+
"io"
1213
"time"
1314

15+
"github.com/hyperledger/fabric-sdk-go/pkg/common/errors/multi"
1416
"github.com/pkg/errors"
1517
"github.com/spf13/cast"
1618
"google.golang.org/grpc"
@@ -284,32 +286,61 @@ func (o *Orderer) SendBroadcast(ctx reqContext.Context, envelope *fab.SignedEnve
284286
logger.Debugf("unable to close broadcast client [%s]", err)
285287
}
286288

287-
select {
288-
case broadcastStatus := <-responses:
289-
return &broadcastStatus, nil
290-
case broadcastErr := <-errs:
291-
return nil, broadcastErr
292-
}
289+
return wrapStreamStatusRPC(responses, errs)
293290
}
294291

295-
func broadcastStream(broadcastClient ab.AtomicBroadcast_BroadcastClient, responses chan common.Status, errs chan error) {
292+
// wrapStreamStatusRPC returns the last response and err and blocks until the chan is closed.
293+
func wrapStreamStatusRPC(responses chan common.Status, errs chan error) (*common.Status, error) {
294+
var status common.Status
295+
var err multi.Errors
296296

297-
broadcastResponse, err := broadcastClient.Recv()
298-
if err != nil {
299-
rpcStatus, ok := grpcstatus.FromError(err)
300-
if ok {
301-
err = status.NewFromGRPCStatus(rpcStatus)
297+
read:
298+
for {
299+
select {
300+
case s, ok := <-responses:
301+
if !ok {
302+
break read
303+
}
304+
status = s
305+
case e := <-errs:
306+
err = append(err, e)
302307
}
303-
errs <- errors.Wrap(err, "broadcast recv failed")
304-
return
305308
}
306309

307-
if broadcastResponse.Status != common.Status_SUCCESS {
308-
errs <- status.New(status.OrdererServerStatus, int32(broadcastResponse.Status), broadcastResponse.Info, nil)
309-
return
310+
// drain remaining errors.
311+
for i := 0; i < len(errs); i++ {
312+
e := <-errs
313+
err = append(err, e)
310314
}
311315

312-
responses <- broadcastResponse.Status
316+
return &status, err.ToError()
317+
}
318+
319+
func broadcastStream(broadcastClient ab.AtomicBroadcast_BroadcastClient, responses chan common.Status, errs chan error) {
320+
for {
321+
broadcastResponse, err := broadcastClient.Recv()
322+
if err == io.EOF {
323+
// done
324+
close(responses)
325+
return
326+
}
327+
328+
if err != nil {
329+
rpcStatus, ok := grpcstatus.FromError(err)
330+
if ok {
331+
err = status.NewFromGRPCStatus(rpcStatus)
332+
}
333+
errs <- errors.Wrap(err, "broadcast recv failed")
334+
close(responses)
335+
return
336+
}
337+
338+
if broadcastResponse.Status == common.Status_SUCCESS {
339+
responses <- broadcastResponse.Status
340+
} else {
341+
errs <- status.New(status.OrdererServerStatus, int32(broadcastResponse.Status), broadcastResponse.Info, nil)
342+
}
343+
}
313344
}
314345

315346
// SendDeliver sends a deliver request to the ordering service and returns the
@@ -325,10 +356,11 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo
325356
rpcStatus, ok := grpcstatus.FromError(err)
326357
if ok {
327358
errs <- errors.WithMessage(status.NewFromGRPCStatus(rpcStatus), "connection failed")
328-
return responses, errs
359+
} else {
360+
errs <- status.New(status.OrdererClientStatus, status.ConnectionFailed.ToInt32(), err.Error(), nil)
329361
}
330362

331-
errs <- status.New(status.OrdererClientStatus, status.ConnectionFailed.ToInt32(), err.Error(), nil)
363+
close(responses)
332364
return responses, errs
333365
}
334366

@@ -339,6 +371,8 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo
339371
o.releaseConn(ctx, conn)
340372

341373
errs <- errors.Wrap(err, "deliver failed")
374+
375+
close(responses)
342376
return responses, errs
343377
}
344378

@@ -355,10 +389,7 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo
355389
Signature: envelope.Signature,
356390
})
357391
if err != nil {
358-
o.releaseConn(ctx, conn)
359-
360-
errs <- errors.Wrap(err, "failed to send block request to orderer")
361-
return responses, errs
392+
logger.Warnf("failed to send block request to orderer [%s]", err)
362393
}
363394

364395
if err = broadcastClient.CloseSend(); err != nil {
@@ -369,32 +400,38 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo
369400
}
370401

371402
func blockStream(deliverClient ab.AtomicBroadcast_DeliverClient, responses chan *common.Block, errs chan error) {
403+
372404
for {
373405
response, err := deliverClient.Recv()
406+
if err == io.EOF {
407+
// done
408+
close(responses)
409+
return
410+
}
411+
374412
if err != nil {
375413
errs <- errors.Wrap(err, "recv from ordering service failed")
414+
close(responses)
376415
return
377416
}
417+
378418
// Assert response type
379419
switch t := response.Type.(type) {
380-
// Seek operation success, no more resposes
420+
// Seek operation success, no more responses
381421
case *ab.DeliverResponse_Status:
382422
logger.Debugf("Received deliver response status from ordering service: %s", t.Status)
383423
if t.Status != common.Status_SUCCESS {
384424
errs <- status.New(status.OrdererServerStatus, int32(t.Status), "error status from ordering service", []interface{}{})
385-
return
386425
}
387-
close(responses)
388-
return
389426

390427
// Response is a requested block
391428
case *ab.DeliverResponse_Block:
392429
logger.Debug("Received block from ordering service")
393430
responses <- response.GetBlock()
394431
// Unknown response
395432
default:
396-
errs <- errors.Errorf("unknown response type from ordering service %T", t)
397-
return
433+
// ignore unknown types.
434+
logger.Infof("unknown response type from ordering service %T", t)
398435
}
399436
}
400437
}

pkg/fab/orderer/orderer_test.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,15 @@ func TestSendDeliver(t *testing.T) {
9797
blocks, errs := orderer.SendDeliver(ctx, &fab.SignedEnvelope{})
9898

9999
select {
100-
case block := <-blocks:
101-
t.Fatalf("This usecase was not supposed to receive blocks : %#v", block)
100+
case block, ok := <-blocks:
101+
if ok {
102+
t.Fatalf("This usecase was not supposed to receive blocks : %#v", block)
103+
}
104+
case <-time.After(time.Second * 5):
105+
t.Fatal("Did not receive closed response from SendDeliver")
106+
}
107+
108+
select {
102109
case err := <-errs:
103110
t.Logf("There is an error as expected : %s", err)
104111
case <-time.After(time.Second * 5):
@@ -250,6 +257,7 @@ func TestSendDeliverFailure(t *testing.T) {
250257

251258
broadcastServer := mocks.MockBroadcastServer{
252259
DeliverResponse: &ab.DeliverResponse{},
260+
DeliverError: errors.New("fail me"),
253261
}
254262

255263
addr := broadcastServer.Start(testOrdererURL)
@@ -259,17 +267,25 @@ func TestSendDeliverFailure(t *testing.T) {
259267

260268
ctx, cancel := reqContext.WithTimeout(reqContext.Background(), 5*time.Second)
261269
defer cancel()
262-
blocks, errors := orderer.SendDeliver(ctx, &fab.SignedEnvelope{})
270+
blocks, errs := orderer.SendDeliver(ctx, &fab.SignedEnvelope{})
263271

264272
select {
265-
case block := <-blocks:
266-
t.Fatalf("This usecase was not supposed to get valid block %+v", block)
267-
case err := <-errors:
268-
if err == nil || !strings.HasPrefix(err.Error(), "unknown response type from ordering service") {
269-
t.Fatalf("Error response is not working as expected : '%s' ", err)
273+
case block, ok := <-blocks:
274+
if ok {
275+
t.Fatalf("This usecase was not supposed to get valid block %+v", block)
276+
} else {
277+
t.Fatalf("did not receive any response or error from SendDeliver")
278+
}
279+
case err, ok := <-errs:
280+
if ok {
281+
if err == nil || !strings.HasPrefix(err.Error(), "recv from ordering service failed") {
282+
t.Fatalf("Error response is not working as expected : '%s' ", err)
283+
}
284+
} else {
285+
t.Fatalf("did not receive any response or error from SendDeliver")
270286
}
271287
case <-time.After(time.Second * 5):
272-
t.Fatal("Did not receive any response or error from SendDeliver")
288+
t.Fatal("Timeout: did not receive any response or error from SendDeliver")
273289
}
274290
}
275291

@@ -305,9 +321,9 @@ func TestSendBroadcastError(t *testing.T) {
305321

306322
orderer, _ := New(mocks.NewMockEndpointConfig(), WithURL("grpc://"+addr), WithInsecure())
307323

308-
statusCode, err := orderer.SendBroadcast(reqContext.Background(), &fab.SignedEnvelope{})
324+
_, err := orderer.SendBroadcast(reqContext.Background(), &fab.SignedEnvelope{})
309325

310-
if err == nil || statusCode != nil {
326+
if err == nil {
311327
t.Fatalf("expected Send Broadcast to fail with error, but got %s", err)
312328
}
313329
statusError, ok := status.FromError(err)

pkg/fab/resource/resource_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,9 @@ func TestGenesisBlockOrdererErr(t *testing.T) {
216216
ctx := setupContext()
217217

218218
orderer := mocks.NewMockOrderer("", nil)
219-
defer orderer.Close()
220219
orderer.EnqueueForSendDeliver(mocks.NewSimpleMockError())
220+
orderer.CloseQueue()
221+
221222
reqCtx, cancel := contextImpl.NewRequest(ctx, contextImpl.WithTimeout(10*time.Second))
222223
defer cancel()
223224
_, err := GenesisBlockFromOrderer(reqCtx, channelName, orderer)
@@ -232,9 +233,9 @@ func TestGenesisBlock(t *testing.T) {
232233
ctx := setupContext()
233234

234235
orderer := mocks.NewMockOrderer("", nil)
235-
defer orderer.Close()
236236
orderer.EnqueueForSendDeliver(mocks.NewSimpleMockBlock())
237237
orderer.EnqueueForSendDeliver(common.Status_SUCCESS)
238+
orderer.CloseQueue()
238239
reqCtx, cancel := contextImpl.NewRequest(ctx, contextImpl.WithTimeout(10*time.Second))
239240
defer cancel()
240241
_, err := GenesisBlockFromOrderer(reqCtx, channelName, orderer)
@@ -249,10 +250,11 @@ func TestGenesisBlockWithRetry(t *testing.T) {
249250
ctx := setupContext()
250251

251252
orderer := mocks.NewMockOrderer("", nil)
252-
defer orderer.Close()
253253
orderer.EnqueueForSendDeliver(status.New(status.OrdererServerStatus, int32(common.Status_SERVICE_UNAVAILABLE), "service unavailable", []interface{}{}))
254254
orderer.EnqueueForSendDeliver(mocks.NewSimpleMockBlock())
255255
orderer.EnqueueForSendDeliver(common.Status_SUCCESS)
256+
orderer.CloseQueue()
257+
256258
reqCtx, cancel := contextImpl.NewRequest(ctx, contextImpl.WithTimeout(10*time.Second))
257259
defer cancel()
258260
block, err := GenesisBlockFromOrderer(reqCtx, channelName, orderer, WithRetry(retry.DefaultResMgmtOpts))
@@ -284,8 +286,8 @@ func TestGenesisBlockOrderer(t *testing.T) {
284286
ctx := setupContext()
285287

286288
orderer := mocks.NewMockOrderer("", nil)
287-
defer orderer.Close()
288289
orderer.EnqueueForSendDeliver(mocks.NewSimpleMockError())
290+
orderer.CloseQueue()
289291

290292
//Call get Genesis block
291293
reqCtx, cancel := contextImpl.NewRequest(ctx, contextImpl.WithTimeout(10*time.Second))

0 commit comments

Comments
 (0)