@@ -30,13 +30,16 @@ const (
3030//
3131// This component has been designed to be safe for concurrency.
3232type CachingConnector struct {
33- conns sync.Map
34- sweepTime time.Duration
35- idleTime time.Duration
36- index map [* grpc.ClientConn ]* cachedConn
37- lock sync.Mutex
33+ conns map [string ]* cachedConn
34+ sweepTime time.Duration
35+ idleTime time.Duration
36+ index map [* grpc.ClientConn ]* cachedConn
37+ // lock protects concurrent access to the connection cache
38+ // it is held during create, load, release, and sweep connection
39+ // operations. Note: it is released during openConn, which is
40+ // the blocking part of the connection process.
41+ lock sync.RWMutex
3842 waitgroup sync.WaitGroup
39- janitorChan chan * cachedConn
4043 janitorDone chan bool
4144 janitorClosed chan bool
4245}
@@ -45,18 +48,16 @@ type cachedConn struct {
4548 target string
4649 conn * grpc.ClientConn
4750 open int
48- lastOpen time.Time
4951 lastClose time.Time
5052}
5153
5254// NewCachingConnector creates a GRPC connection cache. The cache is governed by
5355// sweepTime and idleTime.
5456func NewCachingConnector (sweepTime time.Duration , idleTime time.Duration ) * CachingConnector {
5557 cc := CachingConnector {
56- conns : sync. Map {},
58+ conns : map [ string ] * cachedConn {},
5759 index : map [* grpc.ClientConn ]* cachedConn {},
58- janitorChan : make (chan * cachedConn ),
59- janitorDone : make (chan bool ),
60+ janitorDone : make (chan bool , 1 ),
6061 janitorClosed : make (chan bool , 1 ),
6162 sweepTime : sweepTime ,
6263 idleTime : idleTime ,
@@ -73,15 +74,15 @@ func NewCachingConnector(sweepTime time.Duration, idleTime time.Duration) *Cachi
7374
7475// Close cleans up cached connections.
7576func (cc * CachingConnector ) Close () {
76- cc .lock .Lock ()
77- defer cc .lock .Unlock ()
78-
77+ cc .lock .RLock ()
7978 // Safety check to see if the connector has been closed. This represents a
8079 // bug in the calling code, but it's not good to panic here.
8180 if cc .janitorDone == nil {
81+ cc .lock .RUnlock ()
8282 logger .Warn ("Trying to close connector after already closed" )
8383 return
8484 }
85+ cc .lock .RUnlock ()
8586 logger .Debug ("closing caching GRPC connector" )
8687
8788 select {
@@ -93,7 +94,16 @@ func (cc *CachingConnector) Close() {
9394 cc .waitgroup .Wait ()
9495 }
9596
96- close (cc .janitorChan )
97+ cc .lock .Lock ()
98+ defer cc .lock .Unlock ()
99+
100+ if len (cc .index ) > 0 {
101+ logger .Debugf ("flushing connection cache with open connections [%d]" , len (cc .index ))
102+ } else {
103+ logger .Debugf ("flushing connection cache" )
104+ }
105+
106+ cc .flush ()
97107 close (cc .janitorClosed )
98108 close (cc .janitorDone )
99109 cc .janitorDone = nil
@@ -103,16 +113,23 @@ func (cc *CachingConnector) Close() {
103113func (cc * CachingConnector ) DialContext (ctx context.Context , target string , opts ... grpc.DialOption ) (* grpc.ClientConn , error ) {
104114 logger .Debugf ("DialContext: %s" , target )
105115
116+ cc .lock .Lock ()
106117 c , ok := cc .loadConn (target )
107118 if ! ok {
108119 createdConn , err := cc .createConn (ctx , target , opts ... )
109120 if err != nil {
121+ cc .lock .Unlock ()
110122 return nil , errors .WithMessage (err , "connection creation failed" )
111123 }
112124 c = createdConn
113125 }
114126
127+ cc .lock .Unlock ()
128+
115129 if err := cc .openConn (ctx , c ); err != nil {
130+ cc .lock .Lock ()
131+ setClosed (c )
132+ cc .lock .Unlock ()
116133 return nil , errors .Errorf ("dialing connection timed out [%s]" , target )
117134 }
118135 return c .conn , nil
@@ -144,33 +161,27 @@ func (cc *CachingConnector) ReleaseConn(conn *grpc.ClientConn) {
144161 }
145162 logger .Debugf ("ReleaseConn [%s]" , cconn .target )
146163
147- if cconn .open > 0 {
148- cconn .lastClose = time .Now ()
149- cconn .open --
150- }
164+ setClosed (cconn )
151165
152- cc .updateJanitor ( cconn )
166+ cc .ensureJanitorStarted ( )
153167}
154168
155169func (cc * CachingConnector ) loadConn (target string ) (* cachedConn , bool ) {
156- connRaw , ok := cc .conns . Load ( target )
170+ c , ok := cc .conns [ target ]
157171 if ok {
158- c , ok := connRaw .(* cachedConn )
159- if ok {
160- if c .conn .GetState () != connectivity .Shutdown {
161- logger .Debugf ("using cached connection [%s: %p]" , target , c )
162- return c , true
163- }
164- cc .shutdownConn (c )
172+ if c .conn .GetState () != connectivity .Shutdown {
173+ logger .Debugf ("using cached connection [%s: %p]" , target , c )
174+ // Set connection open as soon as it is loaded to prevent the janitor
175+ // from sweeping it
176+ c .open ++
177+ return c , true
165178 }
179+ cc .shutdownConn (c )
166180 }
167181 return nil , false
168182}
169183
170184func (cc * CachingConnector ) createConn (ctx context.Context , target string , opts ... grpc.DialOption ) (* cachedConn , error ) {
171- cc .lock .Lock ()
172- defer cc .lock .Unlock ()
173-
174185 if cc .janitorDone == nil {
175186 return nil , errors .New ("caching connector is closed" )
176187 }
@@ -190,8 +201,10 @@ func (cc *CachingConnector) createConn(ctx context.Context, target string, opts
190201 cconn = & cachedConn {
191202 target : target ,
192203 conn : conn ,
204+ open : 1 ,
193205 }
194- cc .conns .Store (target , cconn )
206+
207+ cc .conns [target ] = cconn
195208 cc .index [conn ] = cconn
196209
197210 return cconn , nil
@@ -204,11 +217,7 @@ func (cc *CachingConnector) openConn(ctx context.Context, c *cachedConn) error {
204217 return err
205218 }
206219
207- cc .lock .Lock ()
208- defer cc .lock .Unlock ()
209- c .open ++
210- c .lastOpen = time .Now ()
211- cc .updateJanitor (c )
220+ cc .ensureJanitorStarted ()
212221
213222 logger .Debugf ("connection was opened [%s]" , c .target )
214223 return nil
@@ -228,157 +237,94 @@ func waitConn(ctx context.Context, conn *grpc.ClientConn, targetState connectivi
228237}
229238
230239func (cc * CachingConnector ) shutdownConn (cconn * cachedConn ) {
231- cc .lock .Lock ()
232- defer cc .lock .Unlock ()
233-
234240 if cc .janitorDone == nil {
235241 logger .Debug ("Connector already closed" )
236242 return
237243 }
238244
239245 logger .Debugf ("connection was shutdown [%s]" , cconn .target )
240- cc .conns . Delete ( cconn .target )
246+ delete ( cc .conns , cconn .target )
241247 delete (cc .index , cconn .conn )
242248
243- cconn .open = 0
244- cconn .lastClose = time.Time {}
245-
246- cc .updateJanitor (cconn )
249+ cc .ensureJanitorStarted ()
247250}
248251
249- func (cc * CachingConnector ) removeConn (target string ) {
250- cc .lock .Lock ()
251- defer cc .lock .Unlock ()
252-
253- logger .Debugf ("removing connection [%s]" , target )
254- connRaw , ok := cc .conns .Load (target )
255- if ok {
256- c , ok := connRaw .(* cachedConn )
257- if ok {
258- delete (cc .index , c .conn )
259- cc .conns .Delete (target )
260- if err := c .conn .Close (); err != nil {
261- logger .Debugf ("unable to close connection [%s]" , err )
262- }
252+ func (cc * CachingConnector ) sweepAndRemove () {
253+ now := time .Now ()
254+ for conn , cachedConn := range cc .index {
255+ if cachedConn .open == 0 && now .After (cachedConn .lastClose .Add (cc .idleTime )) {
256+ logger .Debugf ("connection janitor closing connection [%s]" , cachedConn .target )
257+ cc .removeConn (cachedConn )
258+ } else if conn .GetState () == connectivity .Shutdown {
259+ logger .Debugf ("connection already closed [%s]" , cachedConn .target )
260+ cc .removeConn (cachedConn )
263261 }
264262 }
265263}
266264
267- func (cc * CachingConnector ) updateJanitor (c * cachedConn ) {
265+ func (cc * CachingConnector ) removeConn (c * cachedConn ) {
266+ logger .Debugf ("removing connection [%s]" , c .target )
267+ delete (cc .index , c .conn )
268+ delete (cc .conns , c .target )
269+ if err := c .conn .Close (); err != nil {
270+ logger .Debugf ("unable to close connection [%s]" , err )
271+ }
272+ }
273+
274+ func (cc * CachingConnector ) ensureJanitorStarted () {
268275 select {
269276 case <- cc .janitorClosed :
270277 logger .Debugf ("janitor not started" )
271278 cc .waitgroup .Add (1 )
272- go janitor ( cc .sweepTime , cc . idleTime , & cc . waitgroup , cc . janitorChan , cc . janitorClosed , cc . janitorDone , cc . removeConn )
279+ go cc .janitor ( )
273280 default :
274281 logger .Debugf ("janitor already started" )
275282 }
276- cClone := * c
277-
278- cc .janitorChan <- & cClone
279283}
280284
281- // The janitor monitors open connections for shutdown state or extended non-usage.
285+ // janitor monitors open connections for shutdown state or extended non-usage.
282286// This component operates by running a sweep with a period determined by "sweepTime".
283287// When a connection returned the GRPC status connectivity.Shutdown or when the connection
284288// has its usages closed for longer than "idleTime", the connection is closed and the
285289// "connRemove" notifier is called.
286290//
287291// The caching connector:
288- // pushes connection information via the "conn" go channel.
289292// notifies the janitor of close by closing the "done" go channel.
290293//
291294// The janitor:
292295// calls "connRemove" callback when closing a connection.
293296// decrements the "wg" waitgroup when exiting.
294297// writes to the "done" go channel when closing due to becoming empty.
295-
296- type connRemoveNotifier func (target string )
297-
298- func janitor (sweepTime time.Duration , idleTime time.Duration , wg * sync.WaitGroup , conn chan * cachedConn , close chan bool , done chan bool , connRemove connRemoveNotifier ) {
298+ func (cc * CachingConnector ) janitor () {
299299 logger .Debugf ("starting connection janitor" )
300- defer wg .Done ()
300+ defer cc . waitgroup .Done ()
301301
302- conns := map [string ]* cachedConn {}
303- ticker := time .NewTicker (sweepTime )
302+ ticker := time .NewTicker (cc .sweepTime )
304303 for {
305304 select {
306- case <- done :
307- if len (conns ) > 0 {
308- logger .Debugf ("flushing connection janitor with open connections [%d]" , len (conns ))
309- } else {
310- logger .Debugf ("flushing connection janitor" )
311- }
312- flush (conns )
305+ case <- cc .janitorDone :
313306 return
314- case c := <- conn :
315- cache (conns , c )
316307 case <- ticker .C :
317- rm := sweep (conns , idleTime )
318- for _ , target := range rm {
319- connRemove (target )
320- delete (conns , target )
321- }
322-
323- if len (conns ) == 0 {
308+ cc .lock .Lock ()
309+ cc .sweepAndRemove ()
310+ numConn := len (cc .index )
311+ cc .lock .Unlock ()
312+ if numConn == 0 {
324313 logger .Debugf ("closing connection janitor" )
325- close <- true
314+ cc . janitorClosed <- true
326315 return
327316 }
328317 }
329318 }
330319}
331320
332- func cache (conns map [string ]* cachedConn , updateConn * cachedConn ) {
333-
334- c , ok := conns [updateConn .target ]
335- if ok && updateConn .lastClose .IsZero () && updateConn .conn .GetState () == connectivity .Shutdown {
336- logger .Debugf ("connection shutdown detected in connection janitor" )
337- // We need to remove the connection from sweep consideration immediately
338- // since the connector has already removed it. Otherwise we can have a race
339- // between shutdown and creating a connection concurrently.
340- delete (conns , updateConn .target )
341- return
342- }
343-
344- if ! ok {
345- logger .Debugf ("new connection in connection janitor" )
346- } else if c .conn != updateConn .conn {
347- logger .Debugf ("connection change in connection janitor" )
348-
349- if err := c .conn .Close (); err != nil {
350- logger .Debugf ("unable to close connection [%s]" , err )
351- }
352-
353- } else {
354- logger .Debugf ("updating existing connection in connection janitor" )
355- }
356-
357- conns [updateConn .target ] = updateConn
358- }
359-
360- func flush (conns map [string ]* cachedConn ) {
361- for _ , c := range conns {
362- logger .Debugf ("connection janitor closing connection [%s]" , c .target )
321+ func (cc * CachingConnector ) flush () {
322+ for _ , c := range cc .index {
323+ logger .Debugf ("flushing connection [%s]" , c .target )
363324 closeConn (c .conn )
364325 }
365326}
366327
367- func sweep (conns map [string ]* cachedConn , idleTime time.Duration ) []string {
368- rm := make ([]string , 0 , len (conns ))
369- now := time .Now ()
370- for _ , c := range conns {
371- if c .open == 0 && now .After (c .lastClose .Add (idleTime )) {
372- logger .Debugf ("connection janitor closing connection [%s]" , c .target )
373- rm = append (rm , c .target )
374- } else if c .conn .GetState () == connectivity .Shutdown {
375- logger .Debugf ("connection already closed [%s]" , c .target )
376- rm = append (rm , c .target )
377- }
378- }
379- return rm
380- }
381-
382328func closeConn (conn * grpc.ClientConn ) {
383329 if err := conn .Close (); err != nil {
384330 logger .Debugf ("unable to close connection [%s]" , err )
@@ -390,3 +336,10 @@ func closeConn(conn *grpc.ClientConn) {
390336 }
391337 cancel ()
392338}
339+
340+ func setClosed (cconn * cachedConn ) {
341+ if cconn .open > 0 {
342+ cconn .lastClose = time .Now ()
343+ cconn .open --
344+ }
345+ }
0 commit comments