Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 02c1c6f

Browse files
committed
[FABG-748] fix for lost events in CI
- events lost due to MVCC_READ_CONFLICT are handled - added new test to show the behavior of chaincode events during MVCC_READ_CONFLICTS - example cc 'move' functions are used only where necessary, instead new 'set' to be used to avoid MVCC_READ_CONFLICTS Change-Id: Id38fa45b9922e8e8e1ad45b4415f1d40a28d82fc Signed-off-by: Sudesh Shetty <sudesh.shetty@securekey.com>
1 parent 0628c3f commit 02c1c6f

File tree

6 files changed

+126
-13
lines changed

6 files changed

+126
-13
lines changed

pkg/fab/events/service/dispatcher/dispatcher.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,11 +556,16 @@ func (ed *Dispatcher) publishFilteredBlockEvents(fblock *pb.FilteredBlock, sourc
556556
if txActions == nil {
557557
continue
558558
}
559+
if len(txActions.ChaincodeActions) == 0 {
560+
logger.Debugf("No chaincode action found for TxID[%s], block[%d], source URL[%s]", tx.Txid, fblock.Number, sourceURL)
561+
}
559562
for _, action := range txActions.ChaincodeActions {
560563
if action.ChaincodeEvent != nil {
561564
ed.publishCCEvents(action.ChaincodeEvent, fblock.Number, sourceURL)
562565
}
563566
}
567+
} else {
568+
logger.Debugf("Cannot publish CCEvents for block[%d] and source URL[%s] since Tx Validation Code[%d] is not valid", fblock.Number, sourceURL, tx.TxValidationCode)
564569
}
565570
}
566571
}

test/fixtures/testdata/src/github.com/example_cc/example_cc.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,40 @@ func (t *SimpleChaincode) Query(stub shim.ChaincodeStubInterface) pb.Response {
107107
return shim.Error("Unknown supported call")
108108
}
109109

110+
//set sets given key-value in state
111+
func (t *SimpleChaincode) set(stub shim.ChaincodeStubInterface, args []string) pb.Response {
112+
var err error
113+
114+
if len(args) < 3 {
115+
return shim.Error("Incorrect number of arguments. Expecting a key and a value")
116+
}
117+
118+
// Initialize the chaincode
119+
key := args[1]
120+
value := args[2]
121+
eventID := "testEvent"
122+
if len(args) >= 4 {
123+
eventID = args[3]
124+
}
125+
126+
logger.Debugf("Setting value for key[%s]", key)
127+
128+
// Write the state to the ledger
129+
err = stub.PutState(key, []byte(value))
130+
if err != nil {
131+
logger.Errorf("Failed to set value for key[%s] : ", key, err)
132+
return shim.Error(err.Error())
133+
}
134+
135+
err = stub.SetEvent(eventID, []byte("Test Payload"))
136+
if err != nil {
137+
logger.Errorf("Failed to set event for key[%s] : ", key, err)
138+
return shim.Error(err.Error())
139+
}
140+
141+
return shim.Success(nil)
142+
}
143+
110144
// Invoke ...
111145
// Transaction makes payment of X units from A to B
112146
func (t *SimpleChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
@@ -138,6 +172,12 @@ func (t *SimpleChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
138172
// queries an entity state
139173
return t.query(stub, args)
140174
}
175+
176+
if args[0] == "set" {
177+
// setting an entity state
178+
return t.set(stub, args)
179+
}
180+
141181
if args[0] == "move" {
142182
eventID := "testEvent"
143183
if len(args) >= 5 {

test/integration/base_test_setup.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ func ExampleCCTxArgs() [][]byte {
7070
return txArgs
7171
}
7272

73+
// ExampleCCTxRandomSetArgs returns example cc set args with random key-value pairs
74+
func ExampleCCTxRandomSetArgs() [][]byte {
75+
return [][]byte{[]byte("set"), []byte(GenerateRandomID()), []byte(GenerateRandomID())}
76+
}
77+
7378
//ExampleCCInitArgs returns example cc initialization args
7479
func ExampleCCInitArgs() [][]byte {
7580
return initArgs

test/integration/pkg/client/channel/channel_client_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func TestCCToCC(t *testing.T) {
197197
channel.Request{
198198
ChaincodeID: cc1ID,
199199
Fcn: "invokecc",
200-
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","move","a","b","1"]}`)},
200+
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","set","x1","y1"]}`)},
201201
},
202202
channel.WithRetry(retry.DefaultChannelOpts),
203203
)
@@ -223,7 +223,7 @@ func TestCCToCC(t *testing.T) {
223223
channel.Request{
224224
ChaincodeID: cc1ID,
225225
Fcn: "invokecc",
226-
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","move","a","b","1"]}`)},
226+
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","set","x1","y1"]}`)},
227227
InvocationChain: []*fab.ChaincodeCall{
228228
{ID: cc2ID},
229229
},
@@ -241,7 +241,7 @@ func TestCCToCC(t *testing.T) {
241241
channel.Request{
242242
ChaincodeID: cc1ID,
243243
Fcn: "invokecc",
244-
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","move","a","b","1"]}`)},
244+
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","set","x1","y1"]}`)},
245245
},
246246
channel.WithRetry(retry.DefaultChannelOpts),
247247
)
@@ -477,7 +477,7 @@ func testChaincodeEventListener(t *testing.T, ccID string, chClient *channel.Cli
477477

478478
func testChaincodeError(t *testing.T, client *channel.Client, ccID string) {
479479
// Try calling unknown function call and expect an error
480-
r, err := client.Execute(channel.Request{ChaincodeID: ccID, Fcn: "DUMMY_FUNCTION", Args: integration.ExampleCCTxArgs()},
480+
r, err := client.Execute(channel.Request{ChaincodeID: ccID, Fcn: "DUMMY_FUNCTION", Args: integration.ExampleCCTxRandomSetArgs()},
481481
channel.WithRetry(retry.DefaultChannelOpts))
482482

483483
t.Logf("testChaincodeError err: %s ***** responses: %v", err, r)
@@ -545,7 +545,7 @@ func TestNoEndpoints(t *testing.T) {
545545
}
546546

547547
// Test execute transaction: since peer has been disabled for endorsement this transaction should fail
548-
_, err = chClient.Execute(channel.Request{ChaincodeID: mainChaincodeID, Fcn: "invoke", Args: integration.ExampleCCTxArgs()},
548+
_, err = chClient.Execute(channel.Request{ChaincodeID: mainChaincodeID, Fcn: "invoke", Args: integration.ExampleCCTxRandomSetArgs()},
549549
channel.WithRetry(retry.DefaultChannelOpts))
550550
if !strings.Contains(err.Error(), expected1_1Err) && !strings.Contains(err.Error(), expected1_2Err) {
551551
t.Fatal("Should have failed due to no chaincode query peers")

test/integration/pkg/client/event/events_client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func testCCEvent(ccID string, chClient *channel.Client, eventClient *event.Clien
9595
}
9696
defer eventClient.Unregister(reg)
9797

98-
response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: append(integration.ExampleCCTxArgs(), []byte(eventID))},
98+
response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: append(integration.ExampleCCTxRandomSetArgs(), []byte(eventID))},
9999
channel.WithRetry(retry.DefaultChannelOpts))
100100
if err != nil {
101101
t.Fatalf("Failed to move funds: %s", err)
@@ -127,7 +127,7 @@ func testRegisterBlockEvent(ccID string, chClient *channel.Client, eventClient *
127127
}
128128
defer eventClient.Unregister(breg)
129129

130-
response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxArgs()},
130+
response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxRandomSetArgs()},
131131
channel.WithRetry(retry.DefaultChannelOpts))
132132
if err != nil {
133133
t.Fatalf("Failed to move funds: %s", err)
@@ -155,7 +155,7 @@ func testRegisterFilteredBlockEvent(ccID string, chClient *channel.Client, event
155155
}
156156
defer eventClient.Unregister(fbreg)
157157

158-
response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxArgs()},
158+
response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxRandomSetArgs()},
159159
channel.WithRetry(retry.DefaultChannelOpts))
160160
if err != nil {
161161
t.Fatalf("Failed to move funds: %s", err)

test/integration/pkg/fab/eventclient_test.go

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ import (
2424
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
2525
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
2626
"github.com/hyperledger/fabric-sdk-go/test/integration"
27+
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
2728
)
2829

29-
const eventTimeWindow = 120 * time.Second // the maximum amount of time to watch for events.
30+
const eventTimeWindow = 20 * time.Second // the maximum amount of time to watch for events.
3031

3132
func TestEventClient(t *testing.T) {
3233
chainCodeID := mainChaincodeID
@@ -95,7 +96,7 @@ func testEventService(t *testing.T, testSetup *integration.BaseSetupImpl, sdk *f
9596
numExpected++
9697
wg.Add(1)
9798

98-
tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID)
99+
tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID, integration.ExampleCCTxRandomSetArgs())
99100
txReg, txstatusch, err := eventService.RegisterTxStatusEvent(txID)
100101
if err != nil {
101102
t.Fatalf("Error registering for Tx Status event: %s", err)
@@ -127,14 +128,14 @@ func testEventService(t *testing.T, testSetup *integration.BaseSetupImpl, sdk *f
127128
}
128129
}
129130

130-
func sendTxProposal(sdk *fabsdk.FabricSDK, testSetup *integration.BaseSetupImpl, t *testing.T, transactor fab.Transactor, chainCodeID string) ([]*fab.TransactionProposalResponse, *fab.TransactionProposal, string) {
131+
func sendTxProposal(sdk *fabsdk.FabricSDK, testSetup *integration.BaseSetupImpl, t *testing.T, transactor fab.Transactor, chainCodeID string, args [][]byte) ([]*fab.TransactionProposalResponse, *fab.TransactionProposal, string) {
131132
peers, err := getProposalProcessors(sdk, "Admin", testSetup.OrgID, testSetup.Targets)
132133
require.Nil(t, err, "creating peers failed")
133134
tpResponses, prop, err := createAndSendTransactionProposal(
134135
transactor,
135136
chainCodeID,
136137
"invoke",
137-
[][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("10")},
138+
args,
138139
peers,
139140
nil,
140141
)
@@ -162,6 +163,9 @@ func checkTxStatusEvent(wg *sync.WaitGroup, txstatusch <-chan *fab.TxStatusEvent
162163
if txStatus.BlockNumber == 0 {
163164
test.Failf(t, "Expecting non-zero block number")
164165
}
166+
if txStatus.TxValidationCode != pb.TxValidationCode_VALID {
167+
test.Failf(t, "expected transaction validation code to be valid")
168+
}
165169
atomic.AddUint32(numReceived, 1)
166170
case <-time.After(eventTimeWindow):
167171
return
@@ -328,7 +332,7 @@ func testChannelEventsSeekOptions(t *testing.T, testSetup *integration.BaseSetup
328332
defer eventService.Unregister(ccreg)
329333

330334
// prepare and commit the transaction to generate events
331-
tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID)
335+
tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID, integration.ExampleCCTxRandomSetArgs())
332336
_, err = createAndSendTransaction(transactor, prop, tpResponses)
333337
require.NoError(t, err, "First invoke failed err")
334338

@@ -366,3 +370,62 @@ func testChannelEventsSeekOptions(t *testing.T, testSetup *integration.BaseSetup
366370
//to event channel, and first event we get from event channel actually belongs to first transaction after registration.
367371
require.Equal(t, seekType == "", txID == event.TxID, "for seek type[%s], txID [%s], event.txID[%s] ,condition didn't match", seekType, txID, event.TxID)
368372
}
373+
374+
//TestEventClientWithMVCCReadConflicts tests behavior of chaincode events when MVCC_READ_CONFLICT happens
375+
//Chaincode events with Txn Validation Code = MVCC_READ_CONFLICT are not getting published
376+
func TestEventClientWithMVCCReadConflicts(t *testing.T) {
377+
chainCodeID := mainChaincodeID
378+
sdk := mainSDK
379+
testSetup := mainTestSetup
380+
381+
chContextProvider := sdk.ChannelContext(testSetup.ChannelID, fabsdk.WithUser(org1User), fabsdk.WithOrg(org1Name))
382+
chContext, err := chContextProvider()
383+
require.NoError(t, err, "error getting channel context")
384+
385+
eventService, err := chContext.ChannelService().EventService()
386+
require.NoError(t, err, "error getting event service")
387+
388+
testEventServiceWithConflicts(t, testSetup, sdk, chainCodeID, eventService)
389+
}
390+
391+
func testEventServiceWithConflicts(t *testing.T, testSetup *integration.BaseSetupImpl, sdk *fabsdk.FabricSDK, chainCodeID string, eventService fab.EventService) {
392+
_, cancel, transactor, err := getTransactor(sdk, testSetup.ChannelID, "Admin", testSetup.OrgID)
393+
require.NoError(t, err, "Failed to get channel transactor")
394+
defer cancel()
395+
396+
ccreg, cceventch, err := eventService.RegisterChaincodeEvent(chainCodeID, ".*")
397+
require.NoError(t, err, "Error registering for filtered block events")
398+
defer eventService.Unregister(ccreg)
399+
400+
numOfTxns := 4
401+
// Commit multiple transactions to generate events
402+
for i := 0; i < numOfTxns; i++ {
403+
tpResponse, prop, _ := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID, [][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("5")})
404+
_, err = createAndSendTransaction(transactor, prop, tpResponse)
405+
require.NoError(t, err, "invoke failed")
406+
}
407+
408+
var numReceived int
409+
test:
410+
for {
411+
select {
412+
case event, ok := <-cceventch:
413+
if !ok {
414+
test.Failf(t, "unexpected closed channel while waiting for CC Status event")
415+
}
416+
t.Logf("Received chaincode event: %#v", event)
417+
require.Equal(t, event.ChaincodeID, chainCodeID)
418+
require.NotEmpty(t, event.SourceURL, "Expecting event source URL but got none")
419+
require.NotEmpty(t, event.BlockNumber, "Expecting non-zero block number")
420+
require.NotEmpty(t, event.TxID, "Expecting valid txID")
421+
require.NotEmpty(t, event.EventName, "Expecting valid event name")
422+
numReceived++
423+
continue
424+
case <-time.After(5 * time.Second):
425+
break test
426+
}
427+
}
428+
429+
require.True(t, numReceived < numOfTxns, "Expected number of transactions to be greater than number of events")
430+
431+
}

0 commit comments

Comments
 (0)