Skip to content

Commit 2098114

Browse files
fix(pubsub): avoid UnicodeDecodeError on reconnect with binary channel names (#3944)
* fix(pubsub): avoid UnicodeDecodeError on reconnect with binary channel names Channels subscribed as positional arguments (without a callback handler) may carry binary names that are not valid in the connection's encoding (e.g. arbitrary bytes that are not valid UTF-8). The existing `on_connect` method decoded every channel name via `force=True` to pass them as keyword arguments to `subscribe`/`psubscribe`, which raised `UnicodeDecodeError` for these channels. Split the reconnection logic: channels with handlers are decoded and passed as kwargs (they were originally subscribed as kwargs, so their names are guaranteed decodable); channels without handlers are passed as positional args, preserving the original bytes. Applied the same fix to: - async PubSub.on_connect (channels and patterns) - sync PubSub.on_connect (channels, patterns, and shard_channels) Added tests for binary channel and pattern reconnection in both sync and async test suites. Fixes #3912 * Applying review comments. --------- Co-authored-by: petyaslavova <petya.slavova@redis.com>
1 parent f02c66b commit 2098114

4 files changed

Lines changed: 157 additions & 19 deletions

File tree

redis/asyncio/client.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -959,18 +959,38 @@ async def on_connect(self, connection: Connection):
959959
# NOTE: for python3, we can't pass bytestrings as keyword arguments
960960
# so we need to decode channel/pattern names back to unicode strings
961961
# before passing them to [p]subscribe.
962+
#
963+
# However, channels subscribed without a callback (positional args) may
964+
# have binary names that are not valid in the current encoding (e.g.
965+
# arbitrary bytes that are not valid UTF-8). These channels are stored
966+
# with a ``None`` handler. We re-subscribe them as positional args so
967+
# that no decoding is required.
962968
self.pending_unsubscribe_channels.clear()
963969
self.pending_unsubscribe_patterns.clear()
964970
if self.channels:
965-
channels = {}
971+
channels_with_handlers = {}
972+
channels_without_handlers = []
966973
for k, v in self.channels.items():
967-
channels[self.encoder.decode(k, force=True)] = v
968-
await self.subscribe(**channels)
974+
if v is not None:
975+
channels_with_handlers[self.encoder.decode(k, force=True)] = v
976+
else:
977+
channels_without_handlers.append(k)
978+
if channels_with_handlers or channels_without_handlers:
979+
await self.subscribe(
980+
*channels_without_handlers, **channels_with_handlers
981+
)
969982
if self.patterns:
970-
patterns = {}
983+
patterns_with_handlers = {}
984+
patterns_without_handlers = []
971985
for k, v in self.patterns.items():
972-
patterns[self.encoder.decode(k, force=True)] = v
973-
await self.psubscribe(**patterns)
986+
if v is not None:
987+
patterns_with_handlers[self.encoder.decode(k, force=True)] = v
988+
else:
989+
patterns_without_handlers.append(k)
990+
if patterns_with_handlers or patterns_without_handlers:
991+
await self.psubscribe(
992+
*patterns_without_handlers, **patterns_with_handlers
993+
)
974994

975995
@property
976996
def subscribed(self):

redis/client.py

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -967,25 +967,45 @@ def on_connect(self, connection) -> None:
967967
# NOTE: for python3, we can't pass bytestrings as keyword arguments
968968
# so we need to decode channel/pattern names back to unicode strings
969969
# before passing them to [p]subscribe.
970+
#
971+
# However, channels subscribed without a callback (positional args) may
972+
# have binary names that are not valid in the current encoding (e.g.
973+
# arbitrary bytes that are not valid UTF-8). These channels are stored
974+
# with a ``None`` handler. We re-subscribe them as positional args so
975+
# that no decoding is required.
970976
self.pending_unsubscribe_channels.clear()
971977
self.pending_unsubscribe_patterns.clear()
972978
self.pending_unsubscribe_shard_channels.clear()
973979
if self.channels:
974-
channels = {
975-
self.encoder.decode(k, force=True): v for k, v in self.channels.items()
976-
}
977-
self.subscribe(**channels)
980+
channels_with_handlers = {}
981+
channels_without_handlers = []
982+
for k, v in self.channels.items():
983+
if v is not None:
984+
channels_with_handlers[self.encoder.decode(k, force=True)] = v
985+
else:
986+
channels_without_handlers.append(k)
987+
if channels_with_handlers or channels_without_handlers:
988+
self.subscribe(*channels_without_handlers, **channels_with_handlers)
978989
if self.patterns:
979-
patterns = {
980-
self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
981-
}
982-
self.psubscribe(**patterns)
990+
patterns_with_handlers = {}
991+
patterns_without_handlers = []
992+
for k, v in self.patterns.items():
993+
if v is not None:
994+
patterns_with_handlers[self.encoder.decode(k, force=True)] = v
995+
else:
996+
patterns_without_handlers.append(k)
997+
if patterns_with_handlers or patterns_without_handlers:
998+
self.psubscribe(*patterns_without_handlers, **patterns_with_handlers)
983999
if self.shard_channels:
984-
shard_channels = {
985-
self.encoder.decode(k, force=True): v
986-
for k, v in self.shard_channels.items()
987-
}
988-
self.ssubscribe(**shard_channels)
1000+
shard_with_handlers = {}
1001+
shard_without_handlers = []
1002+
for k, v in self.shard_channels.items():
1003+
if v is not None:
1004+
shard_with_handlers[self.encoder.decode(k, force=True)] = v
1005+
else:
1006+
shard_without_handlers.append(k)
1007+
if shard_with_handlers or shard_without_handlers:
1008+
self.ssubscribe(*shard_without_handlers, **shard_with_handlers)
9891009

9901010
@property
9911011
def subscribed(self) -> bool:

tests/test_asyncio/test_pubsub.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,54 @@ async def test_resubscribe_to_patterns_on_reconnection(self, pubsub):
160160
kwargs = make_subscribe_test_data(pubsub, "pattern")
161161
await self._test_resubscribe_on_reconnection(**kwargs)
162162

163+
async def test_resubscribe_binary_channel_on_reconnection(self, pubsub):
164+
"""Binary channel names that are not valid UTF-8 must survive
165+
reconnection without raising ``UnicodeDecodeError``.
166+
See https://github.com/redis/redis-py/issues/3912
167+
"""
168+
# b'\x80\x81\x82' is deliberately invalid UTF-8
169+
binary_channel = b"\x80\x81\x82"
170+
p = pubsub
171+
await p.subscribe(binary_channel)
172+
assert await wait_for_message(p) is not None # consume subscribe ack
173+
174+
# force reconnect
175+
await p.connection.disconnect()
176+
177+
# get_message triggers on_connect → re-subscribe; must not raise
178+
messages = []
179+
for _ in range(1):
180+
message = await wait_for_message(p)
181+
assert message is not None
182+
messages.append(message)
183+
184+
assert len(messages) == 1
185+
assert messages[0]["type"] == "subscribe"
186+
assert messages[0]["channel"] == binary_channel
187+
188+
async def test_resubscribe_binary_pattern_on_reconnection(self, pubsub):
189+
"""Binary pattern names that are not valid UTF-8 must survive
190+
reconnection without raising ``UnicodeDecodeError``.
191+
See https://github.com/redis/redis-py/issues/3912
192+
"""
193+
binary_pattern = b"\x80\x81*"
194+
p = pubsub
195+
await p.psubscribe(binary_pattern)
196+
assert await wait_for_message(p) is not None # consume psubscribe ack
197+
198+
# force reconnect
199+
await p.connection.disconnect()
200+
201+
messages = []
202+
for _ in range(1):
203+
message = await wait_for_message(p)
204+
assert message is not None
205+
messages.append(message)
206+
207+
assert len(messages) == 1
208+
assert messages[0]["type"] == "psubscribe"
209+
assert messages[0]["channel"] == binary_pattern
210+
163211
async def _test_subscribed_property(
164212
self, p, sub_type, unsub_type, sub_func, unsub_func, keys
165213
):

tests/test_pubsub.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,56 @@ def test_resubscribe_to_shard_channels_on_reconnection(self, r):
201201
kwargs = make_subscribe_test_data(r.pubsub(), "shard_channel")
202202
self._test_resubscribe_on_reconnection(**kwargs)
203203

204+
@pytest.mark.onlynoncluster
205+
def test_resubscribe_binary_channel_on_reconnection(self, r):
206+
"""Binary channel names that are not valid UTF-8 must survive
207+
reconnection without raising ``UnicodeDecodeError``.
208+
See https://github.com/redis/redis-py/issues/3912
209+
"""
210+
# b'\x80\x81\x82' is deliberately invalid UTF-8
211+
binary_channel = b"\x80\x81\x82"
212+
p = r.pubsub()
213+
p.subscribe(binary_channel)
214+
assert wait_for_message(p) is not None # consume subscribe ack
215+
216+
# force reconnect
217+
p.connection.disconnect()
218+
219+
# get_message triggers on_connect → re-subscribe; must not raise
220+
messages = []
221+
for _ in range(1):
222+
message = wait_for_message(p)
223+
assert message is not None
224+
messages.append(message)
225+
226+
assert len(messages) == 1
227+
assert messages[0]["type"] == "subscribe"
228+
assert messages[0]["channel"] == binary_channel
229+
230+
@pytest.mark.onlynoncluster
231+
def test_resubscribe_binary_pattern_on_reconnection(self, r):
232+
"""Binary pattern names that are not valid UTF-8 must survive
233+
reconnection without raising ``UnicodeDecodeError``.
234+
See https://github.com/redis/redis-py/issues/3912
235+
"""
236+
binary_pattern = b"\x80\x81*"
237+
p = r.pubsub()
238+
p.psubscribe(binary_pattern)
239+
assert wait_for_message(p) is not None # consume psubscribe ack
240+
241+
# force reconnect
242+
p.connection.disconnect()
243+
244+
messages = []
245+
for _ in range(1):
246+
message = wait_for_message(p)
247+
assert message is not None
248+
messages.append(message)
249+
250+
assert len(messages) == 1
251+
assert messages[0]["type"] == "psubscribe"
252+
assert messages[0]["channel"] == binary_pattern
253+
204254
def _test_subscribed_property(
205255
self, p, sub_type, unsub_type, sub_func, unsub_func, keys
206256
):

0 commit comments

Comments
 (0)