1616import asyncio
1717import json
1818import logging
19+ import time
1920
2021from asgiref .sync import sync_to_async
2122from django .http import StreamingHttpResponse
3637
3738KEEPALIVE_SECONDS = 15
3839
40+ # Upper bound on a single SSE connection's lifetime. The browser's EventSource
41+ # transparently reconnects when the server closes the stream, so capping this
42+ # is invisible to the user but critical for the server: under ASGI the *entire*
43+ # stream runs inside one request (one `ThreadSensitiveContext`), so an
44+ # unbounded `while True` stream pins a worker thread — and any DB/redis
45+ # connection bound to it — for as long as the tab stays open (hours). Left
46+ # unbounded this is what slowly exhausts a shared Postgres cluster and leaves
47+ # workers undrainable on deploy. 5 minutes bounds the leak window and lets
48+ # workers recycle.
49+ MAX_STREAM_SECONDS = 300
50+
3951DEFAULT_LIMIT = 20
4052MAX_LIMIT = 100
4153
async def _aget_serialized(notif_id, recipient_id):
    """Return the serialized notification ``notif_id`` owned by ``recipient_id``.

    The lookup is filtered on both the primary key and the recipient, so a
    notification belonging to someone else is treated as missing.

    Returns:
        The ``NotificationSerializer`` data for the row, or ``None`` when no
        matching row exists.
    """

    def _load():
        from django.db import connection

        try:
            match = Notification.objects.filter(
                pk=notif_id, recipient_id=recipient_id
            ).first()
            return None if match is None else NotificationSerializer(match).data
        finally:
            # This body is executed through ``sync_to_async`` rather than as
            # part of Django's request/response cycle, so the usual
            # ``close_old_connections`` hook does not run for it. Close the
            # connection explicitly so every fetch hands its connection back
            # instead of leaving it dangling.
            connection.close()

    return await sync_to_async(_load)()
152174
153175
async def _aclose_redis(obj):
    """Close a redis.asyncio client/pubsub across versions (aclose vs close).

    Best effort: ``None`` and objects exposing neither method are ignored, and
    an error raised while closing is logged at debug level and swallowed,
    since this is called from teardown paths that must not fail.

    Args:
        obj: The client or pubsub to close, or ``None``.

    Returns:
        None.
    """
    import inspect

    if obj is None:
        return
    # ``aclose`` wins when the object has it; ``close`` is the fallback.
    closer = getattr(obj, "aclose", None) or getattr(obj, "close", None)
    if closer is None:
        return
    try:
        result = closer()
        # ``isawaitable`` also covers futures, tasks and ``__await__`` objects;
        # ``asyncio.iscoroutine`` would leave those un-awaited, so the close
        # would silently never run.
        if inspect.isawaitable(result):
            await result
    except Exception:  # pragma: no cover - best effort
        logger.debug("SSE redis close failed; ignoring", exc_info=True)
189+
190+
async def _open_pubsub(channel: str):
    """Open a redis.asyncio pubsub subscribed to ``channel``.

    Returns ``(client, pubsub)``. Both are ``None`` if Redis is unreachable
    (or ``redis.asyncio`` is not importable); the stream then runs in
    keepalive-only mode (the frontend's polling-since path provides backfill).
    The caller owns closing BOTH the pubsub and the underlying client
    connection pool — closing only the pubsub leaks the client's pooled
    connections, one per dropped SSE stream.

    On any failure, including cancellation while subscribing, whatever was
    already opened is closed here before returning or re-raising, so a
    half-open client or pubsub is never handed back or abandoned.

    Args:
        channel: Name of the redis pubsub channel to subscribe to.
    """
    try:
        import redis.asyncio as aioredis  # type: ignore
    except ImportError:  # pragma: no cover
        return None, None
    from django.conf import settings

    url = getattr(settings, "CELERY_BROKER_URL", None) or "redis://localhost:6379/0"
    client = None
    pubsub = None
    try:
        client = aioredis.from_url(url)
        pubsub = client.pubsub()
        await pubsub.subscribe(channel)
    except Exception as exc:
        logger.warning("SSE redis subscribe failed (%s); keepalive-only mode", exc)
        # Tear down in the same order the stream's own cleanup uses: pubsub
        # first, then the client that owns the pool.
        await _aclose_redis(pubsub)
        await _aclose_redis(client)
        return None, None
    except BaseException:
        # Cancellation is not an ``Exception``; without this branch a stream
        # dropped mid-subscribe would abandon the client it just created.
        await _aclose_redis(pubsub)
        await _aclose_redis(client)
        raise
    return client, pubsub
176217
177218
178219async def _stream_events (channel : str , recipient_id , * , pubsub = None ):
@@ -182,12 +223,18 @@ async def _stream_events(channel: str, recipient_id, *, pubsub=None):
182223 opens its own redis.asyncio pubsub via :func:`_open_pubsub`.
183224 """
184225 owns_pubsub = pubsub is None
226+ client = None
185227 if owns_pubsub :
186- pubsub = await _open_pubsub (channel )
228+ client , pubsub = await _open_pubsub (channel )
229+ deadline = time .monotonic () + MAX_STREAM_SECONDS
187230 try :
188231 # Initial comment so the client confirms the stream opened.
189232 yield _format_keepalive ()
190233 while True :
234+ # Bounded lifetime: end the stream so the worker thread and its
235+ # DB/redis connections are released. EventSource reconnects.
236+ if time .monotonic () >= deadline :
237+ return
191238 if pubsub is None :
192239 await asyncio .sleep (KEEPALIVE_SECONDS )
193240 yield _format_keepalive ()
@@ -211,12 +258,15 @@ async def _stream_events(channel: str, recipient_id, *, pubsub=None):
211258 continue
212259 yield _format_sse ("notification" , payload )
213260 finally :
214- if owns_pubsub and pubsub is not None :
215- try :
216- await pubsub .unsubscribe (channel )
217- await pubsub .close ()
218- except Exception : # pragma: no cover - best effort
219- pass
261+ if owns_pubsub :
262+ if pubsub is not None :
263+ try :
264+ await pubsub .unsubscribe (channel )
265+ except Exception : # pragma: no cover - best effort
266+ pass
267+ await _aclose_redis (pubsub )
268+ # Close the underlying client pool too — not just the pubsub.
269+ await _aclose_redis (client )
220270
221271
222272def _drive_async_gen (agen ):
@@ -271,6 +321,18 @@ def get(self, request, *args, **kwargs):
271321 profile_id = request .profile .id
272322 channel = notif_mod .channel_for (org_id , profile_id )
273323
324+ # Release THIS request thread's DB connection before streaming begins.
325+ # Under ASGI the entire stream runs inside one request's
326+ # ThreadSensitiveContext, so the connection opened by auth/middleware
327+ # would otherwise stay checked out for the full (possibly hours-long)
328+ # stream — one leaked Postgres connection per open browser tab, which
329+ # is what exhausts the shared cluster. The stream's own reads go
330+ # through a separate executor (see `_aget_serialized`) and close
331+ # themselves, so dropping this one is safe.
332+ from django .db import connection
333+
334+ connection .close ()
335+
274336 response = StreamingHttpResponse (
275337 _drive_async_gen (_stream_events (channel , profile_id )),
276338 content_type = "text/event-stream" ,
0 commit comments