Skip to content

Commit 834ff76

Browse files
committed
Improve server and monitor robustness.
1 parent bc4e1ef commit 834ff76

File tree

12 files changed

+461
-150
lines changed

12 files changed

+461
-150
lines changed

async-service-supervisor.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Gem::Specification.new do |spec|
2424

2525
spec.required_ruby_version = ">= 3.3"
2626

27+
spec.add_dependency "async", "~> 2.38"
2728
spec.add_dependency "async-bus"
2829
spec.add_dependency "async-service", "~> 0.15"
2930
spec.add_dependency "async-utilization", "~> 0.3"

examples/resize_mmap/test.rb

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
#!/usr/bin/env ruby
2+
# frozen_string_literal: true
3+
4+
# High-frequency regression test: does supervisor-side ftruncate + mmap remap
5+
# invalidate worker-side mmap slices (as created by Observer.open)?
6+
#
7+
# The supervisor (parent process) holds @file open and calls:
8+
# @file.truncate(new_size)
9+
# @buffer.free
10+
# @buffer = IO::Buffer.map(@file, new_size)
11+
#
12+
# The worker (child process) independently opens the same file, maps a
13+
# page-aligned window covering its segment, closes the fd, and holds a slice
14+
# of the resulting buffer — exactly what Observer.open does.
15+
#
16+
# This test runs both sides concurrently at maximum speed to expose any
17+
# platform/version scenario where ftruncate or munmap (via #free) on one
18+
# process's mapping invalidates another process's independent mapping.
19+
#
20+
# Usage:
21+
# ruby examples/resize_mmap/test.rb
22+
23+
require "tmpdir"
24+
25+
PAGE_SIZE = IO::Buffer::PAGE_SIZE
26+
SEGMENT_SIZE = 512
27+
ITERATIONS = 200_000 # write iterations per worker
28+
RESIZES = 20 # supervisor-side resizes per test run
29+
30+
# Reproduce the exact mapping logic from Observer.open.
#
# Maps a page-aligned window of the file at `path` that covers the requested
# segment, closes the file descriptor (the mapping persists independently of
# the descriptor), and returns a view restricted to the segment itself.
#
# @parameter path [String] Path of the shared file to map.
# @parameter segment_offset [Integer] Byte offset of the segment within the file.
# @parameter segment_size [Integer] Size of the segment in bytes.
# @returns [Array(IO::Buffer, IO::Buffer)] `[slice, parent]`. The caller should
#   keep `parent` referenced for as long as `slice` is in use; it is returned
#   explicitly to make that lifetime contract obvious in tests. When the mapped
#   window is exactly the segment, `slice` and `parent` are the same object.
def worker_map(path, segment_offset, segment_size)
  page_size = IO::Buffer::PAGE_SIZE

  # mmap offsets must be page-aligned: round the start down and the end up.
  aligned_offset = (segment_offset / page_size) * page_size
  offset_adjustment = segment_offset - aligned_offset
  aligned_end = ((segment_offset + segment_size + page_size - 1) / page_size) * page_size
  map_size = [aligned_end - aligned_offset, page_size].max

  # Mirror Observer.open: the fd is closed after mapping. The block form
  # guarantees the descriptor is released even if the mapping itself raises.
  parent = File.open(path, "r+b") do |file|
    IO::Buffer.map(file, map_size, aligned_offset)
  end

  # Only slice when the mapped window is larger than (or offset from) the segment.
  slice = if offset_adjustment > 0 || map_size > segment_size
    parent.slice(offset_adjustment, segment_size)
  else
    parent
  end

  [slice, parent]
end
52+
53+
# Run one test scenario.
#
# Forks a worker that maps its segment via worker_map and writes an
# incrementing counter into it, while this (supervisor) process repeatedly
# grows the file and remaps it. Afterwards the supervisor reads the worker's
# results back through its own, most recent mapping.
#
# Layout within a segment (as seen by both sides):
#   [0..7]  u64 iteration counter — final value must equal ITERATIONS
#   [8..15] u64 write-error count — must be 0
#
# Synchronisation:
#   mapped_w/r child → parent: "I have mapped the file, you may start resizing"
#   done_w/r   child → parent: "I have finished writing all ITERATIONS values"
#   exit_w/r   parent → child: "you may exit now" (signalled by closing the pipe)
#
# @parameter path [String] Path of the scratch file; created here and removed on return.
# @parameter segment_offset [Integer] Byte offset of the worker's segment within the file.
# @parameter label [String] Human-readable scenario name; accepted for the caller's convenience and not used here.
# @returns [Array(Integer, Integer)] `[final_counter, write_errors]` as read by the supervisor.
def run_test(path, segment_offset, label)
  File.open(path, "w+b") { |f| f.truncate(PAGE_SIZE) }

  mapped_r, mapped_w = IO.pipe # child signals: file mapped
  done_r, done_w = IO.pipe     # child signals: writes complete
  exit_r, exit_w = IO.pipe     # parent signals: child may exit

  child_pid = fork do
    [mapped_r, done_r, exit_w].each(&:close)

    slice, _parent = worker_map(path, segment_offset, SEGMENT_SIZE)

    mapped_w.write("1")
    mapped_w.close # unblock parent resize loop

    errors = 0
    ITERATIONS.times do |i|
      slice.set_value(:u64, 0, i + 1)
    rescue StandardError
      errors += 1
    end
    slice.set_value(:u64, 8, errors)

    done_w.write("1")
    done_w.close # unblock parent result read

    # Blocks until the parent closes its write end (EOF).
    exit_r.read
    exit_r.close
    exit 0
  end

  [mapped_w, done_w, exit_r].each(&:close)

  sup_file = nil
  sup_buffer = nil

  begin
    # Wait for the worker to have mapped the file before resizing.
    mapped_r.read
    mapped_r.close

    # Supervisor side: resize at maximum speed, concurrently with the child writes.
    # Use a thread so the resize loop and the done_r.read can overlap in time.
    sup_file = File.open(path, "r+b")
    sup_buffer = IO::Buffer.map(sup_file, PAGE_SIZE)
    curr_size = PAGE_SIZE

    resize_thread = Thread.new do
      RESIZES.times do
        curr_size *= 2
        sup_file.truncate(curr_size)
        sup_buffer.free
        sup_buffer = IO::Buffer.map(sup_file, curr_size)
      end
    end

    # Block until the child has finished all writes (or died: EOF either way).
    done_r.read
    done_r.close

    # Drain the resize thread before reading results.
    resize_thread.join

    final_counter = sup_buffer.get_value(:u64, segment_offset)
    write_errors = sup_buffer.get_value(:u64, segment_offset + 8)

    [final_counter, write_errors]
  ensure
    # Release everything even when the supervisor side failed part-way, so the
    # child is never left blocked or unreaped and the scratch file is removed.
    sup_buffer&.free
    sup_file&.close
    [mapped_r, done_r].each { |io| io.close unless io.closed? }

    # Closing (rather than writing to) the pipe releases the child via EOF and
    # cannot raise EPIPE if the child has already exited.
    exit_w.close unless exit_w.closed?
    Process.waitpid(child_pid)

    begin
      File.unlink(path)
    rescue SystemCallError
      # Best-effort cleanup: the enclosing temporary directory is removed anyway.
    end
  end
end
138+
139+
# ── Main ──────────────────────────────────────────────────────────────────────
140+
141+
# Print the environment and parameters so results can be compared across platforms.
puts "Platform: #{RUBY_PLATFORM}"
puts "Ruby: #{RUBY_VERSION}"
puts "PAGE_SIZE: #{PAGE_SIZE}"
puts "SEGMENT_SIZE: #{SEGMENT_SIZE}"
puts "ITERATIONS: #{ITERATIONS}"
# The supervisor doubles the file RESIZES times, starting from one page.
puts "RESIZES: #{RESIZES} (#{PAGE_SIZE} → #{PAGE_SIZE * 2**RESIZES} bytes)"
puts

failures = 0

Dir.mktmpdir do |dir|
  path = File.join(dir, "test.shm")

  # Scenario label => segment offset handed to the worker.
  tests = {
    "page-aligned offset (segment_offset=0, exercises map_size > segment_size slice path)" => 0,
    "non-page-aligned offset (segment_offset=#{SEGMENT_SIZE}, exercises offset_adjustment > 0 slice path)" => SEGMENT_SIZE,
  }

  tests.each_with_index do |(label, segment_offset), index|
    print "Test #{index + 1}: #{label}\n ... "
    $stdout.flush

    final, errors = run_test(path, segment_offset, label)

    if errors > 0 || final != ITERATIONS
      puts "FAIL"
      puts " expected counter = #{ITERATIONS}, got #{final}"
      puts " write errors (IO::Buffer exceptions) = #{errors}"
      if final != ITERATIONS
        puts " => writes went to stale/invalid pages and were not visible to the supervisor"
      end
      failures += 1
    else
      puts "PASS (counter=#{ITERATIONS}, errors=0)"
    end
  end
end

puts
# The exit status reflects the overall result so the script can be used in CI.
if failures == 0
  puts "All tests passed — ftruncate + IO::Buffer.map remap does NOT invalidate existing worker mmap slices on this platform."
  exit 0
else
  puts "#{failures} test(s) FAILED — existing mmap slices were invalidated by ftruncate/remap on this platform!"
  exit 1
end

lib/async/service/supervisor/loop.rb

Lines changed: 0 additions & 40 deletions
This file was deleted.

lib/async/service/supervisor/memory_monitor.rb

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,23 @@
66
require "memory/leak/cluster"
77
require "set"
88

9-
require_relative "loop"
9+
require_relative "monitor"
1010

1111
module Async
1212
module Service
1313
module Supervisor
1414
# Monitors worker memory usage and restarts workers that exceed limits.
1515
#
1616
# Uses the `memory` gem to track process memory and detect leaks.
17-
class MemoryMonitor
17+
class MemoryMonitor < Monitor
1818
# Create a new memory monitor.
1919
#
2020
# @parameter interval [Integer] The interval at which to check for memory leaks.
2121
# @parameter total_size_limit [Integer] The total size limit of all processes, or nil for no limit.
2222
# @parameter free_size_minimum [Integer] The minimum free memory threshold, or nil for no threshold.
2323
# @parameter options [Hash] Options to pass to the cluster when adding processes.
2424
def initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options)
25-
@interval = interval
25+
super(interval: interval)
2626
@cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit, free_size_minimum: free_size_minimum)
2727

2828
# We use these options when adding processes to the cluster:
@@ -85,28 +85,11 @@ def remove(supervisor_controller)
8585
end
8686
end
8787

88-
# The key used when this monitor's status is aggregated with others.
89-
def self.monitor_type
90-
:memory_monitor
91-
end
92-
9388
# Serialize memory cluster data for JSON.
9489
def as_json
9590
@cluster.as_json
9691
end
9792

98-
# Serialize to JSON string.
99-
def to_json(...)
100-
as_json.to_json(...)
101-
end
102-
103-
# Get status for the memory monitor.
104-
#
105-
# @returns [Hash] Hash with type and data keys.
106-
def status
107-
{type: self.class.monitor_type, data: as_json}
108-
end
109-
11093
# Invoked when a memory leak is detected.
11194
#
11295
# @parameter process_id [Integer] The process ID of the process that has a memory leak.
@@ -128,21 +111,15 @@ def memory_leak_detected(process_id, monitor)
128111
true
129112
end
130113

131-
# Run the memory monitor.
132-
#
133-
# @returns [Async::Task] The task that is running the memory monitor.
134-
def run
135-
Async do
136-
Loop.run(interval: @interval) do
137-
@guard.synchronize do
138-
# This block must return true if the process was killed.
139-
@cluster.check! do |process_id, monitor|
140-
begin
141-
memory_leak_detected(process_id, monitor)
142-
rescue => error
143-
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
144-
end
145-
end
114+
# Run one iteration of the memory monitor.
115+
def run_once
116+
@guard.synchronize do
117+
# This block must return true if the process was killed.
118+
@cluster.check! do |process_id, monitor|
119+
begin
120+
memory_leak_detected(process_id, monitor)
121+
rescue => error
122+
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
146123
end
147124
end
148125
end
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2026, by Samuel Williams.
5+
6+
require "async/loop"
7+
8+
module Async
9+
module Service
10+
module Supervisor
11+
# Base class for supervisor monitors.
#
# Provides the shared periodic run loop and status/JSON serialization.
# Subclasses override {run_once} to do their work and {as_json} to expose
# their data.
class Monitor
  # Create a new monitor.
  #
  # @parameter interval [Numeric] The interval passed to the periodic loop that invokes {run_once}.
  def initialize(interval: 1.0)
    @interval = interval
  end

  # Serialize the monitor's data for JSON.
  #
  # The base implementation has no data; subclasses override this.
  #
  # @returns [Hash] An empty hash.
  def as_json(...)
    {}
  end

  # Serialize to JSON string.
  def to_json(...)
    as_json.to_json(...)
  end

  # Get the status of this monitor.
  #
  # @returns [Hash] Hash with `type` (the monitor's class name) and `data` (the result of {as_json}) keys.
  def status
    {type: self.class.name, data: as_json}
  end

  # Run one iteration of the monitor.
  def run_once
    # This method can be overridden by subclasses to implement specific monitoring logic.
  end

  # Run the monitor.
  #
  # Starts a child task of `parent` that invokes {run_once} via `Loop.periodic` using the configured interval.
  #
  # @parameter parent [Async::Task] The task under which the monitor task is started.
  # @returns [Async::Task] The task that is running the monitor.
  def run(parent: Async::Task.current)
    parent.async do
      Loop.periodic(interval: @interval) do
        run_once
      end
    end
  end
end
53+
end
54+
end
55+
end

0 commit comments

Comments
 (0)