fix: drain accept queues on gthread worker shutdown to prevent connection resets#3612

Open
wckao wants to merge 1 commit into benoitc:master from wckao:fix/gthread-drain-accept-queue-on-shutdown

Conversation


wckao commented Apr 30, 2026

Summary

The gthread worker's shutdown sequence has a race condition that causes "Connection reset by peer" errors for clients when reuse_port=True. When a worker exits its main loop (via max_requests, a post_request hook setting worker.alive = False, or SIGTERM), connections that have completed the TCP handshake but have not yet been accept()-ed by the application are still sitting in the kernel's accept queue. With SO_REUSEPORT each worker owns its own listener socket, so these orphaned connections receive a TCP RST when the socket is closed.
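The kernel-level behavior is easy to demonstrate outside gunicorn. A minimal standalone sketch (not part of this PR; Linux behavior, other platforms may differ):

import socket
import time

srv = socket.socket()
srv.bind(("127.0.0.1", 0))
srv.listen(8)

cli = socket.socket()
cli.connect(srv.getsockname())  # handshake completes; connection waits in the backlog
srv.close()                     # never accept()-ed: the queued connection is reset

time.sleep(0.05)                # give the RST time to reach the client
try:
    cli.send(b"x")              # may still succeed before the reset is noticed
    cli.recv(1)
    print("no reset observed")
except (ConnectionResetError, BrokenPipeError):
    print("client saw the reset, as described above")
cli.close()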

Root cause

In ThreadWorker.run(), the event loop's accept() callback dequeues one connection per selector event. When self.alive becomes False, the loop exits immediately, so any connections remaining in the kernel backlog are never accepted by the application. The listener socket is then closed at the end of run(), which sends a RST to each of those connections.

Without SO_REUSEPORT the bug is masked because the arbiter holds the shared listener open and the replacement worker picks up the backlogged connections. With reuse_port=True each worker's socket is independent, so closing it destroys its backlog.
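For orientation, a simplified sketch of the pre-fix loop shape (illustrative only; names and structure are approximate, not the actual gunicorn/workers/gthread.py source):

import selectors
import socket

def serve(listener: socket.socket, state, handle) -> None:
    sel = selectors.DefaultSelector()
    sel.register(listener, selectors.EVENT_READ)
    while state.alive:
        for key, _ in sel.select(timeout=1.0):
            conn, _addr = key.fileobj.accept()  # one dequeue per selector event
            handle(conn)                        # hand off to the thread pool
    # Pre-fix: the loop exits the moment alive flips to False. Anything
    # still in the kernel backlog was never accept()-ed, and the close()
    # below sends each of those queued connections a RST.
    sel.unregister(listener)
    listener.close()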

Fix

Add a two-pass drain-then-close sequence executed once the main loop exits:

  1. Unregister listeners from the poller (set_accept_enabled(False)).
  2. For each listener:
    • First pass: accept() all pending connections from the kernel backlog.
    • Brief 10 ms pause for in-flight TCP handshakes to land (SYN-ACK sent, ACK not yet received).
    • Second pass: accept() stragglers.
    • close() the listener socket immediately after draining.

Accepted connections are submitted to the thread pool and waited on during the existing graceful-timeout drain loop. After the close, new clients get a clean ECONNREFUSED instead of a RST.
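In code, the drain sequence looks roughly like the following sketch (an approximation of the change described above, not the committed diff; see gunicorn/workers/gthread.py for the real thing):

import errno
import socket
import time

HANDSHAKE_GRACE = 0.010  # 10 ms pause between the two drain passes

def _drain_listener(listener: socket.socket):
    """accept() every connection currently queued on a single listener."""
    conns = []
    listener.setblocking(False)
    while True:
        try:
            sock, addr = listener.accept()
        except BlockingIOError:
            break                        # backlog is empty
        except OSError as e:
            if e.errno == errno.ECONNABORTED:
                continue                 # client gave up mid-handshake
            raise
        conns.append((sock, addr))
    return conns

def _drain_accept_queues(listeners, close_listeners=False):
    drained = []
    for listener in listeners:
        drained += _drain_listener(listener)  # first pass
        time.sleep(HANDSHAKE_GRACE)           # let in-flight handshakes land
        drained += _drain_listener(listener)  # second pass: stragglers
        if close_listeners:
            listener.close()                  # new clients now get ECONNREFUSED
    return drained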

Changes

  • gunicorn/workers/gthread.py

    • Added _drain_listener(listener): accepts all pending connections from a single listener.
    • Added _drain_accept_queues(close_listeners=False): iterates listeners, performs two-pass drain, optionally closes each socket immediately after draining.
    • Updated run() shutdown sequence: calls _drain_accept_queues(close_listeners=True) after disabling accept, and removes the now-redundant late s.close() loop.
  • tests/test_gthread.py

    • 8 new unit tests in TestDrainAcceptQueues covering: drain all pending, empty queue no-op, multiple listeners, ECONNABORTED handling, close_listeners flag, integration with run(), and grace-period processing.
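As one illustration of how the drain can be exercised, a hypothetical test in the spirit of the new suite (the real tests live in tests/test_gthread.py and will differ in structure; this builds on the _drain_accept_queues sketch above):

import socket

def test_drain_picks_up_pending_connection():
    srv = socket.socket()
    srv.bind(("127.0.0.1", 0))
    srv.listen(8)
    cli = socket.socket()
    cli.connect(srv.getsockname())    # sits in the backlog, never accept()-ed
    drained = _drain_accept_queues([srv], close_listeners=True)
    assert len(drained) == 1          # the pending connection was rescued
    for sock, _ in drained:
        sock.close()
    cli.close()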

How to reproduce

Set max_requests=5 and reuse_port=True with a single gthread worker, then fire bursts of 30 concurrent requests:

Integration test script (standalone, not part of the committed change)
#!/usr/bin/env python3
#
# Integration test for gthread worker shutdown drain fix.
#
# Demonstrates that the gthread worker properly drains pending TCP
# connections from the kernel accept queue during worker shutdown,
# preventing "Connection reset by peer" errors for clients.
#
# The test starts a real gunicorn server with a single gthread worker
# and uses max_requests to trigger frequent worker restarts. During
# each restart, a burst of concurrent requests is fired. Without the
# fix, connections that completed the TCP handshake but were not yet
# accept()-ed by the worker are orphaned and receive a TCP RST when
# the listening socket is closed.
#
# Usage:
#     python scripts/test_gthread_shutdown_drain.py
#
# Expected results:
#     WITH fix (_drain_accept_queues in shutdown):  PASS (0 resets)
#     WITHOUT fix (revert gthread.py changes):      FAIL (1+ resets)
#
# Note: Uses reuse_port=True so each worker owns its listener socket.
# Without SO_REUSEPORT, the arbiter holds the shared listener open
# across restarts, masking the bug.
#
# Requires: gunicorn importable from the current environment.

import http.client
import os
import signal
import socket
import subprocess
import sys
import tempfile
import textwrap
import threading
import time
from collections import Counter

# ---------------------------------------------------------------------------
# Test parameters
# ---------------------------------------------------------------------------
WORKERS = 1           # Single worker for deterministic restarts
THREADS = 4           # Thread pool size
MAX_REQUESTS = 5      # Worker restarts after every 5 requests
BURST_SIZE = 30       # Concurrent requests per burst
CYCLES = 10           # Number of restart cycles to test
REQUEST_TIMEOUT = 5   # Per-request timeout (seconds)
WORKER_READY_PAUSE = 1.5  # Seconds to wait for new worker after restart


def find_free_port():
    """Bind to port 0 and let the OS pick a free port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('127.0.0.1', 0))
        return s.getsockname()[1]


def create_app_file(path):
    """Write a minimal WSGI app that returns JSON with the worker PID."""
    with open(path, 'w') as f:
        f.write(textwrap.dedent('''\
            import json
            import os
            import time

            def app(environ, start_response):
                # Small delay keeps thread-pool threads busy so that
                # concurrent connections queue up during restarts.
                time.sleep(0.05)
                body = json.dumps({"pid": os.getpid()}).encode()
                start_response("200 OK", [
                    ("Content-Type", "application/json"),
                    ("Content-Length", str(len(body))),
                ])
                return [body]
        '''))


def create_config_file(path, port):
    """Write a gunicorn config tailored for the test."""
    with open(path, 'w') as f:
        f.write(textwrap.dedent(f'''\
            bind = "127.0.0.1:{port}"
            workers = {WORKERS}
            threads = {THREADS}
            worker_class = "gthread"
            max_requests = {MAX_REQUESTS}
            max_requests_jitter = 0
            graceful_timeout = 5
            reuse_port = True
            loglevel = "warning"
        '''))


def wait_for_server(port, timeout=15):
    """Block until the server responds to a GET /."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            conn = http.client.HTTPConnection('127.0.0.1', port, timeout=2)
            conn.request('GET', '/')
            resp = conn.getresponse()
            resp.read()
            conn.close()
            return True
        except Exception:
            time.sleep(0.2)
    return False


def make_request(port):
    """
    Send one GET / and classify the outcome.

    Returns a tuple (category, detail) where category is one of:
        'ok'    - 200 response received
        'reset' - connection reset (the bug under test)
        'refused' - connection refused (expected during restart gap)
        'error' - other transport error
    """
    try:
        conn = http.client.HTTPConnection('127.0.0.1', port, timeout=REQUEST_TIMEOUT)
        conn.request('GET', '/')
        resp = conn.getresponse()
        resp.read()
        conn.close()
        return ('ok', None)
    except ConnectionResetError:
        return ('reset', 'ConnectionResetError')
    except BrokenPipeError:
        return ('reset', 'BrokenPipeError')
    except http.client.RemoteDisconnected:
        return ('reset', 'RemoteDisconnected')
    except ConnectionRefusedError:
        return ('refused', 'ConnectionRefusedError')
    except socket.timeout:
        return ('error', 'Timeout')
    except OSError as e:
        if e.errno in (54, 104):          # ECONNRESET on macOS / Linux
            return ('reset', f'OSError({e.errno})')
        return ('error', f'OSError({e.errno})')
    except Exception as e:
        return ('error', type(e).__name__)


def fire_burst(port, count):
    """Fire *count* concurrent GET / requests and return the result list."""
    results = []
    lock = threading.Lock()

    def worker():
        result = make_request(port)
        with lock:
            results.append(result)

    threads = [threading.Thread(target=worker) for _ in range(count)]
    # Start all threads as fast as possible so connections race against
    # the worker shutdown triggered by max_requests.
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=REQUEST_TIMEOUT + 5)

    return results


def main():
    port = find_free_port()

    print("=" * 70)
    print("  gthread worker shutdown drain - integration test")
    print("=" * 70)
    print()
    print("  Fires concurrent HTTP requests while max_requests triggers")
    print("  worker restarts, and checks for connection resets.")
    print()
    print("  Expected:")
    print("    WITH fix   -> 0 connection resets   (PASS)")
    print("    WITHOUT fix -> 1+ connection resets  (FAIL)")
    print()
    print(f"  workers={WORKERS}  threads={THREADS}  max_requests={MAX_REQUESTS}")
    print(f"  burst={BURST_SIZE}  cycles={CYCLES}  reuse_port=True")
    print(f"  port={port}")
    print()

    with tempfile.TemporaryDirectory() as tmpdir:
        app_path = os.path.join(tmpdir, 'wsgi_app.py')
        cfg_path = os.path.join(tmpdir, 'gunicorn.conf.py')
        create_app_file(app_path)
        create_config_file(cfg_path, port)

        cmd = [sys.executable, '-m', 'gunicorn', 'wsgi_app:app', '-c', cfg_path]

        # Ensure gunicorn is importable in the subprocess.  When running
        # from a source checkout (no pip install), PYTHONPATH must include
        # the repo root so ``python -m gunicorn`` works.
        env = os.environ.copy()
        repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        pp = env.get('PYTHONPATH', '')
        env['PYTHONPATH'] = repo_root + (os.pathsep + pp if pp else '')

        print("  Starting gunicorn ...")
        proc = subprocess.Popen(
            cmd, cwd=tmpdir, env=env,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )

        try:
            if not wait_for_server(port):
                stderr = proc.stderr.read().decode(errors='replace')
                print(f"  ERROR: server failed to start\n{stderr[:800]}")
                return 1

            print(f"  Server ready  (arbiter pid={proc.pid})")
            print()

            total_ok = 0
            total_reset = 0
            total_refused = 0
            total_error = 0
            reset_details = Counter()

            for cycle in range(CYCLES):
                results = fire_burst(port, BURST_SIZE)

                ok      = sum(1 for c, _ in results if c == 'ok')
                reset   = sum(1 for c, _ in results if c == 'reset')
                refused = sum(1 for c, _ in results if c == 'refused')
                error   = sum(1 for c, _ in results if c == 'error')
                details = Counter(d for c, d in results if c == 'reset')

                total_ok      += ok
                total_reset   += reset
                total_refused += refused
                total_error   += error
                reset_details += details

                tag = "PASS" if reset == 0 else "FAIL"
                extra = f"  {dict(details)}" if details else ""
                print(f"  cycle {cycle+1:2d}/{CYCLES}:  "
                      f"ok={ok:3d}  reset={reset}  refused={refused}  "
                      f"error={error}  [{tag}]{extra}")

                # Give the arbiter time to spawn a fresh worker.
                time.sleep(WORKER_READY_PAUSE)
                wait_for_server(port, timeout=5)

            # ---- summary ----
            print()
            print("-" * 70)
            total = total_ok + total_reset + total_refused + total_error
            print(f"  total requests : {total}")
            print(f"  ok             : {total_ok}")
            print(f"  reset (bug)    : {total_reset}")
            print(f"  refused (gap)  : {total_refused}")
            print(f"  other error    : {total_error}")
            if reset_details:
                print(f"  reset breakdown: {dict(reset_details)}")

            print()
            if total_reset == 0:
                print("  RESULT: PASS")
                print("  No connection resets detected during worker restarts.")
            else:
                print(f"  RESULT: FAIL  ({total_reset} connection reset(s))")
                print("  Connections were reset during worker shutdown.")

            print()
            return 0 if total_reset == 0 else 1

        finally:
            proc.send_signal(signal.SIGTERM)
            try:
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()


if __name__ == '__main__':
    sys.exit(main())

Test results

Unit tests — 84 passed (8 new + 76 existing), 0 failures.

Integration test (script above, 10 cycles x 30 concurrent requests per cycle):

  Code state            Resets / 300    Result
  Before fix (master)   51 (17%)        FAIL
  After fix (this PR)   0 (0%)          PASS
# ---- Before fix (master) ----
  cycle  1/10:  ok= 19  reset=11  refused=0  error=0  [FAIL]
  cycle  2/10:  ok= 30  reset=0   refused=0  error=0  [PASS]
  cycle  3/10:  ok= 28  reset=2   refused=0  error=0  [FAIL]
  cycle  4/10:  ok= 24  reset=6   refused=0  error=0  [FAIL]
  cycle  5/10:  ok= 29  reset=1   refused=0  error=0  [FAIL]
  cycle  6/10:  ok= 18  reset=12  refused=0  error=0  [FAIL]
  cycle  7/10:  ok= 26  reset=4   refused=0  error=0  [FAIL]
  cycle  8/10:  ok= 17  reset=13  refused=0  error=0  [FAIL]
  cycle  9/10:  ok= 28  reset=2   refused=0  error=0  [FAIL]
  cycle 10/10:  ok= 30  reset=0   refused=0  error=0  [PASS]
  RESULT: FAIL  (51 connection reset(s))

# ---- After fix (this PR) ----
  cycle  1/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  cycle  2/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  cycle  3/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  cycle  4/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  cycle  5/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  cycle  6/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  cycle  7/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  cycle  8/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  cycle  9/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  cycle 10/10:  ok= 30  reset=0  refused=0  error=0  [PASS]
  RESULT: PASS (0 connection resets)

Notes

  • The sync worker is not affected — it has no poller/thread pool and leaves the shared listener open for other workers.
  • Without reuse_port, the arbiter holds the shared listener open across restarts, so backlogged connections survive and are picked up by the replacement worker. The bug only manifests with reuse_port=True.
  • The 10 ms pause between drain passes is intentionally conservative; on localhost TCP handshakes complete in < 1 ms, but cross-host setups may need the headroom.
