5959
6060from redis .asyncio .observability .recorder import (
6161 record_connection_closed ,
62+ record_connection_count ,
6263 record_connection_create_time ,
6364 record_connection_wait_time ,
6465 record_error_count ,
@@ -1332,9 +1333,67 @@ def __repr__(self):
13321333 )
13331334
13341335 def reset (self ):
1336+ # Record metrics for connections being removed before clearing
1337+ # (only if attributes exist - they won't during __init__)
1338+ if hasattr (self , "_available_connections" ) and hasattr (
1339+ self , "_in_use_connections"
1340+ ):
1341+ idle_count = len (self ._available_connections )
1342+ in_use_count = len (self ._in_use_connections )
1343+ if idle_count > 0 or in_use_count > 0 :
1344+ pool_name = get_pool_name (self )
1345+ # Note: Using sync version since reset() is sync
1346+ from redis .observability .recorder import (
1347+ record_connection_count as sync_record_connection_count ,
1348+ )
1349+
1350+ if idle_count > 0 :
1351+ sync_record_connection_count (
1352+ pool_name = pool_name ,
1353+ connection_state = ConnectionState .IDLE ,
1354+ counter = - idle_count ,
1355+ )
1356+ if in_use_count > 0 :
1357+ sync_record_connection_count (
1358+ pool_name = pool_name ,
1359+ connection_state = ConnectionState .USED ,
1360+ counter = - in_use_count ,
1361+ )
1362+
13351363 self ._available_connections = []
13361364 self ._in_use_connections = weakref .WeakSet ()
13371365
1366+ def __del__ (self ) -> None :
1367+ """Clean up connection pool and record metrics when garbage collected."""
1368+ try :
1369+ if not hasattr (self , "_available_connections" ) or not hasattr (
1370+ self , "_in_use_connections"
1371+ ):
1372+ return
1373+ idle_count = len (self ._available_connections )
1374+ in_use_count = len (self ._in_use_connections )
1375+ if idle_count > 0 or in_use_count > 0 :
1376+ pool_name = get_pool_name (self )
1377+ # Note: Using sync version since __del__ is sync
1378+ from redis .observability .recorder import (
1379+ record_connection_count as sync_record_connection_count ,
1380+ )
1381+
1382+ if idle_count > 0 :
1383+ sync_record_connection_count (
1384+ pool_name = pool_name ,
1385+ connection_state = ConnectionState .IDLE ,
1386+ counter = - idle_count ,
1387+ )
1388+ if in_use_count > 0 :
1389+ sync_record_connection_count (
1390+ pool_name = pool_name ,
1391+ connection_state = ConnectionState .USED ,
1392+ counter = - in_use_count ,
1393+ )
1394+ except Exception :
1395+ pass
1396+
13381397 def can_get_connection (self ) -> bool :
13391398 """Return True if a connection can be retrieved from the pool."""
13401399 return (
@@ -1361,6 +1420,29 @@ async def get_connection(self, command_name=None, *keys, **options):
13611420 )
13621421 is_created = connections_after > connections_before
13631422
1423+ # Record state transition for observability
1424+ # This ensures counters stay balanced if ensure_connection() fails and release() is called
1425+ pool_name = get_pool_name (self )
1426+ if is_created :
1427+ # New connection created and acquired: just USED +1
1428+ await record_connection_count (
1429+ pool_name = pool_name ,
1430+ connection_state = ConnectionState .USED ,
1431+ counter = 1 ,
1432+ )
1433+ else :
1434+ # Existing connection acquired from pool: IDLE -> USED
1435+ await record_connection_count (
1436+ pool_name = pool_name ,
1437+ connection_state = ConnectionState .IDLE ,
1438+ counter = - 1 ,
1439+ )
1440+ await record_connection_count (
1441+ pool_name = pool_name ,
1442+ connection_state = ConnectionState .USED ,
1443+ counter = 1 ,
1444+ )
1445+
13641446 # We now perform the connection check outside of the lock.
13651447 try :
13661448 await self .ensure_connection (connection )
@@ -1398,6 +1480,8 @@ def get_encoder(self):
13981480
13991481 def make_connection (self ):
14001482 """Create a new connection. Can be overridden by child classes."""
1483+ # Note: We don't record IDLE here because async uses a sync make_connection
1484+ # but async record_connection_count. The recording is handled in get_connection.
14011485 return self .connection_class (** self .connection_kwargs )
14021486
14031487 async def ensure_connection (self , connection : AbstractConnection ):
@@ -1421,6 +1505,7 @@ async def release(self, connection: AbstractConnection):
14211505 # Connections should always be returned to the correct pool,
14221506 # not doing so is an error that will cause an exception here.
14231507 self ._in_use_connections .remove (connection )
1508+
14241509 if connection .should_reconnect ():
14251510 await connection .disconnect ()
14261511
@@ -1429,6 +1514,19 @@ async def release(self, connection: AbstractConnection):
14291514 AsyncAfterConnectionReleasedEvent (connection )
14301515 )
14311516
1517+ # Record state transition: USED -> IDLE
1518+ pool_name = get_pool_name (self )
1519+ await record_connection_count (
1520+ pool_name = pool_name ,
1521+ connection_state = ConnectionState .USED ,
1522+ counter = - 1 ,
1523+ )
1524+ await record_connection_count (
1525+ pool_name = pool_name ,
1526+ connection_state = ConnectionState .IDLE ,
1527+ counter = 1 ,
1528+ )
1529+
14321530 async def disconnect (self , inuse_connections : bool = True ):
14331531 """
14341532 Disconnects connections in the pool
@@ -1447,6 +1545,7 @@ async def disconnect(self, inuse_connections: bool = True):
14471545 * (connection .disconnect () for connection in connections ),
14481546 return_exceptions = True ,
14491547 )
1548+
14501549 exc = next ((r for r in resp if isinstance (r , BaseException )), None )
14511550 if exc :
14521551 raise exc
0 commit comments