@@ -27,11 +27,14 @@ import (
2727 config "github.com/hyperledger/fabric-sdk-go/config"
2828 consumer "github.com/hyperledger/fabric/events/consumer"
2929 ehpb "github.com/hyperledger/fabric/protos/peer"
30+ logging "github.com/op/go-logging"
3031 "golang.org/x/net/context"
3132 "google.golang.org/grpc"
3233 "google.golang.org/grpc/credentials"
3334)
3435
36+ var logger = logging .MustGetLogger ("fabric_sdk_go" )
37+
3538const defaultTimeout = time .Second * 3
3639
3740//EventsClient holds the stream and adapter for consumer to work with
@@ -51,6 +54,7 @@ type eventsClient struct {
5154 adapter consumer.EventAdapter
5255 TLSCertificate string
5356 TLSServerHostOverride string
57+ clientConn * grpc.ClientConn
5458}
5559
5660//NewEventsClient Returns a new grpc.ClientConn to the configured local PEER.
@@ -63,7 +67,7 @@ func NewEventsClient(peerAddress string, certificate string, serverhostoverride
6367 regTimeout = 60 * time .Second
6468 err = fmt .Errorf ("regTimeout > 60, setting to 60 sec" )
6569 }
66- return & eventsClient {sync.RWMutex {}, peerAddress , regTimeout , nil , adapter , certificate , serverhostoverride }, err
70+ return & eventsClient {sync.RWMutex {}, peerAddress , regTimeout , nil , adapter , certificate , serverhostoverride , nil }, err
6771}
6872
6973//newEventsClientConnectionWithAddress Returns a new grpc.ClientConn to the configured local PEER.
@@ -140,7 +144,7 @@ func (ec *eventsClient) UnregisterAsync(ies []*ehpb.Interest) error {
140144 emsg := & ehpb.Event {Event : & ehpb.Event_Unregister {Unregister : & ehpb.Unregister {Events : ies }}}
141145 var err error
142146 if err = ec .send (emsg ); err != nil {
143- err = fmt .Errorf ("error on unregister send %s\n " , err )
147+ err = fmt .Errorf ("error on unregister send %s" , err )
144148 }
145149
146150 return err
@@ -181,7 +185,7 @@ func (ec *eventsClient) unregister(ies []*ehpb.Interest) error {
181185func (ec * eventsClient ) Recv () (* ehpb.Event , error ) {
182186 in , err := ec .stream .Recv ()
183187 if err == io .EOF {
184- // read done.
188+ // read done
185189 if ec .adapter != nil {
186190 ec .adapter .Disconnected (nil )
187191 }
@@ -227,6 +231,7 @@ func (ec *eventsClient) Start() error {
227231 if err != nil {
228232 return fmt .Errorf ("Could not create client conn to %s (%v)" , ec .peerAddress , err )
229233 }
234+ ec .clientConn = conn
230235
231236 ies , err := ec .adapter .GetInterestedEvents ()
232237 if err != nil {
@@ -258,5 +263,18 @@ func (ec *eventsClient) Stop() error {
258263 // in case the stream/chat server has not been established earlier, we assume that it's closed, successfully
259264 return nil
260265 }
261- return ec .stream .CloseSend ()
266+ //this closes only sending direction of the stream; event is still there
267+ //read will not return an error
268+ err := ec .stream .CloseSend ()
269+ if err != nil {
270+ return err
271+ }
272+ //close client connection
273+ if ec .clientConn != nil {
274+ err := ec .clientConn .Close ()
275+ if err != nil {
276+ return err
277+ }
278+ }
279+ return nil
262280}
0 commit comments