Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit beccd9c

Browse files
committed
[FABG-718] fix for ChainCodeEvent getting older events
- default to 'Newest' and discard first event when no seek type is provided for a new client Change-Id: Id5e8b05e340b2c7afe99ef681c383ddde62b6f82 Signed-off-by: Sudesh Shetty <sudesh.shetty@securekey.com>
1 parent b25b359 commit beccd9c

File tree

4 files changed

+83
-8
lines changed

4 files changed

+83
-8
lines changed

pkg/fab/events/deliverclient/deliverclient.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,21 @@ func New(context fabcontext.Client, chConfig fab.ChannelCfg, discoveryService fa
6262
return nil, err
6363
}
6464

65+
dispatcher := dispatcher.New(context, chConfig, discoveryWrapper, params.connProvider, opts...)
66+
67+
//default seek type is `Newest`
68+
if params.seekType == "" {
69+
params.seekType = seek.Newest
70+
//discard (do not publish) next BlockEvent/FilteredBlockEvent in dispatcher, since default seek type 'newest' is
71+
// only needed for block height calculations
72+
dispatcher.DiscardNextEvent()
73+
}
74+
6575
client := &Client{
66-
Client: *client.New(
67-
dispatcher.New(context, chConfig, discoveryWrapper, params.connProvider, opts...),
68-
opts...,
69-
),
76+
Client: *client.New(dispatcher, opts...),
7077
params: *params,
7178
}
79+
7280
client.SetAfterConnectHandler(client.seek)
7381
client.SetBeforeReconnectHandler(client.setSeekFromLastBlockReceived)
7482

pkg/fab/events/deliverclient/opts.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ type params struct {
2424
func defaultParams() *params {
2525
return &params{
2626
connProvider: deliverFilteredProvider,
27-
seekType: seek.Newest,
2827
respTimeout: 5 * time.Second,
2928
}
3029
}

pkg/fab/events/service/dispatcher/dispatcher.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,15 @@ type HandlerRegistry map[reflect.Type]Handler
4444
// This also avoids the need for synchronization.
4545
type Dispatcher struct {
4646
params
47-
handlers map[reflect.Type]Handler
47+
lastBlockNum uint64
48+
discardNextEvent bool
49+
state int32
4850
eventch chan interface{}
4951
blockRegistrations []*BlockReg
5052
filteredBlockRegistrations []*FilteredBlockReg
53+
handlers map[reflect.Type]Handler
5154
txRegistrations map[string]*TxStatusReg
5255
ccRegistrations map[string]*ChaincodeReg
53-
state int32
54-
lastBlockNum uint64
5556
}
5657

5758
// New creates a new Dispatcher.
@@ -306,6 +307,11 @@ func (ed *Dispatcher) HandleBlock(block *cb.Block, sourceURL string) {
306307
return
307308
}
308309

310+
if ed.discardNextEvent {
311+
ed.discardNextEvent = false
312+
return
313+
}
314+
309315
ed.publishBlockEvents(block, sourceURL)
310316
ed.publishFilteredBlockEvents(toFilteredBlock(block), sourceURL)
311317
}
@@ -319,6 +325,11 @@ func (ed *Dispatcher) HandleFilteredBlock(fblock *pb.FilteredBlock, sourceURL st
319325
return
320326
}
321327

328+
if ed.discardNextEvent {
329+
ed.discardNextEvent = false
330+
return
331+
}
332+
322333
logger.Debug("Publishing filtered block event...")
323334
ed.publishFilteredBlockEvents(fblock, sourceURL)
324335
}
@@ -506,6 +517,11 @@ func (ed *Dispatcher) RegisterHandler(t interface{}, h Handler) {
506517
}
507518
}
508519

520+
//DiscardNextEvent sets if next event needs to be published or not
521+
func (ed *Dispatcher) DiscardNextEvent() {
522+
ed.discardNextEvent = true
523+
}
524+
509525
func getCCKey(ccID, eventFilter string) string {
510526
return ccID + "/" + eventFilter
511527
}

test/integration/pkg/client/channel/channel_client_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ func TestChannelClient(t *testing.T) {
102102
testChaincodeEventListener(chaincodeID, chClient, listener, t)
103103

104104
testDuplicateTargets(chaincodeID, chClient, t)
105+
106+
//test if CCEvents for chaincode events are in sync when new channel client are created
107+
// for each transaction
108+
testMultipleClientChaincodeEvent(chaincodeID, t)
105109
}
106110

107111
func testDuplicateTargets(chaincodeID string, chClient *channel.Client, t *testing.T) {
@@ -390,6 +394,54 @@ func testChaincodeEvent(ccID string, chClient *channel.Client, t *testing.T) {
390394
}
391395
}
392396

397+
//TestMultipleEventClient tests if CCEvents for chaincode events are in sync when new channel client are created
398+
// for each transaction
399+
func testMultipleClientChaincodeEvent(chainCodeID string, t *testing.T) {
400+
401+
channelID := mainTestSetup.ChannelID
402+
eventID := "([a-zA-Z]+)"
403+
404+
for i := 0; i < 10; i++ {
405+
406+
sdk, err := fabsdk.New(integration.ConfigBackend)
407+
if err != nil {
408+
t.Fatalf("Failed to create new SDK: %s", err)
409+
}
410+
411+
chContextProvider := sdk.ChannelContext(channelID, fabsdk.WithUser(org1User), fabsdk.WithOrg(org1Name))
412+
413+
chClient, err := channel.New(chContextProvider)
414+
if err != nil {
415+
t.Fatalf("Failed to create new channel client: %s", err)
416+
}
417+
418+
// Register chaincode event (pass in channel which receives event details when the event is complete)
419+
reg, notifier, err := chClient.RegisterChaincodeEvent(chainCodeID, eventID)
420+
if err != nil {
421+
t.Fatalf("Failed to register cc event: %s", err)
422+
}
423+
defer chClient.UnregisterChaincodeEvent(reg)
424+
425+
// Move funds
426+
resp, err := chClient.Execute(channel.Request{ChaincodeID: chainCodeID, Fcn: "invoke",
427+
Args: integration.ExampleCCTxArgs()}, channel.WithRetry(retry.DefaultChannelOpts))
428+
if err != nil {
429+
t.Fatalf("Failed to move funds: %s", err)
430+
}
431+
432+
txID := resp.TransactionID
433+
434+
var ccEvent *fab.CCEvent
435+
select {
436+
case ccEvent = <-notifier:
437+
t.Logf("Received CC eventID: %#v\n", ccEvent.TxID)
438+
case <-time.After(time.Second * 20):
439+
t.Fatalf("Did NOT receive CC event for eventId(%s)\n", eventID)
440+
}
441+
assert.Equal(t, string(txID), ccEvent.TxID, "mismatched ccEvent.TxID")
442+
}
443+
}
444+
393445
func testChaincodeEventListener(ccID string, chClient *channel.Client, listener *channel.Client, t *testing.T) {
394446

395447
eventID := integration.GenerateRandomID()

0 commit comments

Comments
 (0)