@@ -25,8 +25,12 @@ import (
2525 "regexp"
2626 "sync"
2727
28+ "time"
29+
2830 "github.com/golang/protobuf/proto"
2931 consumer "github.com/hyperledger/fabric-sdk-go/fabric-client/events/consumer"
32+ cnsmr "github.com/hyperledger/fabric/events/consumer"
33+
3034 "github.com/hyperledger/fabric/core/ledger/util"
3135 common "github.com/hyperledger/fabric/protos/common"
3236 pb "github.com/hyperledger/fabric/protos/peer"
@@ -53,6 +57,11 @@ type EventHubExt interface {
5357 AddChaincodeInterest (ChaincodeID string , EventName string )
5458}
5559
60+ // eventClientFactory creates an EventsClient instance
61+ type eventClientFactory interface {
62+ newEventsClient (peerAddress string , certificate string , serverHostOverride string , regTimeout time.Duration , adapter cnsmr.EventAdapter ) (consumer.EventsClient , error )
63+ }
64+
5665type eventHub struct {
5766 // Protects chaincodeRegistrants, blockRegistrants and txRegistrants
5867 mtx sync.RWMutex
@@ -74,6 +83,8 @@ type eventHub struct {
7483 connected bool
7584 // List of events client is interested in
7685 interestedEvents []* pb.Interest
86+ // Factory that creates EventsClient
87+ eventsClientFactory eventClientFactory
7788}
7889
7990// ChaincodeEvent contains the current event data for the event handler
@@ -98,13 +109,26 @@ type ChainCodeCBE struct {
98109 CallbackFunc func (* ChaincodeEvent )
99110}
100111
112+ // consumerClientFactory is the default implementation oif the eventClientFactory
113+ type consumerClientFactory struct {}
114+
115+ func (ccf * consumerClientFactory ) newEventsClient (peerAddress string , certificate string , serverHostOverride string , regTimeout time.Duration , adapter cnsmr.EventAdapter ) (consumer.EventsClient , error ) {
116+ return consumer .NewEventsClient (peerAddress , certificate , serverHostOverride , regTimeout , adapter )
117+ }
118+
101119// NewEventHub ...
102120func NewEventHub () EventHub {
103121 chaincodeRegistrants := make (map [string ][]* ChainCodeCBE )
104122 blockRegistrants := make ([]func (* common.Block ), 0 )
105123 txRegistrants := make (map [string ]func (string , error ))
106124
107- eventHub := & eventHub {chaincodeRegistrants : chaincodeRegistrants , blockRegistrants : blockRegistrants , txRegistrants : txRegistrants , interestedEvents : nil }
125+ eventHub := & eventHub {
126+ chaincodeRegistrants : chaincodeRegistrants ,
127+ blockRegistrants : blockRegistrants ,
128+ txRegistrants : txRegistrants ,
129+ interestedEvents : nil ,
130+ eventsClientFactory : & consumerClientFactory {},
131+ }
108132
109133 // default to listening for block events
110134 eventHub .SetInterests (true )
@@ -177,7 +201,7 @@ func (eventHub *eventHub) Connect() error {
177201 eventHub .blockRegistrants = make ([]func (* common.Block ), 0 )
178202 eventHub .blockRegistrants = append (eventHub .blockRegistrants , eventHub .txCallback )
179203
180- eventsClient , _ := consumer . NewEventsClient (eventHub .peerAddr , eventHub .peerTLSCertificate , eventHub .peerTLSServerHostOverride , 5 , eventHub )
204+ eventsClient , _ := eventHub . eventsClientFactory . newEventsClient (eventHub .peerAddr , eventHub .peerTLSCertificate , eventHub .peerTLSServerHostOverride , 5 , eventHub )
181205 if err := eventsClient .Start (); err != nil {
182206 eventsClient .Stop ()
183207 return fmt .Errorf ("Error from eventsClient.Start (%s)" , err .Error ())
@@ -195,14 +219,11 @@ func (eventHub *eventHub) GetInterestedEvents() ([]*pb.Interest, error) {
195219
196220//Recv implements consumer.EventAdapter interface for receiving events
197221func (eventHub * eventHub ) Recv (msg * pb.Event ) (bool , error ) {
198- eventHub .mtx .RLock ()
199- defer eventHub .mtx .RUnlock ()
200-
201222 switch msg .Event .(type ) {
202223 case * pb.Event_Block :
203224 blockEvent := msg .Event .(* pb.Event_Block )
204225 logger .Debugf ("Recv blockEvent:%v\n " , blockEvent )
205- for _ , v := range eventHub .blockRegistrants {
226+ for _ , v := range eventHub .getBlockRegistrants () {
206227 v (blockEvent .Block )
207228 }
208229
@@ -282,15 +303,18 @@ func (eventHub *eventHub) UnregisterChaincodeEvent(cbe *ChainCodeCBE) {
282303 logger .Debugf ("No event registration for ccid %s \n " , cbe .CCID )
283304 return
284305 }
306+
285307 for i , v := range cbeArray {
286- if v .EventNameFilter == cbe .EventNameFilter {
287- cbeArray = append (cbeArray [:i ], cbeArray [i + 1 :]... )
308+ if v == cbe {
309+ newCbeArray := append (cbeArray [:i ], cbeArray [i + 1 :]... )
310+ if len (newCbeArray ) <= 0 {
311+ delete (eventHub .chaincodeRegistrants , cbe .CCID )
312+ } else {
313+ eventHub .chaincodeRegistrants [cbe .CCID ] = newCbeArray
314+ }
315+ break
288316 }
289317 }
290- if len (cbeArray ) <= 0 {
291- delete (eventHub .chaincodeRegistrants , cbe .CCID )
292- }
293-
294318}
295319
296320// RegisterTxEvent ...
@@ -330,39 +354,77 @@ func (eventHub *eventHub) UnregisterTxEvent(txID string) {
330354func (eventHub * eventHub ) txCallback (block * common.Block ) {
331355 logger .Debugf ("txCallback block=%v\n " , block )
332356
333- eventHub .mtx .RLock ()
334- defer eventHub .mtx .RUnlock ()
335357 txFilter := util .TxValidationFlags (block .Metadata .Metadata [common .BlockMetadataIndex_TRANSACTIONS_FILTER ])
336358 for i , v := range block .Data .Data {
337359 if env , err := utils .GetEnvelopeFromBlock (v ); err != nil {
360+ logger .Errorf ("error extracting Envelope from block: %v\n " , err )
338361 return
339362 } else if env != nil {
340363 // get the payload from the envelope
341364 payload , err := utils .GetPayload (env )
342365 if err != nil {
366+ logger .Errorf ("error extracting Payload from envelope: %v\n " , err )
343367 return
344368 }
345369
346370 channelHeaderBytes := payload .Header .ChannelHeader
347371 channelHeader := & common.ChannelHeader {}
348372 err = proto .Unmarshal (channelHeaderBytes , channelHeader )
349373 if err != nil {
374+ logger .Errorf ("error extracting ChannelHeader from payload: %v\n " , err )
350375 return
351376 }
352377
353- callback := eventHub .txRegistrants [ channelHeader .TxId ]
378+ callback := eventHub .getTXRegistrant ( channelHeader .TxId )
354379 if callback != nil {
355380 if txFilter .IsInvalid (i ) {
356381 callback (channelHeader .TxId , fmt .Errorf ("Received invalid transaction from channel %s\n " , channelHeader .ChannelId ))
357382
358383 } else {
359384 callback (channelHeader .TxId , nil )
360385 }
386+ } else {
387+ logger .Debugf ("No callback registered for TxID: %s\n " , channelHeader .TxId )
361388 }
362389 }
363390 }
364391}
365392
393+ func (eventHub * eventHub ) getBlockRegistrants () []func (* common.Block ) {
394+ eventHub .mtx .RLock ()
395+ defer eventHub .mtx .RUnlock ()
396+
397+ // Return a clone of the array to avoid race conditions
398+ clone := make ([]func (* common.Block ), len (eventHub .blockRegistrants ))
399+ for i , registrant := range eventHub .blockRegistrants {
400+ clone [i ] = registrant
401+ }
402+ return clone
403+ }
404+
405+ func (eventHub * eventHub ) getChaincodeRegistrants (chaincodeID string ) []* ChainCodeCBE {
406+ eventHub .mtx .RLock ()
407+ defer eventHub .mtx .RUnlock ()
408+
409+ registrants , ok := eventHub .chaincodeRegistrants [chaincodeID ]
410+ if ! ok {
411+ return nil
412+ }
413+
414+ // Return a clone of the array to avoid race conditions
415+ clone := make ([]* ChainCodeCBE , len (registrants ))
416+ for i , registrants := range registrants {
417+ clone [i ] = registrants
418+ }
419+ return clone
420+ }
421+
422+ func (eventHub * eventHub ) getTXRegistrant (txID string ) func (string , error ) {
423+ eventHub .mtx .RLock ()
424+ defer eventHub .mtx .RUnlock ()
425+ return eventHub .txRegistrants [txID ]
426+ }
427+
366428// getChainCodeEvents parses block events for chaincode events associated with individual transactions
367429func getChainCodeEvent (tdata []byte ) (* pb.ChaincodeEvent , error ) {
368430
@@ -417,7 +479,7 @@ func getChainCodeEvent(tdata []byte) (*pb.ChaincodeEvent, error) {
417479// Utility function to fire callbacks for chaincode registrants
418480func (eventHub * eventHub ) notifyChaincodeRegistrants (ccEvent * pb.ChaincodeEvent , patternMatch bool ) {
419481
420- cbeArray := eventHub .chaincodeRegistrants [ ccEvent .ChaincodeId ]
482+ cbeArray := eventHub .getChaincodeRegistrants ( ccEvent .ChaincodeId )
421483 if len (cbeArray ) <= 0 {
422484 logger .Debugf ("No event registration for ccid %s \n " , ccEvent .ChaincodeId )
423485 }
0 commit comments