Skip to content

Commit e05e40d

Browse files
committed
feat(ctl): add message-based dirty worker management
Replace signal-based dirty add/remove with protocol messages:

- Add MSG_TYPE_MANAGE to the dirty protocol for worker management
- Add MANAGE_OP_ADD and MANAGE_OP_REMOVE operation codes
- Add handle_manage_request() in DirtyArbiter
- Update handlers to send messages instead of SIGTTIN/SIGTTOU signals

New workers only load apps that haven't reached their worker limits. When all apps are at their limits, the response includes the reason. num_workers is only incremented when a worker is actually spawned.
1 parent 7df2609 commit e05e40d

4 files changed

Lines changed: 288 additions & 21 deletions

File tree

gunicorn/ctl/handlers.py

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import os
1212
import signal
13+
import socket
1314
import time
1415

1516

@@ -309,6 +310,8 @@ def dirty_add(self, count: int = 1) -> dict:
309310
"""
310311
Spawn additional dirty workers.
311312
313+
Sends a MANAGE message to the dirty arbiter to spawn workers.
314+
312315
Args:
313316
count: Number of dirty workers to add (default 1)
314317
@@ -321,25 +324,15 @@ def dirty_add(self, count: int = 1) -> dict:
321324
"error": "Dirty arbiter not running",
322325
}
323326

324-
# Send TTIN signals to dirty arbiter
325327
count = max(1, int(count))
326-
try:
327-
for _ in range(count):
328-
os.kill(self.arbiter.dirty_arbiter_pid, signal.SIGTTIN)
329-
return {
330-
"success": True,
331-
"added": count,
332-
}
333-
except OSError as e:
334-
return {
335-
"success": False,
336-
"error": str(e),
337-
}
328+
return self._send_manage_message("add", count)
338329

339330
def dirty_remove(self, count: int = 1) -> dict:
340331
"""
341332
Remove dirty workers.
342333
334+
Sends a MANAGE message to the dirty arbiter to remove workers.
335+
343336
Args:
344337
count: Number of dirty workers to remove (default 1)
345338
@@ -352,16 +345,73 @@ def dirty_remove(self, count: int = 1) -> dict:
352345
"error": "Dirty arbiter not running",
353346
}
354347

355-
# Send TTOU signals to dirty arbiter
356348
count = max(1, int(count))
357-
try:
358-
for _ in range(count):
359-
os.kill(self.arbiter.dirty_arbiter_pid, signal.SIGTTOU)
349+
return self._send_manage_message("remove", count)
350+
351+
def _send_manage_message(self, operation: str, count: int) -> dict:
352+
"""
353+
Send a worker management message to the dirty arbiter.
354+
355+
Args:
356+
operation: "add" or "remove"
357+
count: Number of workers to add/remove
358+
359+
Returns:
360+
Dictionary with result or error
361+
"""
362+
# Get socket path from arbiter object or environment
363+
dirty_socket_path = None
364+
if hasattr(self.arbiter, 'dirty_arbiter') and self.arbiter.dirty_arbiter:
365+
dirty_socket_path = getattr(
366+
self.arbiter.dirty_arbiter, 'socket_path', None
367+
)
368+
if not dirty_socket_path:
369+
dirty_socket_path = os.environ.get('GUNICORN_DIRTY_SOCKET')
370+
if not dirty_socket_path:
360371
return {
361-
"success": True,
362-
"removed": count,
372+
"success": False,
373+
"error": "Cannot find dirty arbiter socket path",
363374
}
364-
except OSError as e:
375+
376+
try:
377+
from gunicorn.dirty.protocol import (
378+
DirtyProtocol, MANAGE_OP_ADD, MANAGE_OP_REMOVE
379+
)
380+
381+
op = MANAGE_OP_ADD if operation == "add" else MANAGE_OP_REMOVE
382+
383+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
384+
sock.settimeout(10.0)
385+
sock.connect(dirty_socket_path)
386+
387+
# Send manage request
388+
request = {
389+
"type": DirtyProtocol.MSG_TYPE_MANAGE,
390+
"id": 1,
391+
"op": op,
392+
"count": count,
393+
}
394+
DirtyProtocol.write_message(sock, request)
395+
396+
# Read response
397+
response = DirtyProtocol.read_message(sock)
398+
sock.close()
399+
400+
if response.get("type") == DirtyProtocol.MSG_TYPE_RESPONSE:
401+
return response.get("result", {"success": True})
402+
elif response.get("type") == DirtyProtocol.MSG_TYPE_ERROR:
403+
error = response.get("error", {})
404+
return {
405+
"success": False,
406+
"error": error.get("message", str(error)),
407+
}
408+
else:
409+
return {
410+
"success": False,
411+
"error": f"Unexpected response type: {response.get('type')}",
412+
}
413+
414+
except Exception as e:
365415
return {
366416
"success": False,
367417
"error": str(e),

gunicorn/dirty/arbiter.py

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
STASH_OP_DELETE_TABLE,
4242
STASH_OP_TABLES,
4343
STASH_OP_EXISTS,
44+
MANAGE_OP_ADD,
45+
MANAGE_OP_REMOVE,
4446
)
4547
from .worker import DirtyWorker
4648

@@ -426,6 +428,9 @@ async def handle_client(self, reader, writer):
426428
# Handle status queries
427429
elif msg_type == DirtyProtocol.MSG_TYPE_STATUS:
428430
await self.handle_status_request(message, writer)
431+
# Handle worker management (add/remove workers)
432+
elif msg_type == DirtyProtocol.MSG_TYPE_MANAGE:
433+
await self.handle_manage_request(message, writer)
429434
else:
430435
# Route request to a dirty worker - pass writer for streaming
431436
await self.route_request(message, writer)
@@ -690,6 +695,100 @@ async def handle_status_request(self, message, client_writer):
690695
response = make_response(request_id, result)
691696
await DirtyProtocol.write_message_async(client_writer, response)
692697

698+
async def handle_manage_request(self, message, client_writer):
    """
    Handle a worker management request.

    Adds or removes dirty workers in response to a protocol message and
    answers the client with either a response or an error message.

    Args:
        message: Manage request message; reads "id", "op" and "count"
        client_writer: StreamWriter to send response to client
    """
    request_id = message.get("id", "unknown")
    op = message.get("op")

    try:
        # Validate the count inside the try block: a malformed value
        # (None, non-numeric string, infinity) must produce an error
        # response rather than an exception raised before any reply.
        raw_count = message.get("count", 1)
        try:
            count = max(1, int(raw_count))
        except (TypeError, ValueError, OverflowError):
            error = DirtyError(f"Invalid worker count: {raw_count!r}")
            response = make_error_response(request_id, error)
            await DirtyProtocol.write_message_async(client_writer, response)
            return

        if op == MANAGE_OP_ADD:
            # Add workers - only loads apps that need more workers
            spawned = 0
            for _ in range(count):
                # spawn_worker() returns None when no apps need workers
                pid = self.spawn_worker()
                if pid is not None:
                    # Only raise the target when a worker really started
                    self.num_workers += 1
                    spawned += 1
                    await asyncio.sleep(0.1)

            result = {
                "success": True,
                "operation": "add",
                "requested": count,
                "spawned": spawned,
            }
            # Provide feedback about why no workers were spawned
            if spawned == 0:
                result["reason"] = "All apps have reached their worker limits"
            result["total_workers"] = len(self.workers)
            result["target_workers"] = self.num_workers
            changed = spawned

        elif op == MANAGE_OP_REMOVE:
            # Remove workers (similar to TTOU signal but via message)
            min_workers = self._get_minimum_workers()
            removed = 0

            for _ in range(count):
                # Never go below the configured minimum or the last worker
                if self.num_workers <= min_workers:
                    break
                if len(self.workers) <= 1:
                    break

                self.num_workers -= 1

                # Kill oldest worker
                # NOTE(review): assumes self.workers is updated between
                # iterations (by kill_worker or reaping); otherwise the
                # same pid could be picked again - confirm.
                oldest_pid = min(self.workers,
                                 key=lambda p: self.workers[p].age)
                self.kill_worker(oldest_pid, signal.SIGTERM)
                removed += 1
                await asyncio.sleep(0.1)

            result = {
                "success": True,
                "operation": "remove",
                "requested": count,
                "removed": removed,
                "total_workers": len(self.workers),
                "target_workers": self.num_workers,
            }
            changed = removed

        else:
            error = DirtyError(f"Unknown manage operation: {op}")
            response = make_error_response(request_id, error)
            await DirtyProtocol.write_message_async(client_writer, response)
            return

        self.log.info("Worker management: %s %d workers (spawned/removed: %d)",
                      "add" if op == MANAGE_OP_ADD else "remove",
                      count,
                      changed)

        response = make_response(request_id, result)
        await DirtyProtocol.write_message_async(client_writer, response)

    except Exception as e:
        # Top-level boundary for this handler: report the failure to the
        # client instead of letting it propagate to the connection loop.
        self.log.error("Manage operation error: %s", e)
        response = make_error_response(request_id, DirtyError(str(e)))
        await DirtyProtocol.write_message_async(client_writer, response)
791+
693792
async def handle_stash_request(self, message, client_writer):
694793
"""
695794
Handle a stash operation directly in the arbiter.
@@ -830,20 +929,27 @@ async def manage_workers(self):
830929
self.kill_worker(oldest_pid, signal.SIGTERM)
831930
await asyncio.sleep(0.1)
832931

833-
def spawn_worker(self):
932+
def spawn_worker(self, force_all_apps=False):
834933
"""
835934
Spawn a new dirty worker.
836935
837936
Worker app assignment follows these priorities:
838937
1. If there are pending respawns (from dead workers), use those apps
839938
2. Otherwise, determine apps for a new worker based on allocation
939+
3. If force_all_apps=True, spawn with all apps regardless of limits
940+
941+
Args:
942+
force_all_apps: If True, spawn worker with all apps ignoring limits
840943
841944
Returns:
842945
Worker PID in parent process, or None if no apps need workers
843946
"""
844947
# Priority 1: Respawn dead worker with same apps
845948
if self._pending_respawns:
846949
app_paths = self._pending_respawns.pop(0)
950+
elif force_all_apps:
951+
# Force spawn with all apps (used by TTIN signal)
952+
app_paths = list(self.app_specs.keys())
847953
else:
848954
# Priority 2: New worker for initial pool
849955
app_paths = self._get_apps_for_new_worker()

gunicorn/dirty/protocol.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
MSG_TYPE_END = 0x05
4545
MSG_TYPE_STASH = 0x10 # Stash operations (shared state between workers)
4646
MSG_TYPE_STATUS = 0x11 # Status query for arbiter/workers
47+
MSG_TYPE_MANAGE = 0x12 # Worker management (add/remove workers)
4748

4849
# Message type names (for backwards compatibility with old API)
4950
MSG_TYPE_REQUEST_STR = "request"
@@ -53,6 +54,7 @@
5354
MSG_TYPE_END_STR = "end"
5455
MSG_TYPE_STASH_STR = "stash"
5556
MSG_TYPE_STATUS_STR = "status"
57+
MSG_TYPE_MANAGE_STR = "manage"
5658

5759
# Map int types to string names
5860
MSG_TYPE_TO_STR = {
@@ -63,6 +65,7 @@
6365
MSG_TYPE_END: MSG_TYPE_END_STR,
6466
MSG_TYPE_STASH: MSG_TYPE_STASH_STR,
6567
MSG_TYPE_STATUS: MSG_TYPE_STATUS_STR,
68+
MSG_TYPE_MANAGE: MSG_TYPE_MANAGE_STR,
6669
}
6770

6871
# Map string names to int types
@@ -80,6 +83,10 @@
8083
STASH_OP_TABLES = 9
8184
STASH_OP_EXISTS = 10
8285

86+
# Manage operation codes
87+
MANAGE_OP_ADD = 1 # Add/spawn workers
88+
MANAGE_OP_REMOVE = 2 # Remove/kill workers
89+
8390
# Header format: Magic (2) + Version (1) + Type (1) + Length (4) + RequestID (8) = 16
8491
HEADER_FORMAT = ">2sBBIQ"
8592
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
@@ -102,6 +109,7 @@ class BinaryProtocol:
102109
MSG_TYPE_END = MSG_TYPE_END_STR
103110
MSG_TYPE_STASH = MSG_TYPE_STASH_STR
104111
MSG_TYPE_STATUS = MSG_TYPE_STATUS_STR
112+
MSG_TYPE_MANAGE = MSG_TYPE_MANAGE_STR
105113

106114
@staticmethod
107115
def encode_header(msg_type: int, request_id: int, payload_length: int) -> bytes:
@@ -292,6 +300,28 @@ def encode_status(request_id: int) -> bytes:
292300
header = BinaryProtocol.encode_header(MSG_TYPE_STATUS, request_id, 0)
293301
return header
294302

303+
@staticmethod
def encode_manage(request_id: int, op: int, count: int = 1) -> bytes:
    """
    Serialize a worker management request into wire format.

    The payload is a TLV-encoded dict holding the operation code and the
    worker count; it is prefixed by a MSG_TYPE_MANAGE header carrying the
    payload length.

    Args:
        request_id: Request identifier
        op: Management operation (MANAGE_OP_ADD or MANAGE_OP_REMOVE)
        count: Number of workers to add/remove

    Returns:
        bytes: Complete message (header + payload)
    """
    body = TLVEncoder.encode({"op": op, "count": count})
    prefix = BinaryProtocol.encode_header(
        MSG_TYPE_MANAGE, request_id, len(body)
    )
    return prefix + body
324+
295325
@staticmethod
296326
def encode_stash(request_id: int, op: int, table: str,
297327
key=None, value=None, pattern=None) -> bytes:
@@ -603,6 +633,12 @@ def _encode_from_dict(message: dict) -> bytes:
603633
)
604634
elif msg_type == MSG_TYPE_STATUS:
605635
return BinaryProtocol.encode_status(request_id)
636+
elif msg_type == MSG_TYPE_MANAGE:
637+
return BinaryProtocol.encode_manage(
638+
request_id,
639+
message.get("op"),
640+
message.get("count", 1)
641+
)
606642
else:
607643
raise DirtyProtocolError(f"Unhandled message type: {msg_type}")
608644

@@ -752,3 +788,23 @@ def make_stash_message(request_id, op: int, table: str,
752788
if pattern is not None:
753789
msg["pattern"] = pattern
754790
return msg
791+
792+
793+
def make_manage_message(request_id, op: int, count: int = 1) -> dict:
    """
    Create the dict form of a worker management message.

    Args:
        request_id: Unique request identifier (int or str)
        op: Management operation (MANAGE_OP_ADD or MANAGE_OP_REMOVE)
        count: Number of workers to add/remove

    Returns:
        dict: Manage message dict with "type", "id", "op" and "count" keys
    """
    manage_msg = {"type": DirtyProtocol.MSG_TYPE_MANAGE}
    manage_msg["id"] = request_id
    manage_msg["op"] = op
    manage_msg["count"] = count
    return manage_msg

0 commit comments

Comments
 (0)