Skip to content

Commit 91df406

Browse files
Introduce UtilizationMonitor.
1 parent 7f03407 commit 91df406

File tree

12 files changed

+972
-6
lines changed

12 files changed

+972
-6
lines changed

async-service-supervisor.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Gem::Specification.new do |spec|
2626

2727
spec.add_dependency "async-bus"
2828
spec.add_dependency "async-service", "~> 0.15"
29+
spec.add_dependency "async-utilization", "~> 0.3"
2930
spec.add_dependency "io-endpoint"
3031
spec.add_dependency "memory", "~> 0.7"
3132
spec.add_dependency "memory-leak", "~> 0.10"

examples/echo/service.rb

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#!/usr/bin/env async-service
2+
# frozen_string_literal: true
3+
4+
# Released under the MIT License.
5+
# Copyright, 2025, by Samuel Williams.
6+
7+
require "async/service/supervisor"
8+
require "async/service/managed/service"
9+
require "async/utilization"
10+
require "io/endpoint/host_endpoint"
11+
12+
class EchoService < Async::Service::Managed::Service
13+
def initialize(...)
14+
super
15+
16+
@bound_endpoint = nil
17+
@endpoint = nil
18+
end
19+
20+
# Prepare the bound endpoint for the server.
21+
def start
22+
@endpoint = IO::Endpoint.tcp("127.0.0.1", 8080)
23+
24+
Sync do
25+
@bound_endpoint = @endpoint.bound
26+
end
27+
28+
Console.info(self, "Starting echo server on #{@endpoint}")
29+
30+
super
31+
end
32+
33+
def run(instance, evaluator)
34+
evaluator.prepare!(instance)
35+
36+
instance.ready!
37+
38+
registry = evaluator.utilization_registry
39+
connections_total = registry.metric(:connections_total)
40+
connections_active = registry.metric(:connections_active)
41+
messages_total = registry.metric(:messages_total)
42+
43+
Async do |task|
44+
@bound_endpoint.accept do |peer|
45+
connections_total.increment
46+
connections_active.track do
47+
Console.info(self, "Client connected", peer: peer)
48+
49+
peer.each_line do |line|
50+
messages_total.increment
51+
peer.write(line)
52+
end
53+
54+
Console.info(self, "Client disconnected", peer: peer)
55+
end
56+
end
57+
end
58+
59+
# Return the bound endpoint for health checking
60+
@bound_endpoint
61+
end
62+
63+
# Close the bound endpoint.
64+
def stop(...)
65+
if @bound_endpoint
66+
@bound_endpoint.close
67+
@bound_endpoint = nil
68+
end
69+
70+
@endpoint = nil
71+
72+
super
73+
end
74+
end
75+
76+
service "echo" do
77+
include Async::Service::Managed::Environment
78+
include Async::Service::Supervisor::Supervised
79+
80+
service_class EchoService
81+
82+
utilization_schema do
83+
{
84+
connections_total: :u64,
85+
connections_active: :u32,
86+
messages_total: :u64
87+
}
88+
end
89+
end
90+
91+
service "supervisor" do
92+
include Async::Service::Supervisor::Environment
93+
94+
monitors do
95+
[
96+
Async::Service::Supervisor::UtilizationMonitor.new(
97+
path: "utilization.shm",
98+
interval: 1
99+
)
100+
]
101+
end
102+
end

examples/echo/utilization.shm

128 KB
Binary file not shown.

lib/async/service/supervisor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
require_relative "supervisor/memory_monitor"
1414
require_relative "supervisor/process_monitor"
15+
require_relative "supervisor/utilization_monitor"
1516

1617
require_relative "supervisor/environment"
1718
require_relative "supervisor/supervised"

lib/async/service/supervisor/environment.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require "async/service/managed/environment"
88

99
require_relative "service"
10+
require_relative "server"
1011

1112
module Async
1213
module Service

lib/async/service/supervisor/memory_monitor.rb

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,26 @@ def remove(supervisor_controller)
8585
end
8686
end
8787

88+
# The key used when this monitor's status is aggregated with others.
89+
def self.monitor_type
90+
:memory_monitor
91+
end
92+
93+
# Serialize memory cluster data for JSON.
94+
def as_json
95+
@cluster.as_json
96+
end
97+
98+
# Serialize to JSON string.
99+
def to_json(...)
100+
as_json.to_json(...)
101+
end
102+
88103
# Get status for the memory monitor.
89104
#
90-
# @returns [Hash] Status including the memory cluster.
105+
# @returns [Hash] Hash with type and data keys.
91106
def status
92-
{memory_monitor: @cluster.as_json}
107+
{type: self.class.monitor_type, data: as_json}
93108
end
94109

95110
# Invoked when a memory leak is detected.

lib/async/service/supervisor/process_monitor.rb

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,26 @@ def metrics
5757
Process::Metrics::General.capture(ppid: @ppid).transform_values!(&:as_json)
5858
end
5959

60+
# The key used when this monitor's status is aggregated with others.
61+
def self.monitor_type
62+
:process_monitor
63+
end
64+
65+
# Serialize process metrics for JSON.
66+
def as_json
67+
{ppid: @ppid, metrics: self.metrics}
68+
end
69+
70+
# Serialize to JSON string.
71+
def to_json(...)
72+
as_json.to_json(...)
73+
end
74+
6075
# Get status for the process monitor.
6176
#
62-
# @returns [Hash] Status including process metrics.
77+
# @returns [Hash] Hash with type and data keys.
6378
def status
64-
{process_monitor: {ppid: @ppid, metrics: self.metrics}}
79+
{type: self.class.monitor_type, data: as_json}
6580
end
6681

6782
# Run the process monitor.

lib/async/service/supervisor/supervised.rb

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
# Copyright, 2025, by Samuel Williams.
55

66
require "async/service/environment"
7+
require "async/utilization"
8+
9+
require_relative "worker"
710

811
module Async
912
module Service
@@ -30,10 +33,38 @@ def supervisor_worker_state
3033
{name: self.name}
3134
end
3235

36+
# A default schema for utilization metrics.
37+
# @returns [Hash | Nil] The utilization schema or nil if utilization is disabled.
38+
def utilization_schema
39+
{
40+
connections_active: :u32,
41+
connections_total: :u64,
42+
requests_active: :u32,
43+
requests_total: :u64,
44+
}
45+
end
46+
47+
# Get the utilization registry for this service.
48+
#
49+
# Creates a new registry instance for tracking utilization metrics.
50+
# This registry is used by workers to emit metrics that can be collected
51+
# by the supervisor's utilization monitor.
52+
#
53+
# @returns [Async::Utilization::Registry] A new utilization registry instance.
54+
def utilization_registry
55+
Async::Utilization::Registry.new
56+
end
57+
3358
# The supervised worker for the current process.
3459
# @returns [Worker] The worker client.
3560
def supervisor_worker
36-
Worker.new(process_id: Process.pid, endpoint: supervisor_endpoint, state: self.supervisor_worker_state)
61+
Worker.new(
62+
process_id: Process.pid,
63+
endpoint: supervisor_endpoint,
64+
state: self.supervisor_worker_state,
65+
utilization_schema: self.utilization_schema,
66+
utilization_registry: self.utilization_registry,
67+
)
3768
end
3869

3970
# Create a supervised worker for the given instance.

0 commit comments

Comments
 (0)