Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 0c63fda

Browse files
committed
[FAB-8756] Remove Event Hub
- Removed all references to the old EventHub - Added an EventClient cache to InfraProvider - Added reusable caching constructs: - LazyReference - LazyCache - FutureValue Change-Id: I63b630bd4c339a88a263bc65b638b4b440b9db78 Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
1 parent c9bd65a commit 0c63fda

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2042
-946
lines changed

pkg/client/channel/api.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
)
1616

1717
// CCEvent contains the data for a chaincocde event
18+
// Deprecated since EventHub is replaced with EventService
1819
type CCEvent struct {
1920
TxID string
2021
ChaincodeID string
@@ -24,6 +25,7 @@ type CCEvent struct {
2425

2526
// Registration is a handle that is returned from a successful Register Chaincode Event.
2627
// This handle should be used in Unregister in order to unregister the event.
28+
// Deprecated since EventHub is replaced with EventService
2729
type Registration interface {
2830
}
2931

pkg/client/channel/chclient.go

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ SPDX-License-Identifier: Apache-2.0
88
package channel
99

1010
import (
11-
"reflect"
1211
"time"
1312

1413
"github.com/hyperledger/fabric-sdk-go/pkg/client/channel/invoke"
@@ -39,7 +38,7 @@ type Client struct {
3938
context context.Channel
4039
membership fab.ChannelMembership
4140
transactor fab.Transactor
42-
eventHub fab.EventHub
41+
eventService fab.EventService
4342
greylist *greylist.Filter
4443
discoveryFilter fab.TargetFilter
4544
}
@@ -78,9 +77,9 @@ func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client
7877
return nil, errors.New("channel service not initialized")
7978
}
8079

81-
eventHub, err := channelContext.ChannelService().EventHub()
80+
eventService, err := channelContext.ChannelService().EventService()
8281
if err != nil {
83-
return nil, errors.WithMessage(err, "event hub creation failed")
82+
return nil, errors.WithMessage(err, "event service creation failed")
8483
}
8584

8685
transactor, err := channelContext.ChannelService().Transactor()
@@ -94,10 +93,10 @@ func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client
9493
}
9594

9695
channelClient := Client{
97-
membership: membership,
98-
transactor: transactor,
99-
eventHub: eventHub,
100-
greylist: greylistProvider,
96+
membership: membership,
97+
transactor: transactor,
98+
eventService: eventService,
99+
greylist: greylistProvider,
101100
}
102101

103102
for _, param := range opts {
@@ -189,11 +188,11 @@ func (cc *Client) prepareHandlerContexts(request Request, o opts) (*invoke.Reque
189188
}
190189

191190
clientContext := &invoke.ClientContext{
192-
Selection: cc.context.SelectionService(),
193-
Discovery: cc.context.DiscoveryService(),
194-
Membership: cc.membership,
195-
Transactor: cc.transactor,
196-
EventHub: cc.eventHub,
191+
Selection: cc.context.SelectionService(),
192+
Discovery: cc.context.DiscoveryService(),
193+
Membership: cc.membership,
194+
Transactor: cc.transactor,
195+
EventService: cc.eventService,
197196
}
198197

199198
requestContext := &invoke.RequestContext{
@@ -240,45 +239,23 @@ func (cc *Client) addDefaultTimeout(timeOutType core.TimeoutType, options ...Opt
240239
return options
241240
}
242241

243-
// Close releases channel client resources (disconnects event hub etc.)
242+
// Close ...
243+
// TODO: This function should probably be deprecated since all
244+
// resources (including caches) are on the providers and will
245+
// be freed when Close() is called on the SDK.
244246
func (cc *Client) Close() error {
245-
if cc.eventHub.IsConnected() == true {
246-
return cc.eventHub.Disconnect()
247-
}
248-
249247
return nil
250248
}
251249

252250
// RegisterChaincodeEvent registers chain code event
253251
// @param {chan bool} channel which receives event details when the event is complete
254252
// @returns {object} object handle that should be used to unregister
255-
func (cc *Client) RegisterChaincodeEvent(notify chan<- *CCEvent, chainCodeID string, eventID string) (Registration, error) {
256-
257-
if cc.eventHub.IsConnected() == false {
258-
if err := cc.eventHub.Connect(); err != nil {
259-
return nil, errors.WithMessage(err, "Event hub failed to connect")
260-
}
261-
}
262-
253+
func (cc *Client) RegisterChaincodeEvent(chainCodeID string, eventFilter string) (fab.Registration, <-chan *fab.CCEvent, error) {
263254
// Register callback for CE
264-
rce := cc.eventHub.RegisterChaincodeEvent(chainCodeID, eventID, func(ce *fab.ChaincodeEvent) {
265-
notify <- &CCEvent{ChaincodeID: ce.ChaincodeID, EventName: ce.EventName, TxID: ce.TxID, Payload: ce.Payload}
266-
})
267-
268-
return rce, nil
255+
return cc.eventService.RegisterChaincodeEvent(chainCodeID, eventFilter)
269256
}
270257

271258
// UnregisterChaincodeEvent removes chain code event registration
272-
func (cc *Client) UnregisterChaincodeEvent(registration Registration) error {
273-
274-
switch regType := registration.(type) {
275-
276-
case *fab.ChainCodeCBE:
277-
cc.eventHub.UnregisterChaincodeEvent(regType)
278-
default:
279-
return errors.Errorf("Unsupported registration type: %v", reflect.TypeOf(registration))
280-
}
281-
282-
return nil
283-
259+
func (cc *Client) UnregisterChaincodeEvent(registration fab.Registration) {
260+
cc.eventService.Unregister(registration)
284261
}

pkg/client/channel/chclient_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ func TestOrdererStatusError(t *testing.T) {
353353
testOrderer1 := fcmocks.NewMockOrderer("", make(chan *fab.SignedEnvelope))
354354
orderers := []fab.Orderer{testOrderer1}
355355
chClient := setupChannelClientWithNodes(peers, orderers, t)
356-
chClient.eventHub = fcmocks.NewMockEventHub()
356+
chClient.eventService = fcmocks.NewMockEventService()
357357

358358
testOrderer1.EnqueueSendBroadcastError(status.New(status.OrdererClientStatus,
359359
status.ConnectionFailed.ToInt32(), testErrorMessage, nil))
@@ -371,22 +371,21 @@ func TestOrdererStatusError(t *testing.T) {
371371

372372
func TestTransactionValidationError(t *testing.T) {
373373
validationCode := pb.TxValidationCode_BAD_RWSET
374-
mockEventHub := fcmocks.NewMockEventHub()
374+
mockEventService := fcmocks.NewMockEventService()
375375
testPeer1 := fcmocks.NewMockPeer("Peer1", "http://peer1.com")
376376
peers := []fab.Peer{testPeer1}
377377

378378
go func() {
379379
select {
380-
case callback := <-mockEventHub.RegisteredTxCallbacks:
381-
callback("txid", validationCode,
382-
status.New(status.EventServerStatus, int32(validationCode), "test", nil))
380+
case txStatusReg := <-mockEventService.TxStatusRegCh:
381+
txStatusReg.Eventch <- &fab.TxStatusEvent{TxID: txStatusReg.TxID, TxValidationCode: validationCode}
383382
case <-time.After(time.Second * 5):
384383
t.Fatal("Timed out waiting for execute Tx to register event callback")
385384
}
386385
}()
387386

388387
chClient := setupChannelClient(peers, t)
389-
chClient.eventHub = mockEventHub
388+
chClient.eventService = mockEventService
390389
response, err := chClient.Execute(Request{ChaincodeID: "test", Fcn: "invoke",
391390
Args: [][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("1")}})
392391
assert.Nil(t, response.Payload, "Expected nil result on failed execute operation")

pkg/client/channel/invoke/api.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ type Handler interface {
4747

4848
//ClientContext contains context parameters for handler execution
4949
type ClientContext struct {
50-
CryptoSuite core.CryptoSuite
51-
Discovery fab.DiscoveryService
52-
Selection fab.SelectionService
53-
Membership fab.ChannelMembership
54-
Transactor fab.Transactor
55-
EventHub fab.EventHub
50+
CryptoSuite core.CryptoSuite
51+
Discovery fab.DiscoveryService
52+
Selection fab.SelectionService
53+
Membership fab.ChannelMembership
54+
Transactor fab.Transactor
55+
EventService fab.EventService
5656
}
5757

5858
//RequestContext contains request, opts, response parameters for handler execution

pkg/client/channel/invoke/txnhandler.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ import (
1010
"bytes"
1111
"time"
1212

13+
"github.com/hyperledger/fabric-sdk-go/pkg/errors/status"
1314
"github.com/pkg/errors"
1415

1516
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
16-
"github.com/hyperledger/fabric-sdk-go/pkg/errors/status"
1717
"github.com/hyperledger/fabric-sdk-go/pkg/fab/peer"
1818
"github.com/hyperledger/fabric-sdk-go/pkg/fab/txn"
1919
"github.com/hyperledger/fabric-sdk-go/pkg/logging"
2020
"github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
21+
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
2122
)
2223

2324
var logger = logging.NewLogger("fabsdk/client")
@@ -138,32 +139,28 @@ type CommitTxHandler struct {
138139

139140
//Handle handles commit tx
140141
func (c *CommitTxHandler) Handle(requestContext *RequestContext, clientContext *ClientContext) {
141-
142-
//Connect to Event hub if not yet connected
143-
if clientContext.EventHub.IsConnected() == false {
144-
err := clientContext.EventHub.Connect()
145-
if err != nil {
146-
requestContext.Error = err
147-
return
148-
}
149-
}
150-
151142
txnID := requestContext.Response.TransactionID
152143

153144
//Register Tx event
154-
statusNotifier := txn.RegisterStatus(txnID, clientContext.EventHub)
155-
_, err := createAndSendTransaction(clientContext.Transactor, requestContext.Response.Proposal, requestContext.Response.Responses)
145+
reg, statusNotifier, err := clientContext.EventService.RegisterTxStatusEvent(string(txnID)) // TODO: Change func to use TransactionID instead of string
146+
if err != nil {
147+
requestContext.Error = errors.Wrap(err, "error registering for TxStatus event")
148+
return
149+
}
150+
defer clientContext.EventService.Unregister(reg)
151+
152+
_, err = createAndSendTransaction(clientContext.Transactor, requestContext.Response.Proposal, requestContext.Response.Responses)
156153
if err != nil {
157154
requestContext.Error = errors.Wrap(err, "CreateAndSendTransaction failed")
158155
return
159156
}
160157

161158
select {
162-
case result := <-statusNotifier:
163-
requestContext.Response.TxValidationCode = result.Code
159+
case txStatus := <-statusNotifier:
160+
requestContext.Response.TxValidationCode = txStatus.TxValidationCode
164161

165-
if result.Error != nil {
166-
requestContext.Error = result.Error
162+
if txStatus.TxValidationCode != pb.TxValidationCode_VALID {
163+
requestContext.Error = status.New(status.EventServerStatus, int32(txStatus.TxValidationCode), "received invalid transaction", nil)
167164
return
168165
}
169166
case <-time.After(requestContext.Opts.Timeout):

pkg/client/channel/invoke/txnhandler_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/hyperledger/fabric-sdk-go/pkg/common/context"
1919
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
2020
fcmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
21+
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
2122
)
2223

2324
const (
@@ -65,13 +66,13 @@ func TestExecuteTxHandlerSuccess(t *testing.T) {
6566
clientContext := setupChannelClientContext(nil, nil, []fab.Peer{mockPeer1, mockPeer2}, t)
6667

6768
//Prepare mock eventhub
68-
mockEventHub := fcmocks.NewMockEventHub()
69-
clientContext.EventHub = mockEventHub
69+
mockEventService := fcmocks.NewMockEventService()
70+
clientContext.EventService = mockEventService
7071

7172
go func() {
7273
select {
73-
case callback := <-mockEventHub.RegisteredTxCallbacks:
74-
callback("txid", 0, nil)
74+
case txStatusReg := <-mockEventService.TxStatusRegCh:
75+
txStatusReg.Eventch <- &fab.TxStatusEvent{TxID: txStatusReg.TxID, TxValidationCode: pb.TxValidationCode_VALID}
7576
case <-time.After(requestContext.Opts.Timeout):
7677
t.Fatal("Execute handler : time out not expected")
7778
}

pkg/client/resmgmt/resmgmt.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/hyperledger/fabric-sdk-go/pkg/common/context"
2222
contextImpl "github.com/hyperledger/fabric-sdk-go/pkg/context"
2323
"github.com/hyperledger/fabric-sdk-go/pkg/errors/multi"
24+
"github.com/hyperledger/fabric-sdk-go/pkg/errors/status"
2425
"github.com/hyperledger/fabric-sdk-go/pkg/fab/orderer"
2526
"github.com/hyperledger/fabric-sdk-go/pkg/fab/peer"
2627
"github.com/hyperledger/fabric-sdk-go/pkg/fab/resource"
@@ -506,20 +507,17 @@ func (rc *Client) sendCCProposal(ccProposalType chaincodeProposalType, channelID
506507
return errors.WithMessage(err, "sending deploy transaction proposal failed")
507508
}
508509

509-
eventHub, err := channelService.EventHub()
510+
eventService, err := channelService.EventService()
510511
if err != nil {
511-
return errors.WithMessage(err, "Unable to get EventHub")
512-
}
513-
if eventHub.IsConnected() == false {
514-
err := eventHub.Connect()
515-
if err != nil {
516-
return err
517-
}
518-
defer eventHub.Disconnect()
512+
return errors.WithMessage(err, "unable to get event service")
519513
}
520514

521515
// Register for commit event
522-
statusNotifier := txn.RegisterStatus(tp.TxnID, eventHub)
516+
reg, statusNotifier, err := eventService.RegisterTxStatusEvent(string(tp.TxnID))
517+
if err != nil {
518+
return errors.WithMessage(err, "error registering for TxStatus event")
519+
}
520+
defer eventService.Unregister(reg)
523521

524522
transactionRequest := fab.TransactionRequest{
525523
Proposal: tp,
@@ -535,11 +533,11 @@ func (rc *Client) sendCCProposal(ccProposalType chaincodeProposalType, channelID
535533
}
536534

537535
select {
538-
case result := <-statusNotifier:
539-
if result.Error == nil {
536+
case txStatus := <-statusNotifier:
537+
if txStatus.TxValidationCode == pb.TxValidationCode_VALID {
540538
return nil
541539
}
542-
return errors.WithMessage(result.Error, "instantiateOrUpgradeCC failed")
540+
return status.New(status.EventServerStatus, int32(txStatus.TxValidationCode), "instantiateOrUpgradeCC failed", nil)
543541
case <-time.After(timeout):
544542
return errors.New("instantiateOrUpgradeCC timeout")
545543
}

0 commit comments

Comments
 (0)