Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions redis/_parsers/hiredis.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@ def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True):
if custom_timeout:
sock.settimeout(self._socket_timeout)

def read_response(self, disable_decoding=False, push_request=False):
def read_response(
self,
disable_decoding=False,
push_request=False,
timeout: Union[float, object] = SENTINEL,
Comment thread
vladvildanov marked this conversation as resolved.
):
if not self._reader:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

Expand All @@ -152,6 +157,7 @@ def read_response(self, disable_decoding=False, push_request=False):
return self.read_response(
disable_decoding=disable_decoding,
push_request=push_request,
timeout=timeout,
)
return response

Expand All @@ -161,7 +167,7 @@ def read_response(self, disable_decoding=False, push_request=False):
response = self._reader.gets()

while response is NOT_ENOUGH_DATA:
self.read_from_socket()
self.read_from_socket(timeout=timeout)
Comment thread
cursor[bot] marked this conversation as resolved.
if disable_decoding:
response = self._reader.gets(False)
else:
Expand Down
20 changes: 13 additions & 7 deletions redis/_parsers/resp2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
from ..exceptions import ConnectionError, InvalidResponse, ResponseError
from ..typing import EncodableT
from .base import _AsyncRESPBase, _RESPBase
from .socket import SERVER_CLOSED_CONNECTION_ERROR
from .socket import SENTINEL, SERVER_CLOSED_CONNECTION_ERROR


class _RESP2Parser(_RESPBase):
"""RESP2 protocol implementation"""

def read_response(self, disable_decoding=False):
def read_response(
self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
):
pos = self._buffer.get_pos() if self._buffer else None
try:
result = self._read_response(disable_decoding=disable_decoding)
result = self._read_response(
disable_decoding=disable_decoding, timeout=timeout
)
except BaseException:
if self._buffer:
self._buffer.rewind(pos)
Expand All @@ -21,8 +25,10 @@ def read_response(self, disable_decoding=False):
self._buffer.purge()
return result

def _read_response(self, disable_decoding=False):
raw = self._buffer.readline()
def _read_response(
self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
):
raw = self._buffer.readline(timeout=timeout)
if not raw:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

Expand Down Expand Up @@ -51,13 +57,13 @@ def _read_response(self, disable_decoding=False):
elif byte == b"$" and response == b"-1":
return None
elif byte == b"$":
response = self._buffer.read(int(response))
response = self._buffer.read(int(response), timeout=timeout)
# multi-bulk response
elif byte == b"*" and response == b"-1":
return None
elif byte == b"*":
response = [
self._read_response(disable_decoding=disable_decoding)
self._read_response(disable_decoding=disable_decoding, timeout=timeout)
for i in range(int(response))
]
else:
Expand Down
44 changes: 31 additions & 13 deletions redis/_parsers/resp3.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
_AsyncRESPBase,
_RESPBase,
)
from .socket import SERVER_CLOSED_CONNECTION_ERROR
from .socket import SENTINEL, SERVER_CLOSED_CONNECTION_ERROR


class _RESP3Parser(_RESPBase, PushNotificationsParser):
Expand All @@ -28,11 +28,18 @@ def handle_pubsub_push_response(self, response):
logger.debug("Push response: " + str(response))
return response

def read_response(self, disable_decoding=False, push_request=False):
def read_response(
self,
disable_decoding=False,
push_request=False,
timeout: Union[float, object] = SENTINEL,
):
pos = self._buffer.get_pos() if self._buffer is not None else None
try:
result = self._read_response(
disable_decoding=disable_decoding, push_request=push_request
disable_decoding=disable_decoding,
push_request=push_request,
timeout=timeout,
)
except BaseException:
if self._buffer is not None:
Expand All @@ -48,8 +55,13 @@ def read_response(self, disable_decoding=False, push_request=False):
pass
return result

def _read_response(self, disable_decoding=False, push_request=False):
raw = self._buffer.readline()
def _read_response(
self,
disable_decoding=False,
push_request=False,
timeout: Union[float, object] = SENTINEL,
):
raw = self._buffer.readline(timeout=timeout)
if not raw:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

Expand All @@ -58,7 +70,7 @@ def _read_response(self, disable_decoding=False, push_request=False):
# server returned an error
if byte in (b"-", b"!"):
if byte == b"!":
response = self._buffer.read(int(response))
response = self._buffer.read(int(response), timeout=timeout)
response = response.decode("utf-8", errors="replace")
error = self.parse_error(response)
# if the error is a ConnectionError, raise immediately so the user
Expand Down Expand Up @@ -87,22 +99,22 @@ def _read_response(self, disable_decoding=False, push_request=False):
return response == b"t"
# bulk response
elif byte == b"$":
response = self._buffer.read(int(response))
response = self._buffer.read(int(response), timeout=timeout)
# verbatim string response
elif byte == b"=":
response = self._buffer.read(int(response))[4:]
response = self._buffer.read(int(response), timeout=timeout)[4:]
# array response
elif byte == b"*":
response = [
self._read_response(disable_decoding=disable_decoding)
self._read_response(disable_decoding=disable_decoding, timeout=timeout)
for _ in range(int(response))
]
# set response
elif byte == b"~":
# redis can return unhashable types (like dict) in a set,
# so we return sets as list, all the time, for predictability
response = [
self._read_response(disable_decoding=disable_decoding)
self._read_response(disable_decoding=disable_decoding, timeout=timeout)
for _ in range(int(response))
]
# map response
Expand All @@ -112,16 +124,22 @@ def _read_response(self, disable_decoding=False, push_request=False):
# became defined to be left-right in version 3.8
resp_dict = {}
for _ in range(int(response)):
key = self._read_response(disable_decoding=disable_decoding)
key = self._read_response(
disable_decoding=disable_decoding, timeout=timeout
)
resp_dict[key] = self._read_response(
disable_decoding=disable_decoding, push_request=push_request
disable_decoding=disable_decoding,
push_request=push_request,
timeout=timeout,
)
response = resp_dict
# push response
elif byte == b">":
response = [
self._read_response(
disable_decoding=disable_decoding, push_request=push_request
disable_decoding=disable_decoding,
push_request=push_request,
timeout=timeout,
Comment thread
cursor[bot] marked this conversation as resolved.
)
for _ in range(int(response))
]
Expand Down
8 changes: 4 additions & 4 deletions redis/_parsers/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,23 @@ def can_read(self, timeout: float) -> bool:
timeout=timeout, raise_on_timeout=False
)

def read(self, length: int) -> bytes:
def read(self, length: int, timeout: Union[float, object] = SENTINEL) -> bytes:
length = length + 2 # make sure to read the \r\n terminator
# BufferIO will return less than requested if buffer is short
data = self._buffer.read(length)
missing = length - len(data)
if missing:
# fill up the buffer and read the remainder
self._read_from_socket(missing)
self._read_from_socket(length=missing, timeout=timeout)
data += self._buffer.read(missing)
return data[:-2]

def readline(self) -> bytes:
def readline(self, timeout: Union[float, object] = SENTINEL) -> bytes:
buf = self._buffer
data = buf.readline()
while not data.endswith(SYM_CRLF):
# there's more data in the socket that we need
self._read_from_socket()
self._read_from_socket(timeout=timeout)
data += buf.readline()

return data[:-2]
Expand Down
39 changes: 38 additions & 1 deletion redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,44 @@ def failure_callback(error, failure_count):
raise

async def parse_response(self, block: bool = True, timeout: float = 0):
"""Parse the response from a publish/subscribe command"""
"""
Parse the response from a publish/subscribe command.

Args:
block: If True, block indefinitely until a message is available.
If False, return immediately if no message is available.
Default: True
timeout: The timeout in seconds for reading a response when block=False.
This parameter is ignored when block=True.
Default: 0 (return immediately if no data available)

Returns:
The parsed response from the server, or None if no message is available
within the timeout period (when block=False).

Important:
The block and timeout parameters work together:
- When block=True: timeout is IGNORED, method blocks indefinitely
- When block=False: timeout is USED, method returns after timeout expires

Typically, you should use get_message(timeout=X) instead of calling
parse_response() directly. The get_message() method automatically sets
block=False when a timeout is provided, and block=True when timeout=None.

Example:
# Block indefinitely (timeout is ignored)
response = await pubsub.parse_response(block=True, timeout=0.1)

# Non-blocking with 0.1 second timeout
response = await pubsub.parse_response(block=False, timeout=0.1)

# Non-blocking, return immediately
response = await pubsub.parse_response(block=False, timeout=0)

# Recommended: use get_message() instead
msg = await pubsub.get_message(timeout=0.1) # automatically sets block=False
msg = await pubsub.get_message(timeout=None) # automatically sets block=True
"""
conn = self.connection
if conn is None:
raise RuntimeError(
Expand Down
46 changes: 44 additions & 2 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
_RedisCallbacksRESP3,
bool_ok,
)
from redis._parsers.socket import SENTINEL
from redis.backoff import ExponentialWithJitterBackoff
from redis.cache import CacheConfig, CacheInterface
from redis.commands import (
Expand Down Expand Up @@ -1157,7 +1158,44 @@ def failure_callback(error, failure_count):
raise

def parse_response(self, block=True, timeout=0):
"""Parse the response from a publish/subscribe command"""
"""
Parse the response from a publish/subscribe command.

Args:
block: If True, block indefinitely until a message is available.
If False, return immediately if no message is available.
Default: True
timeout: The timeout in seconds for reading a response when block=False.
This parameter is ignored when block=True.
Default: 0 (return immediately if no data available)

Returns:
The parsed response from the server, or None if no message is available
within the timeout period (when block=False).

Important:
The block and timeout parameters work together:
- When block=True: timeout is IGNORED, method blocks indefinitely
- When block=False: timeout is USED, method returns after timeout expires

Typically, you should use get_message(timeout=X) instead of calling
parse_response() directly. The get_message() method automatically sets
block=False when a timeout is provided, and block=True when timeout=None.

Example:
# Block indefinitely (timeout is ignored)
response = pubsub.parse_response(block=True, timeout=0.1)

# Non-blocking with 0.1 second timeout
response = pubsub.parse_response(block=False, timeout=0.1)

# Non-blocking, return immediately
response = pubsub.parse_response(block=False, timeout=0)

# Recommended: use get_message() instead
msg = pubsub.get_message(timeout=0.1) # automatically sets block=False
msg = pubsub.get_message(timeout=None) # automatically sets block=True
"""
conn = self.connection
if conn is None:
raise RuntimeError(
Expand All @@ -1171,9 +1209,13 @@ def try_read():
if not block:
if not conn.can_read(timeout=timeout):
return None
read_timeout = timeout
else:
conn.connect()
return conn.read_response(disconnect_on_error=False, push_request=True)
read_timeout = SENTINEL # Use default socket timeout for blocking
return conn.read_response(
disconnect_on_error=False, push_request=True, timeout=read_timeout
)

response = self._execute(conn, try_read)

Expand Down
10 changes: 7 additions & 3 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2669,10 +2669,14 @@ def _get_node_pubsub(self, node):
self.node_pubsub_mapping[node.name] = pubsub
return pubsub

def _sharded_message_generator(self):
def _sharded_message_generator(self, timeout=0.0):
for _ in range(len(self.node_pubsub_mapping)):
pubsub = next(self._pubsubs_generator)
message = pubsub.get_message()
# Don't pass ignore_subscribe_messages here - let get_sharded_message
# handle the filtering after processing subscription state changes
message = pubsub.get_message(
ignore_subscribe_messages=False, timeout=timeout
)
if message is not None:
return message
return None
Expand All @@ -2690,7 +2694,7 @@ def get_sharded_message(
ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
)
else:
message = self._sharded_message_generator()
message = self._sharded_message_generator(timeout=timeout)
if message is None:
return None
elif str_if_bytes(message["type"]) == "sunsubscribe":
Expand Down
Loading