Skip to content

Commit ea521c7

Browse files
committed
Merge branch 'master' of github.com:redis/redis-py into vv-multidb-healthcheck-refactor
2 parents 0f7d7c3 + abd08b0 commit ea521c7

34 files changed

Lines changed: 8486 additions & 1103 deletions

.agent/sync_async_type_hints_overload_guide.md

Lines changed: 812 additions & 0 deletions
Large diffs are not rendered by default.

.github/workflows/integration.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ jobs:
4545
ignore-vulns: |
4646
GHSA-w596-4wvx-j9j6 # subversion related git pull, dependency for pytest. There is no impact here.
4747
CVE-2026-26007 # dependency for entraid tests
48+
CVE-2026-32597 # PyJWT does not validate the crit (Critical) Header Parameter defined in RFC 7515, this will be fixed in the next release
4849
4950
lint:
5051
name: Code linters

redis/asyncio/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
Dict,
1414
Iterable,
1515
List,
16+
Literal,
1617
Mapping,
1718
MutableMapping,
1819
Optional,
@@ -129,6 +130,9 @@ class Redis(
129130
Connection object to talk to redis.
130131
"""
131132

133+
# Type discrimination marker for @overload self-type pattern
134+
_is_async_client: Literal[True] = True
135+
132136
response_callbacks: MutableMapping[Union[str, bytes], ResponseCallbackT]
133137

134138
@classmethod

redis/asyncio/cluster.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
Dict,
1717
Generator,
1818
List,
19+
Literal,
1920
Mapping,
2021
Optional,
2122
Set,
@@ -238,6 +239,9 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
238239
kwargs["ssl"] = True
239240
return cls(**kwargs)
240241

242+
# Type discrimination marker for @overload self-type pattern
243+
_is_async_client: Literal[True] = True
244+
241245
__slots__ = (
242246
"_initialize",
243247
"_lock",
@@ -1976,7 +1980,14 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm
19761980
| Existing :class:`~.RedisCluster` client
19771981
"""
19781982

1979-
__slots__ = ("cluster_client", "_transaction", "_execution_strategy")
1983+
__slots__ = (
1984+
"cluster_client",
1985+
"_transaction",
1986+
"_execution_strategy",
1987+
)
1988+
1989+
# Type discrimination marker for @overload self-type pattern
1990+
_is_async_client: Literal[True] = True
19801991

19811992
def __init__(
19821993
self, client: RedisCluster, transaction: Optional[bool] = None

redis/asyncio/connection.py

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959

6060
from redis.asyncio.observability.recorder import (
6161
record_connection_closed,
62+
record_connection_count,
6263
record_connection_create_time,
6364
record_connection_wait_time,
6465
record_error_count,
@@ -1308,18 +1309,91 @@ def __init__(
13081309
if self._event_dispatcher is None:
13091310
self._event_dispatcher = EventDispatcher()
13101311

1312+
# Keys that should be redacted in __repr__ to avoid exposing sensitive information
1313+
SENSITIVE_REPR_KEYS = frozenset(
1314+
{
1315+
"password",
1316+
"username",
1317+
"ssl_password",
1318+
"credential_provider",
1319+
}
1320+
)
1321+
13111322
def __repr__(self):
1312-
conn_kwargs = ",".join([f"{k}={v}" for k, v in self.connection_kwargs.items()])
1323+
conn_kwargs = ",".join(
1324+
[
1325+
f"{k}={'<REDACTED>' if k in self.SENSITIVE_REPR_KEYS else v}"
1326+
for k, v in self.connection_kwargs.items()
1327+
]
1328+
)
13131329
return (
13141330
f"<{self.__class__.__module__}.{self.__class__.__name__}"
13151331
f"(<{self.connection_class.__module__}.{self.connection_class.__name__}"
13161332
f"({conn_kwargs})>)>"
13171333
)
13181334

13191335
def reset(self):
1336+
# Record metrics for connections being removed before clearing
1337+
# (only if attributes exist - they won't during __init__)
1338+
if hasattr(self, "_available_connections") and hasattr(
1339+
self, "_in_use_connections"
1340+
):
1341+
idle_count = len(self._available_connections)
1342+
in_use_count = len(self._in_use_connections)
1343+
if idle_count > 0 or in_use_count > 0:
1344+
pool_name = get_pool_name(self)
1345+
# Note: Using sync version since reset() is sync
1346+
from redis.observability.recorder import (
1347+
record_connection_count as sync_record_connection_count,
1348+
)
1349+
1350+
if idle_count > 0:
1351+
sync_record_connection_count(
1352+
pool_name=pool_name,
1353+
connection_state=ConnectionState.IDLE,
1354+
counter=-idle_count,
1355+
)
1356+
if in_use_count > 0:
1357+
sync_record_connection_count(
1358+
pool_name=pool_name,
1359+
connection_state=ConnectionState.USED,
1360+
counter=-in_use_count,
1361+
)
1362+
13201363
self._available_connections = []
13211364
self._in_use_connections = weakref.WeakSet()
13221365

1366+
def __del__(self) -> None:
1367+
"""Clean up connection pool and record metrics when garbage collected."""
1368+
try:
1369+
if not hasattr(self, "_available_connections") or not hasattr(
1370+
self, "_in_use_connections"
1371+
):
1372+
return
1373+
idle_count = len(self._available_connections)
1374+
in_use_count = len(self._in_use_connections)
1375+
if idle_count > 0 or in_use_count > 0:
1376+
pool_name = get_pool_name(self)
1377+
# Note: Using sync version since __del__ is sync
1378+
from redis.observability.recorder import (
1379+
record_connection_count as sync_record_connection_count,
1380+
)
1381+
1382+
if idle_count > 0:
1383+
sync_record_connection_count(
1384+
pool_name=pool_name,
1385+
connection_state=ConnectionState.IDLE,
1386+
counter=-idle_count,
1387+
)
1388+
if in_use_count > 0:
1389+
sync_record_connection_count(
1390+
pool_name=pool_name,
1391+
connection_state=ConnectionState.USED,
1392+
counter=-in_use_count,
1393+
)
1394+
except Exception:
1395+
pass
1396+
13231397
def can_get_connection(self) -> bool:
13241398
"""Return True if a connection can be retrieved from the pool."""
13251399
return (
@@ -1346,6 +1420,29 @@ async def get_connection(self, command_name=None, *keys, **options):
13461420
)
13471421
is_created = connections_after > connections_before
13481422

1423+
# Record state transition for observability
1424+
# This ensures counters stay balanced if ensure_connection() fails and release() is called
1425+
pool_name = get_pool_name(self)
1426+
if is_created:
1427+
# New connection created and acquired: just USED +1
1428+
await record_connection_count(
1429+
pool_name=pool_name,
1430+
connection_state=ConnectionState.USED,
1431+
counter=1,
1432+
)
1433+
else:
1434+
# Existing connection acquired from pool: IDLE -> USED
1435+
await record_connection_count(
1436+
pool_name=pool_name,
1437+
connection_state=ConnectionState.IDLE,
1438+
counter=-1,
1439+
)
1440+
await record_connection_count(
1441+
pool_name=pool_name,
1442+
connection_state=ConnectionState.USED,
1443+
counter=1,
1444+
)
1445+
13491446
# We now perform the connection check outside of the lock.
13501447
try:
13511448
await self.ensure_connection(connection)
@@ -1383,6 +1480,8 @@ def get_encoder(self):
13831480

13841481
def make_connection(self):
13851482
"""Create a new connection. Can be overridden by child classes."""
1483+
# Note: We don't record IDLE here because async uses a sync make_connection
1484+
# but async record_connection_count. The recording is handled in get_connection.
13861485
return self.connection_class(**self.connection_kwargs)
13871486

13881487
async def ensure_connection(self, connection: AbstractConnection):
@@ -1406,6 +1505,7 @@ async def release(self, connection: AbstractConnection):
14061505
# Connections should always be returned to the correct pool,
14071506
# not doing so is an error that will cause an exception here.
14081507
self._in_use_connections.remove(connection)
1508+
14091509
if connection.should_reconnect():
14101510
await connection.disconnect()
14111511

@@ -1414,6 +1514,19 @@ async def release(self, connection: AbstractConnection):
14141514
AsyncAfterConnectionReleasedEvent(connection)
14151515
)
14161516

1517+
# Record state transition: USED -> IDLE
1518+
pool_name = get_pool_name(self)
1519+
await record_connection_count(
1520+
pool_name=pool_name,
1521+
connection_state=ConnectionState.USED,
1522+
counter=-1,
1523+
)
1524+
await record_connection_count(
1525+
pool_name=pool_name,
1526+
connection_state=ConnectionState.IDLE,
1527+
counter=1,
1528+
)
1529+
14171530
async def disconnect(self, inuse_connections: bool = True):
14181531
"""
14191532
Disconnects connections in the pool
@@ -1432,6 +1545,7 @@ async def disconnect(self, inuse_connections: bool = True):
14321545
*(connection.disconnect() for connection in connections),
14331546
return_exceptions=True,
14341547
)
1548+
14351549
exc = next((r for r in resp if isinstance(r, BaseException)), None)
14361550
if exc:
14371551
raise exc

redis/asyncio/multidb/client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
import logging
3-
from typing import Any, Awaitable, Callable, List, Optional, Union
3+
from typing import Any, Awaitable, Callable, List, Literal, Optional, Union
44

55
from redis.asyncio.client import PubSubHandler
66
from redis.asyncio.multidb.command_executor import DefaultCommandExecutor
@@ -437,6 +437,8 @@ class Pipeline(AsyncRedisModuleCommands, AsyncCoreCommands):
437437
Pipeline implementation for multiple logical Redis databases.
438438
"""
439439

440+
_is_async_client: Literal[True] = True
441+
440442
def __init__(self, client: MultiDBClient):
441443
self._command_stack = []
442444
self._client = client

redis/asyncio/observability/recorder.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@
2424
from typing import TYPE_CHECKING, List, Optional
2525

2626
from redis.observability.attributes import (
27+
ConnectionState,
2728
GeoFailoverReason,
2829
PubSubDirection,
2930
)
3031
from redis.observability.metrics import CloseReason, RedisMetricsCollector
3132
from redis.observability.providers import get_observability_instance
3233
from redis.observability.registry import get_observables_registry_instance
33-
from redis.utils import str_if_bytes
34+
from redis.utils import deprecated_function, str_if_bytes
3435

3536
if TYPE_CHECKING:
3637
from redis.asyncio.connection import ConnectionPool
@@ -168,6 +169,38 @@ async def record_connection_create_time(
168169
pass
169170

170171

172+
async def record_connection_count(
173+
pool_name: str,
174+
connection_state: ConnectionState,
175+
counter: int = 1,
176+
) -> None:
177+
"""
178+
Record a connection count change for a single state.
179+
180+
Args:
181+
pool_name: Connection pool identifier
182+
connection_state: State to update (IDLE or USED)
183+
counter: Number to add (positive) or subtract (negative)
184+
"""
185+
collector = _get_or_create_collector()
186+
if collector is None:
187+
return
188+
189+
try:
190+
collector.record_connection_count(
191+
pool_name=pool_name,
192+
connection_state=connection_state,
193+
counter=counter,
194+
)
195+
except Exception:
196+
pass
197+
198+
199+
@deprecated_function(
200+
reason="Connection count is now tracked via record_connection_count(). "
201+
"This functionality will be removed in the next major version",
202+
version="7.4.0",
203+
)
171204
async def init_connection_count() -> None:
172205
"""
173206
Initialize observable gauge for connection count metric.
@@ -194,6 +227,11 @@ def observable_callback(__):
194227
pass
195228

196229

230+
@deprecated_function(
231+
reason="Connection count is now tracked via record_connection_count(). "
232+
"This functionality will be removed in the next major version",
233+
version="7.4.0",
234+
)
197235
async def register_pools_connection_count(
198236
connection_pools: List["ConnectionPool"],
199237
) -> None:
@@ -301,7 +339,7 @@ async def record_connection_relaxed_timeout(
301339
Record a connection timeout relaxation event.
302340
303341
Args:
304-
connection_name: Connection identifier
342+
connection_name: Connection identifier (pool name)
305343
maint_notification: Maintenance notification type
306344
relaxed: True to count up (relaxed), False to count down (unrelaxed)
307345
"""

redis/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
Callable,
1111
Dict,
1212
List,
13+
Literal,
1314
Mapping,
1415
Optional,
1516
Set,
@@ -151,6 +152,9 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
151152
It is not safe to pass PubSub or Pipeline objects between threads.
152153
"""
153154

155+
# Type discrimination marker for @overload self-type pattern
156+
_is_async_client: Literal[False] = False
157+
154158
@classmethod
155159
def from_url(cls, url: str, **kwargs) -> "Redis":
156160
"""

0 commit comments

Comments (0)