Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 4b576fd

Browse files
committed
[FAB-8578] Connection Caching
This change introduces the CachingConnector. This component provides the ability to cache GRPC connections. It provides a GRPC compatible Context Dialer interface via the "DialContext" method. Change-Id: Idd2f61e52a3e078cf81042853808c59c9a37b47b Signed-off-by: Troy Ronda <troy@troyronda.com>
1 parent abe9044 commit 4b576fd

File tree

15 files changed

+717
-43
lines changed

15 files changed

+717
-43
lines changed

pkg/context/api/core/provider.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ const (
7676
OrdererResponse
7777
// DiscoveryGreylistExpiry discovery Greylist expiration period
7878
DiscoveryGreylistExpiry
79+
// ConnectionIdle is the timeout for closing idle connections
80+
ConnectionIdle
81+
// CacheSweepInterval is the duration between cache sweeps
82+
CacheSweepInterval
7983
)
8084

8185
// Providers represents the SDK configured core providers context.

pkg/context/api/fab/provider.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type InfraProvider interface {
1919
CreateEventHub(ic IdentityContext, name string) (EventHub, error)
2020
CreatePeerFromConfig(peerCfg *core.NetworkPeer) (Peer, error)
2121
CreateOrdererFromConfig(cfg *core.OrdererConfig) (Orderer, error)
22+
Close()
2223
}
2324

2425
// SelectionProvider is used to select peers for endorsement

pkg/core/config/config.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@ import (
3434
var logger = logging.NewLogger(logModule)
3535

3636
const (
37-
cmdRoot = "FABRIC_SDK"
38-
logModule = "fabric_sdk_go"
39-
defaultTimeout = time.Second * 5
37+
cmdRoot = "FABRIC_SDK"
38+
logModule = "fabric_sdk_go"
39+
defaultTimeout = time.Second * 5
40+
defaultConnIdleTimeout = time.Second * 30
41+
defaultCacheSweepInterval = time.Second * 15
4042
)
4143

4244
// Config represents the configuration for the client
@@ -458,7 +460,16 @@ func (c *Config) TimeoutOrDefault(conn core.TimeoutType) time.Duration {
458460
timeout = c.configViper.GetDuration("client.orderer.timeout.connection")
459461
case core.OrdererResponse:
460462
timeout = c.configViper.GetDuration("client.orderer.timeout.response")
461-
463+
case core.CacheSweepInterval: // EXPERIMENTAL - do we need this to be configurable?
464+
timeout = c.configViper.GetDuration("client.cache.interval.sweep")
465+
if timeout == 0 {
466+
timeout = defaultCacheSweepInterval
467+
}
468+
case core.ConnectionIdle:
469+
timeout = c.configViper.GetDuration("client.cache.timeout.connectionIdle")
470+
if timeout == 0 {
471+
timeout = defaultConnIdleTimeout
472+
}
462473
}
463474
if timeout == 0 {
464475
timeout = defaultTimeout

pkg/fab/comm/comm_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright SecureKey Technologies Inc. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package comm
8+
9+
import (
10+
"fmt"
11+
"net"
12+
"os"
13+
"testing"
14+
"time"
15+
16+
eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks"
17+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
18+
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
19+
"github.com/pkg/errors"
20+
"google.golang.org/grpc"
21+
)
22+
23+
const (
24+
peerAddress = "localhost:9999"
25+
endorserAddress = "127.0.0.1:0"
26+
peerURL = "grpc://" + peerAddress
27+
)
28+
29+
func TestMain(m *testing.M) {
30+
var opts []grpc.ServerOption
31+
grpcServer := grpc.NewServer(opts...)
32+
33+
lis, err := net.Listen("tcp", peerAddress)
34+
if err != nil {
35+
panic(fmt.Sprintf("Error starting events listener %s", err))
36+
}
37+
38+
testServer = eventmocks.NewMockEventhubServer()
39+
40+
pb.RegisterEventsServer(grpcServer, testServer)
41+
42+
go grpcServer.Serve(lis)
43+
44+
srvs, addrs, err := startEndorsers(2, endorserAddress)
45+
if err != nil {
46+
panic(fmt.Sprintf("Error starting endorser %s", err))
47+
}
48+
for _, srv := range srvs {
49+
defer srv.Stop()
50+
}
51+
endorserAddr = addrs
52+
53+
time.Sleep(2 * time.Second)
54+
os.Exit(m.Run())
55+
}
56+
57+
func startEndorsers(count int, address string) ([]*grpc.Server, []string, error) {
58+
srvs := make([]*grpc.Server, 0, count)
59+
addrs := make([]string, 0, count)
60+
61+
for i := 0; i < count; i++ {
62+
srv := grpc.NewServer()
63+
_, addr, ok := startEndorserServer(srv, address)
64+
if !ok {
65+
return nil, nil, errors.New("unable to start GRPC server")
66+
}
67+
srvs = append(srvs, srv)
68+
addrs = append(addrs, addr)
69+
}
70+
return srvs, addrs, nil
71+
}
72+
73+
func startEndorserServer(grpcServer *grpc.Server, address string) (*mocks.MockEndorserServer, string, bool) {
74+
lis, err := net.Listen("tcp", address)
75+
if err != nil {
76+
fmt.Printf("Error starting test server %s", err)
77+
return nil, "", false
78+
}
79+
addr := lis.Addr().String()
80+
81+
endorserServer := &mocks.MockEndorserServer{}
82+
pb.RegisterEndorserServer(grpcServer, endorserServer)
83+
fmt.Printf("Starting test server on %s", addr)
84+
go grpcServer.Serve(lis)
85+
return endorserServer, addr, true
86+
}

pkg/fab/comm/connection_test.go

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ package comm
88

99
import (
1010
"context"
11-
"fmt"
12-
"net"
13-
"os"
1411
"testing"
1512
"time"
1613

@@ -26,11 +23,6 @@ import (
2623
"google.golang.org/grpc"
2724
)
2825

29-
const (
30-
peerAddress = "localhost:9999"
31-
peerURL = "grpc://" + peerAddress
32-
)
33-
3426
var testStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
3527
return pb.NewDeliverClient(grpcconn).Deliver(context.Background())
3628
}
@@ -93,25 +85,7 @@ func TestConnection(t *testing.T) {
9385

9486
// Use the Deliver server for testing
9587
var testServer *eventmocks.MockEventhubServer
96-
97-
func TestMain(m *testing.M) {
98-
var opts []grpc.ServerOption
99-
grpcServer := grpc.NewServer(opts...)
100-
101-
lis, err := net.Listen("tcp", peerAddress)
102-
if err != nil {
103-
panic(fmt.Sprintf("Error starting events listener %s", err))
104-
}
105-
106-
testServer = eventmocks.NewMockEventhubServer()
107-
108-
pb.RegisterEventsServer(grpcServer, testServer)
109-
110-
go grpcServer.Serve(lis)
111-
112-
time.Sleep(2 * time.Second)
113-
os.Exit(m.Run())
114-
}
88+
var endorserAddr []string
11589

11690
func newPeerConfig(peerURL string) *core.PeerConfig {
11791
return &core.PeerConfig{

0 commit comments

Comments
 (0)