Skip to content

Commit 9950f2d

Browse files
committed
Improve server and monitor robustness.
1 parent bc4e1ef commit 9950f2d

File tree

11 files changed

+275
-150
lines changed

11 files changed

+275
-150
lines changed

async-service-supervisor.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Gem::Specification.new do |spec|
2424

2525
spec.required_ruby_version = ">= 3.3"
2626

27+
spec.add_dependency "async", "~> 2.38"
2728
spec.add_dependency "async-bus"
2829
spec.add_dependency "async-service", "~> 0.15"
2930
spec.add_dependency "async-utilization", "~> 0.3"

lib/async/service/supervisor/loop.rb

Lines changed: 0 additions & 40 deletions
This file was deleted.

lib/async/service/supervisor/memory_monitor.rb

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,23 @@
66
require "memory/leak/cluster"
77
require "set"
88

9-
require_relative "loop"
9+
require_relative "monitor"
1010

1111
module Async
1212
module Service
1313
module Supervisor
1414
# Monitors worker memory usage and restarts workers that exceed limits.
1515
#
1616
# Uses the `memory` gem to track process memory and detect leaks.
17-
class MemoryMonitor
17+
class MemoryMonitor < Monitor
1818
# Create a new memory monitor.
1919
#
2020
# @parameter interval [Integer] The interval at which to check for memory leaks.
2121
# @parameter total_size_limit [Integer] The total size limit of all processes, or nil for no limit.
2222
# @parameter free_size_minimum [Integer] The minimum free memory threshold, or nil for no threshold.
2323
# @parameter options [Hash] Options to pass to the cluster when adding processes.
2424
def initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options)
25-
@interval = interval
25+
super(interval: interval)
2626
@cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit, free_size_minimum: free_size_minimum)
2727

2828
# We use these options when adding processes to the cluster:
@@ -85,28 +85,11 @@ def remove(supervisor_controller)
8585
end
8686
end
8787

88-
# The key used when this monitor's status is aggregated with others.
89-
def self.monitor_type
90-
:memory_monitor
91-
end
92-
9388
# Serialize memory cluster data for JSON.
9489
def as_json
9590
@cluster.as_json
9691
end
9792

98-
# Serialize to JSON string.
99-
def to_json(...)
100-
as_json.to_json(...)
101-
end
102-
103-
# Get status for the memory monitor.
104-
#
105-
# @returns [Hash] Hash with type and data keys.
106-
def status
107-
{type: self.class.monitor_type, data: as_json}
108-
end
109-
11093
# Invoked when a memory leak is detected.
11194
#
11295
# @parameter process_id [Integer] The process ID of the process that has a memory leak.
@@ -128,21 +111,15 @@ def memory_leak_detected(process_id, monitor)
128111
true
129112
end
130113

131-
# Run the memory monitor.
132-
#
133-
# @returns [Async::Task] The task that is running the memory monitor.
134-
def run
135-
Async do
136-
Loop.run(interval: @interval) do
137-
@guard.synchronize do
138-
# This block must return true if the process was killed.
139-
@cluster.check! do |process_id, monitor|
140-
begin
141-
memory_leak_detected(process_id, monitor)
142-
rescue => error
143-
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
144-
end
145-
end
114+
# Run one iteration of the memory monitor.
115+
def run_once
116+
@guard.synchronize do
117+
# This block must return true if the process was killed.
118+
@cluster.check! do |process_id, monitor|
119+
begin
120+
memory_leak_detected(process_id, monitor)
121+
rescue => error
122+
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
146123
end
147124
end
148125
end
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2026, by Samuel Williams.
5+
6+
require "async/loop"
7+
8+
module Async
9+
module Service
10+
module Supervisor
11+
class Monitor
12+
def initialize(interval: 1.0)
13+
@interval = interval
14+
end
15+
16+
def as_json(...)
17+
{}
18+
end
19+
20+
# Serialize to JSON string.
21+
def to_json(...)
22+
as_json.to_json(...)
23+
end
24+
25+
# Get aggregated utilization status by service name.
26+
#
27+
# Reads utilization data from all registered workers and aggregates it
28+
# by service name (from supervisor_controller.state[:name]).
29+
#
30+
# @returns [Hash] Hash with type and data keys.
31+
def status
32+
{type: self.class.name, data: as_json}
33+
end
34+
35+
# Run one iteration of the monitor.
36+
def run_once
37+
# This method can be overridden by subclasses to implement specific monitoring logic.
38+
end
39+
40+
# Run the monitor.
41+
#
42+
# Repeatedly invokes `run_once` via `Loop.periodic`, using the configured interval, in a task created under `parent`.
43+
#
44+
# @returns [Async::Task] The task that is running the monitor.
45+
def run(parent: Async::Task.current)
46+
parent.async do
47+
Loop.periodic(interval: @interval) do
48+
self.run_once
49+
end
50+
end
51+
end
52+
end
53+
end
54+
end
55+
end

lib/async/service/supervisor/process_monitor.rb

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# Copyright, 2025-2026, by Samuel Williams.
55

66
require "process/metrics"
7-
require_relative "loop"
7+
require_relative "monitor"
88

99
module Async
1010
module Service
@@ -15,13 +15,13 @@ module Supervisor
1515
# Unlike {MemoryMonitor}, this monitor captures metrics for the entire process tree
1616
# by tracking the parent process ID (ppid), which is more efficient than tracking
1717
# individual processes.
18-
class ProcessMonitor
18+
class ProcessMonitor < Monitor
1919
# Create a new process monitor.
2020
#
2121
# @parameter interval [Integer] The interval in seconds at which to log process metrics.
2222
# @parameter ppid [Integer] The parent process ID to monitor. If nil, uses the current process to capture its children.
2323
def initialize(interval: 60, ppid: nil)
24-
@interval = interval
24+
super(interval: interval)
2525
@ppid = ppid || Process.ppid
2626
end
2727

@@ -67,33 +67,13 @@ def as_json
6767
{ppid: @ppid, metrics: self.metrics}
6868
end
6969

70-
# Serialize to JSON string.
71-
def to_json(...)
72-
as_json.to_json(...)
73-
end
74-
75-
# Get status for the process monitor.
76-
#
77-
# @returns [Hash] Hash with type and data keys.
78-
def status
79-
{type: self.class.monitor_type, data: as_json}
80-
end
81-
82-
# Run the process monitor.
83-
#
84-
# Periodically captures and logs process metrics for the entire process tree.
85-
#
86-
# @returns [Async::Task] The task that is running the process monitor.
87-
def run
88-
Async do
89-
Loop.run(interval: @interval) do
90-
metrics = self.metrics
91-
92-
# Log each process individually for better searchability in log platforms:
93-
metrics.each do |process_id, general|
94-
Console.info(self, "Process metrics captured.", general: general)
95-
end
96-
end
70+
# Run one iteration of the process monitor.
71+
def run_once
72+
metrics = self.metrics
73+
74+
# Log each process individually for better searchability in log platforms:
75+
metrics.each do |process_id, general|
76+
Console.info(self, "Process metrics captured.", general: general)
9777
end
9878
end
9979
end

lib/async/service/supervisor/server.rb

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -87,30 +87,34 @@ def remove(controller)
8787
# @parameter parent [Async::Task] The parent task to run under.
8888
def run
8989
Sync do |task|
90+
barrier = Async::Barrier.new
91+
9092
# Start all monitors:
9193
@monitors.each do |monitor|
92-
monitor.run
94+
monitor.run(parent: barrier)
9395
rescue => error
9496
Console.error(self, "Error while starting monitor!", monitor: monitor, exception: error)
9597
end
9698

97-
# Accept connections from workers:
98-
self.accept do |connection|
99-
# Create a supervisor controller for this connection:
100-
supervisor_controller = SupervisorController.new(self, connection)
101-
102-
# Bind supervisor controller:
103-
connection.bind(:supervisor, supervisor_controller)
104-
105-
# Run the connection:
106-
connection.run
107-
ensure
108-
self.remove(supervisor_controller)
99+
barrier.async do
100+
# Accept connections from workers:
101+
self.accept do |connection|
102+
# Create a supervisor controller for this connection:
103+
supervisor_controller = SupervisorController.new(self, connection)
104+
105+
# Bind supervisor controller:
106+
connection.bind(:supervisor, supervisor_controller)
107+
108+
# Run the connection:
109+
connection.run
110+
ensure
111+
self.remove(supervisor_controller)
112+
end
109113
end
110114

111-
task.children&.each(&:wait)
115+
barrier.wait
112116
ensure
113-
task.stop
117+
barrier&.stop
114118
end
115119
end
116120
end

lib/async/service/supervisor/utilization_monitor.rb

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
require "set"
77

8-
require_relative "loop"
8+
require_relative "monitor"
99
require "async/utilization"
1010

1111
module Async
@@ -15,7 +15,7 @@ module Supervisor
1515
#
1616
# Uses shared memory to efficiently collect utilization metrics from workers
1717
# and aggregates them by service name for monitoring and reporting.
18-
class UtilizationMonitor
18+
class UtilizationMonitor < Monitor
1919
# Allocates and manages shared memory segments for worker utilization data.
2020
#
2121
# Manages a shared memory file that workers can write utilization metrics to.
@@ -195,8 +195,8 @@ def close
195195
# @parameter size [Integer] Total size of the shared memory buffer.
196196
# @parameter segment_size [Integer] Size of each allocation segment (default: 512 bytes).
197197
def initialize(path: "utilization.shm", interval: 10, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512)
198+
super(interval: interval)
198199
@path = path
199-
@interval = interval
200200
@segment_size = segment_size
201201

202202
@allocator = SegmentAllocator.new(path, size: size, segment_size: segment_size)
@@ -313,39 +313,16 @@ def as_json
313313
end
314314
end
315315

316-
# Serialize to JSON string.
317-
def to_json(...)
318-
as_json.to_json(...)
319-
end
320-
321-
# Get aggregated utilization status by service name.
322-
#
323-
# Reads utilization data from all registered workers and aggregates it
324-
# by service name (from supervisor_controller.state[:name]).
325-
#
326-
# @returns [Hash] Hash with type and data keys.
327-
def status
328-
{type: self.class.monitor_type, data: as_json}
329-
end
330-
331316
# Emit the utilization metrics.
332317
#
333318
# @parameter metrics [Hash] The utilization metrics.
334319
def emit(metrics)
335320
Console.info(self, "Utilization:", metrics: metrics)
336321
end
337322

338-
# Run the utilization monitor.
339-
#
340-
# Periodically aggregates utilization data from all workers.
341-
#
342-
# @returns [Async::Task] The task that is running the utilization monitor.
343-
def run
344-
Async do
345-
Loop.run(interval: @interval) do
346-
self.emit(self.as_json)
347-
end
348-
end
323+
# Run one iteration of the utilization monitor.
324+
def run_once
325+
self.emit(self.as_json)
349326
end
350327
end
351328
end

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Improve robustness and error handling of default monitors and server loop, ensuring that monitor failures either completely crash the server or retry appropriately, rather than leaving the server in a broken state.
6+
37
## v0.14.0
48

59
- Add `Worker#make_controller` as an override point for providing a custom worker controller with additional RPCs.

0 commit comments

Comments
 (0)