Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 13e434c

Browse files
troyrondaGerrit Code Review
authored andcommitted
Merge "[FAB-9036] Source URL and block num in events"
2 parents a3b4280 + 1b6dbd4 commit 13e434c

28 files changed

+436
-207
lines changed

pkg/common/providers/fab/eventservice.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@ import (
1515
type BlockEvent struct {
1616
// Block is the block that was committed
1717
Block *cb.Block
18+
// SourceURL specifies the URL of the peer that produced the event
19+
SourceURL string
1820
}
1921

2022
// FilteredBlockEvent contains the data for a filtered block event
2123
type FilteredBlockEvent struct {
2224
// FilteredBlock contains a filtered version of the block that was committed
2325
FilteredBlock *pb.FilteredBlock
26+
// SourceURL specifies the URL of the peer that produced the event
27+
SourceURL string
2428
}
2529

2630
// TxStatusEvent contains the data for a transaction status event
@@ -29,6 +33,11 @@ type TxStatusEvent struct {
2933
TxID string
3034
// TxValidationCode is the status code of the commit
3135
TxValidationCode pb.TxValidationCode
36+
// BlockNumber contains the block number in which the
37+
// transaction was committed
38+
BlockNumber uint64
39+
// SourceURL specifies the URL of the peer that produced the event
40+
SourceURL string
3241
}
3342

3443
// CCEvent contains the data for a chaincode event
@@ -42,6 +51,11 @@ type CCEvent struct {
4251
// Payload contains the payload of the chaincode event
4352
// NOTE: Payload will be nil for filtered events
4453
Payload []byte
54+
// BlockNumber contains the block number in which the
55+
// chaincode event was committed
56+
BlockNumber uint64
57+
// SourceURL specifies the URL of the peer that produced the event
58+
SourceURL string
4559
}
4660

4761
// Registration is a handle that is returned from a successful RegisterXXXEvent.

pkg/fab/events/client/client_test.go

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ const (
3939
var (
4040
peer1 = fabmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051")
4141
peer2 = fabmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051")
42+
43+
sourceURL = "localhost:9051"
4244
)
4345

4446
func TestConnect(t *testing.T) {
4547
connectionProvider := clientmocks.NewProviderFactory().Provider(
4648
clientmocks.NewMockConnection(
47-
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
49+
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
4850
),
4951
)
5052

@@ -90,7 +92,7 @@ func TestFailConnect(t *testing.T) {
9092
fabmocks.NewMockChannelCfg("mychannel"),
9193
mockconn.NewProviderFactory().Provider(
9294
mockconn.NewMockConnection(
93-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
95+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
9496
),
9597
),
9698
failAfterConnectClientProvider, []options.Opt{},
@@ -111,7 +113,7 @@ func TestCallsOnClosedClient(t *testing.T) {
111113
),
112114
fabmocks.NewMockChannelCfg("mychannel"),
113115
filteredClientProvider,
114-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
116+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
115117
)
116118
if err != nil {
117119
t.Fatalf("error creating channel event client: %s", err)
@@ -153,7 +155,7 @@ func TestCloseIfIdle(t *testing.T) {
153155
),
154156
fabmocks.NewMockChannelCfg(channelID),
155157
clientProvider,
156-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
158+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
157159
)
158160
if err != nil {
159161
t.Fatalf("error creating channel event client: %s", err)
@@ -187,7 +189,7 @@ func TestInvalidUnregister(t *testing.T) {
187189
),
188190
fabmocks.NewMockChannelCfg(channelID),
189191
filteredClientProvider,
190-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
192+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
191193
)
192194
if err != nil {
193195
t.Fatalf("error creating channel event client: %s", err)
@@ -210,7 +212,7 @@ func TestUnauthorizedBlockEvents(t *testing.T) {
210212
),
211213
fabmocks.NewMockChannelCfg(channelID),
212214
filteredClientProvider,
213-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
215+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
214216
)
215217
if err != nil {
216218
t.Fatalf("error creating channel event client: %s", err)
@@ -234,7 +236,7 @@ func TestBlockEvents(t *testing.T) {
234236
),
235237
fabmocks.NewMockChannelCfg(channelID),
236238
clientProvider,
237-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
239+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
238240
)
239241
if err != nil {
240242
t.Fatalf("error creating channel event client: %s", err)
@@ -292,7 +294,7 @@ func TestFilteredBlockEvents(t *testing.T) {
292294
),
293295
fabmocks.NewMockChannelCfg(channelID),
294296
filteredClientProvider,
295-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
297+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
296298
)
297299
if err != nil {
298300
t.Fatalf("error creating channel event client: %s", err)
@@ -369,7 +371,7 @@ func TestBlockAndFilteredBlockEvents(t *testing.T) {
369371
),
370372
fabmocks.NewMockChannelCfg(channelID),
371373
clientProvider,
372-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
374+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
373375
)
374376
if err != nil {
375377
t.Fatalf("error creating channel event client: %s", err)
@@ -447,7 +449,7 @@ func TestTxStatusEvents(t *testing.T) {
447449
),
448450
fabmocks.NewMockChannelCfg(channelID),
449451
filteredClientProvider,
450-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
452+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
451453
)
452454
if err != nil {
453455
t.Fatalf("error creating channel event client: %s", err)
@@ -531,7 +533,7 @@ func TestCCEvents(t *testing.T) {
531533
),
532534
fabmocks.NewMockChannelCfg(channelID),
533535
filteredClientProvider,
534-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
536+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
535537
)
536538
if err != nil {
537539
t.Fatalf("error creating channel event client: %s", err)
@@ -731,7 +733,7 @@ func TestConcurrentEvents(t *testing.T) {
731733
[]options.Opt{
732734
esdispatcher.WithEventConsumerBufferSize(uint(numEvents) * 4),
733735
},
734-
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
736+
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
735737
)
736738
if err != nil {
737739
t.Fatalf("error creating channel event client: %s", err)
@@ -885,14 +887,19 @@ func listenFilteredBlockEvents(channelID string, eventch <-chan *fab.FilteredBlo
885887

886888
func listenChaincodeEvents(channelID string, eventch <-chan *fab.CCEvent, expected int, errch chan<- error) {
887889
numReceived := 0
890+
lastBlockNum := uint64(0)
888891

889892
for {
890893
select {
891-
case _, ok := <-eventch:
894+
case event, ok := <-eventch:
892895
if !ok {
893896
fmt.Printf("CC events channel was closed \n")
894897
return
895898
}
899+
if event.BlockNumber > 0 && event.BlockNumber <= lastBlockNum {
900+
errch <- errors.Errorf("Expected block greater than [%d] but received [%d]", lastBlockNum, event.BlockNumber)
901+
return
902+
}
896903
numReceived++
897904
case <-time.After(5 * time.Second):
898905
if numReceived != expected {
@@ -919,7 +926,7 @@ func txStatusTest(eventClient *Client, ledger servicemocks.Ledger, channelID str
919926
var receivedEvents int
920927

921928
for i := 0; i < expected; i++ {
922-
txID := fmt.Sprintf("txid_tx_%d", i)
929+
txID := fmt.Sprintf("TxID_%d", i)
923930
go func() {
924931
defer wg.Done()
925932

@@ -932,18 +939,22 @@ func txStatusTest(eventClient *Client, ledger servicemocks.Ledger, channelID str
932939
}
933940
defer eventClient.Unregister(reg)
934941

935-
ledger.NewBlock(channelID,
942+
block := ledger.NewBlock(channelID,
936943
servicemocks.NewTransactionWithCCEvent(txID, pb.TxValidationCode_VALID, ccID, event1, payload1),
937944
)
938945

939946
select {
940-
case _, ok := <-txeventch:
947+
case event, ok := <-txeventch:
941948
mutex.Lock()
942949
if !ok {
943950
errs = append(errs, errors.New("unexpected closed channel"))
944951
} else {
945952
receivedEvents++
946953
}
954+
if event.BlockNumber != block.Number() {
955+
errch <- errors.Errorf("Expected block number [%d] but received [%d]", block.Number(), event.BlockNumber)
956+
return
957+
}
947958
mutex.Unlock()
948959
case <-time.After(5 * time.Second):
949960
mutex.Lock()
@@ -971,7 +982,7 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome mockconn
971982
clientmocks.NewDiscoveryProvider(peer1, peer2),
972983
),
973984
fabmocks.NewMockChannelCfg("mychannel"),
974-
cp.FlakeyProvider(connAttemptResult, mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory))),
985+
cp.FlakeyProvider(connAttemptResult, mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL))),
975986
clientProvider,
976987
[]options.Opt{
977988
esdispatcher.WithEventConsumerTimeout(time.Second),
@@ -1000,7 +1011,7 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe
10001011

10011012
connectch := make(chan *dispatcher.ConnectionEvent)
10021013

1003-
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory)
1014+
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)
10041015

10051016
eventClient, _, err := newClientWithMockConnAndOpts(
10061017
fabmocks.NewMockContextWithCustomDiscovery(
@@ -1055,7 +1066,7 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents mockconn.NumBlo
10551066
channelID := "mychannel"
10561067
ccID := "mycc"
10571068

1058-
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory)
1069+
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)
10591070

10601071
cp := mockconn.NewProviderFactory()
10611072

pkg/fab/events/client/dispatcher/dispatcher_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
var (
2525
peer1 = fabmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051")
2626
peer2 = fabmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051")
27+
28+
sourceURL = "localhost:9051"
2729
)
2830

2931
func TestConnect(t *testing.T) {
@@ -38,7 +40,7 @@ func TestConnect(t *testing.T) {
3840
clientmocks.NewProviderFactory().Provider(
3941
clientmocks.NewMockConnection(
4042
clientmocks.WithLedger(
41-
servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory),
43+
servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL),
4244
),
4345
),
4446
),
@@ -117,7 +119,7 @@ func TestConnectNoPeers(t *testing.T) {
117119
clientmocks.NewProviderFactory().Provider(
118120
clientmocks.NewMockConnection(
119121
clientmocks.WithLedger(
120-
servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory),
122+
servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL),
121123
),
122124
),
123125
),
@@ -160,7 +162,7 @@ func TestConnectionEvent(t *testing.T) {
160162
clientmocks.NewProviderFactory().Provider(
161163
clientmocks.NewMockConnection(
162164
clientmocks.WithLedger(
163-
servicemocks.NewMockLedger(servicemocks.BlockEventFactory),
165+
servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL),
164166
),
165167
),
166168
),

pkg/fab/events/client/mocks/mockconnection.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,15 @@ type MockConnection struct {
9090
producerch <-chan interface{}
9191
rcvch chan interface{}
9292
closed int32
93+
sourceURL string
9394
}
9495

9596
// Opts contains mock connection options
9697
type Opts struct {
9798
Ledger servicemocks.Ledger
9899
Operations OperationMap
99100
Factory ConnectionFactory
101+
SourceURL string
100102
}
101103

102104
// NewMockConnection returns a new MockConnection using the given options
@@ -115,13 +117,19 @@ func NewMockConnection(opts ...Opt) *MockConnection {
115117
panic("ledger is nil")
116118
}
117119

120+
sourceURL := copts.SourceURL
121+
if sourceURL == "" {
122+
sourceURL = "localhost:9051"
123+
}
124+
118125
producer := servicemocks.NewMockProducer(copts.Ledger)
119126

120127
c := &MockConnection{
121128
producer: producer,
122129
producerch: producer.Register(),
123130
rcvch: make(chan interface{}),
124131
operations: operations,
132+
sourceURL: sourceURL,
125133
}
126134
return c
127135
}
@@ -178,6 +186,11 @@ func (c *MockConnection) Ledger() servicemocks.Ledger {
178186
return c.producer.Ledger()
179187
}
180188

189+
// SourceURL returns the event source
190+
func (c *MockConnection) SourceURL() string {
191+
return c.sourceURL
192+
}
193+
181194
// ProviderFactory creates various mock MockConnection Providers
182195
type ProviderFactory struct {
183196
connection Connection
@@ -289,6 +302,13 @@ func NewResult(operation Operation, result Result, errMsg ...string) *OperationR
289302
}
290303
}
291304

305+
// WithSourceURL provides the mock connection with an event source
306+
func WithSourceURL(sourceURL string) Opt {
307+
return func(opts *Opts) {
308+
opts.SourceURL = sourceURL
309+
}
310+
}
311+
292312
// WithLedger provides the mock connection with a ledger
293313
func WithLedger(ledger servicemocks.Ledger) Opt {
294314
return func(opts *Opts) {
@@ -318,3 +338,12 @@ func newDeliverStatusResponse(status cb.Status) *pb.DeliverResponse_Status {
318338
Status: status,
319339
}
320340
}
341+
342+
type eventSource struct {
343+
url string
344+
}
345+
346+
// URL returns the URL of the peer that published the event
347+
func (es *eventSource) URL() string {
348+
return es.url
349+
}

pkg/fab/events/deliverclient/connection/connection.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type deliverStream interface {
4040
// DeliverConnection manages the connection to the deliver server
4141
type DeliverConnection struct {
4242
comm.GRPCConnection
43+
url string
4344
}
4445

4546
// StreamProvider creates a deliver stream
@@ -73,6 +74,7 @@ func New(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamPr
7374

7475
return &DeliverConnection{
7576
GRPCConnection: *connect,
77+
url: url,
7678
}, nil
7779
}
7880

@@ -133,7 +135,7 @@ func (c *DeliverConnection) Receive(eventch chan<- interface{}) {
133135
break
134136
}
135137

136-
eventch <- in
138+
eventch <- NewEvent(in, c.url)
137139
}
138140
logger.Debugf("Exiting stream listener")
139141
}
@@ -178,3 +180,17 @@ func (c *DeliverConnection) createSignedEnvelope(msg proto.Message) (*cb.Envelop
178180

179181
return &cb.Envelope{Payload: paylBytes, Signature: signature}, nil
180182
}
183+
184+
// Event contains the deliver event as well as the event source
185+
type Event struct {
186+
SourceURL string
187+
Event interface{}
188+
}
189+
190+
// NewEvent returns a deliver event
191+
func NewEvent(event interface{}, sourceURL string) *Event {
192+
return &Event{
193+
SourceURL: sourceURL,
194+
Event: event,
195+
}
196+
}

0 commit comments

Comments
 (0)