Skip to content

Commit a3bcb11

Browse files
Refactored connection count and SCH metric collection (#4001)
* Refactored connection count and SCH metric collection * Codestyle fixes * Fixed correct deprecation description * Refactored state transitions * Fixed transition for async * Revert for backward-compatibility * Fixed leaking connection states * Refactor metrics export on cleanup * Fixed potential deadlock * Codestyle fixes * Remvoved double decrement * Added synchronisation for correct metric recording --------- Co-authored-by: petyaslavova <petya.slavova@redis.com>
1 parent 7465249 commit a3bcb11

11 files changed

Lines changed: 798 additions & 377 deletions

File tree

redis/asyncio/connection.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959

6060
from redis.asyncio.observability.recorder import (
6161
record_connection_closed,
62+
record_connection_count,
6263
record_connection_create_time,
6364
record_connection_wait_time,
6465
record_error_count,
@@ -1332,9 +1333,67 @@ def __repr__(self):
13321333
)
13331334

13341335
def reset(self):
1336+
# Record metrics for connections being removed before clearing
1337+
# (only if attributes exist - they won't during __init__)
1338+
if hasattr(self, "_available_connections") and hasattr(
1339+
self, "_in_use_connections"
1340+
):
1341+
idle_count = len(self._available_connections)
1342+
in_use_count = len(self._in_use_connections)
1343+
if idle_count > 0 or in_use_count > 0:
1344+
pool_name = get_pool_name(self)
1345+
# Note: Using sync version since reset() is sync
1346+
from redis.observability.recorder import (
1347+
record_connection_count as sync_record_connection_count,
1348+
)
1349+
1350+
if idle_count > 0:
1351+
sync_record_connection_count(
1352+
pool_name=pool_name,
1353+
connection_state=ConnectionState.IDLE,
1354+
counter=-idle_count,
1355+
)
1356+
if in_use_count > 0:
1357+
sync_record_connection_count(
1358+
pool_name=pool_name,
1359+
connection_state=ConnectionState.USED,
1360+
counter=-in_use_count,
1361+
)
1362+
13351363
self._available_connections = []
13361364
self._in_use_connections = weakref.WeakSet()
13371365

1366+
def __del__(self) -> None:
1367+
"""Clean up connection pool and record metrics when garbage collected."""
1368+
try:
1369+
if not hasattr(self, "_available_connections") or not hasattr(
1370+
self, "_in_use_connections"
1371+
):
1372+
return
1373+
idle_count = len(self._available_connections)
1374+
in_use_count = len(self._in_use_connections)
1375+
if idle_count > 0 or in_use_count > 0:
1376+
pool_name = get_pool_name(self)
1377+
# Note: Using sync version since __del__ is sync
1378+
from redis.observability.recorder import (
1379+
record_connection_count as sync_record_connection_count,
1380+
)
1381+
1382+
if idle_count > 0:
1383+
sync_record_connection_count(
1384+
pool_name=pool_name,
1385+
connection_state=ConnectionState.IDLE,
1386+
counter=-idle_count,
1387+
)
1388+
if in_use_count > 0:
1389+
sync_record_connection_count(
1390+
pool_name=pool_name,
1391+
connection_state=ConnectionState.USED,
1392+
counter=-in_use_count,
1393+
)
1394+
except Exception:
1395+
pass
1396+
13381397
def can_get_connection(self) -> bool:
13391398
"""Return True if a connection can be retrieved from the pool."""
13401399
return (
@@ -1361,6 +1420,29 @@ async def get_connection(self, command_name=None, *keys, **options):
13611420
)
13621421
is_created = connections_after > connections_before
13631422

1423+
# Record state transition for observability
1424+
# This ensures counters stay balanced if ensure_connection() fails and release() is called
1425+
pool_name = get_pool_name(self)
1426+
if is_created:
1427+
# New connection created and acquired: just USED +1
1428+
await record_connection_count(
1429+
pool_name=pool_name,
1430+
connection_state=ConnectionState.USED,
1431+
counter=1,
1432+
)
1433+
else:
1434+
# Existing connection acquired from pool: IDLE -> USED
1435+
await record_connection_count(
1436+
pool_name=pool_name,
1437+
connection_state=ConnectionState.IDLE,
1438+
counter=-1,
1439+
)
1440+
await record_connection_count(
1441+
pool_name=pool_name,
1442+
connection_state=ConnectionState.USED,
1443+
counter=1,
1444+
)
1445+
13641446
# We now perform the connection check outside of the lock.
13651447
try:
13661448
await self.ensure_connection(connection)
@@ -1398,6 +1480,8 @@ def get_encoder(self):
13981480

13991481
def make_connection(self):
14001482
"""Create a new connection. Can be overridden by child classes."""
1483+
# Note: We don't record IDLE here because async uses a sync make_connection
1484+
# but async record_connection_count. The recording is handled in get_connection.
14011485
return self.connection_class(**self.connection_kwargs)
14021486

14031487
async def ensure_connection(self, connection: AbstractConnection):
@@ -1421,6 +1505,7 @@ async def release(self, connection: AbstractConnection):
14211505
# Connections should always be returned to the correct pool,
14221506
# not doing so is an error that will cause an exception here.
14231507
self._in_use_connections.remove(connection)
1508+
14241509
if connection.should_reconnect():
14251510
await connection.disconnect()
14261511

@@ -1429,6 +1514,19 @@ async def release(self, connection: AbstractConnection):
14291514
AsyncAfterConnectionReleasedEvent(connection)
14301515
)
14311516

1517+
# Record state transition: USED -> IDLE
1518+
pool_name = get_pool_name(self)
1519+
await record_connection_count(
1520+
pool_name=pool_name,
1521+
connection_state=ConnectionState.USED,
1522+
counter=-1,
1523+
)
1524+
await record_connection_count(
1525+
pool_name=pool_name,
1526+
connection_state=ConnectionState.IDLE,
1527+
counter=1,
1528+
)
1529+
14321530
async def disconnect(self, inuse_connections: bool = True):
14331531
"""
14341532
Disconnects connections in the pool
@@ -1447,6 +1545,7 @@ async def disconnect(self, inuse_connections: bool = True):
14471545
*(connection.disconnect() for connection in connections),
14481546
return_exceptions=True,
14491547
)
1548+
14501549
exc = next((r for r in resp if isinstance(r, BaseException)), None)
14511550
if exc:
14521551
raise exc

redis/asyncio/observability/recorder.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@
2424
from typing import TYPE_CHECKING, List, Optional
2525

2626
from redis.observability.attributes import (
27+
ConnectionState,
2728
GeoFailoverReason,
2829
PubSubDirection,
2930
)
3031
from redis.observability.metrics import CloseReason, RedisMetricsCollector
3132
from redis.observability.providers import get_observability_instance
3233
from redis.observability.registry import get_observables_registry_instance
33-
from redis.utils import str_if_bytes
34+
from redis.utils import deprecated_function, str_if_bytes
3435

3536
if TYPE_CHECKING:
3637
from redis.asyncio.connection import ConnectionPool
@@ -168,6 +169,38 @@ async def record_connection_create_time(
168169
pass
169170

170171

172+
async def record_connection_count(
173+
pool_name: str,
174+
connection_state: ConnectionState,
175+
counter: int = 1,
176+
) -> None:
177+
"""
178+
Record a connection count change for a single state.
179+
180+
Args:
181+
pool_name: Connection pool identifier
182+
connection_state: State to update (IDLE or USED)
183+
counter: Number to add (positive) or subtract (negative)
184+
"""
185+
collector = _get_or_create_collector()
186+
if collector is None:
187+
return
188+
189+
try:
190+
collector.record_connection_count(
191+
pool_name=pool_name,
192+
connection_state=connection_state,
193+
counter=counter,
194+
)
195+
except Exception:
196+
pass
197+
198+
199+
@deprecated_function(
200+
reason="Connection count is now tracked via record_connection_count(). "
201+
"This functionality will be removed in the next major version",
202+
version="7.4.0",
203+
)
171204
async def init_connection_count() -> None:
172205
"""
173206
Initialize observable gauge for connection count metric.
@@ -194,6 +227,11 @@ def observable_callback(__):
194227
pass
195228

196229

230+
@deprecated_function(
231+
reason="Connection count is now tracked via record_connection_count(). "
232+
"This functionality will be removed in the next major version",
233+
version="7.4.0",
234+
)
197235
async def register_pools_connection_count(
198236
connection_pools: List["ConnectionPool"],
199237
) -> None:
@@ -301,7 +339,7 @@ async def record_connection_relaxed_timeout(
301339
Record a connection timeout relaxation event.
302340
303341
Args:
304-
connection_name: Connection identifier
342+
connection_name: Connection identifier (pool name)
305343
maint_notification: Maintenance notification type
306344
relaxed: True to count up (relaxed), False to count down (unrelaxed)
307345
"""

0 commit comments

Comments
 (0)