Skip to content

Commit 66d4828

Browse files
Restore worker state interface.
1 parent 601aa40 commit 66d4828

File tree

7 files changed

+160
-5
lines changed

7 files changed

+160
-5
lines changed

guides/migration/readme.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,3 +309,9 @@ The new approach provides:
309309
- Simpler ID management.
310310
- Better controller-based API.
311311
- More explicit worker lifecycle management.
312+
313+
### Connection State Handling
314+
315+
State has moved from `connection.state` to `supervisor_controller.state`. Monitors receive `supervisor_controller` objects instead of `connection` objects, and access state via `supervisor_controller.state[:name]` instead of `connection.state[:name]`.
316+
317+
By default, the `Supervised` module automatically includes the service name in state via `supervisor_worker_state`. Override `supervisor_worker_state` in your service environment to customize the state.

lib/async/service/supervisor/supervised.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,16 @@ def supervisor_endpoint
2424
::IO::Endpoint.unix(supervisor_ipc_path)
2525
end
2626

27+
# The state to associate with the supervised worker.
28+
# @returns [Hash]
29+
def supervisor_worker_state
30+
{name: self.name}
31+
end
32+
2733
# The supervised worker for the current process.
2834
# @returns [Worker] The worker client.
2935
def supervisor_worker
30-
Worker.new(process_id: Process.pid, endpoint: supervisor_endpoint)
36+
Worker.new(process_id: Process.pid, endpoint: supervisor_endpoint, state: self.supervisor_worker_state)
3137
end
3238

3339
# Create a supervised worker for the given instance.

lib/async/service/supervisor/supervisor_controller.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def initialize(server, connection)
1919
@id = nil
2020
@process_id = nil
2121
@worker = nil
22+
@state = {}
2223
end
2324

2425
# @attribute [Server] The server instance.
@@ -36,20 +37,25 @@ def initialize(server, connection)
3637
# @attribute [Proxy] The proxy to the worker controller.
3738
attr :worker
3839

40+
# @attribute [Hash] State associated with this worker connection (e.g., service name).
41+
attr_accessor :state
42+
3943
# Register a worker connection with the supervisor.
4044
#
4145
# Allocates a unique sequential ID, stores the worker controller proxy,
4246
# and notifies all monitors of the new connection.
4347
#
4448
# @parameter worker [Proxy] The proxy to the worker controller.
4549
# @parameter process_id [Integer] The process ID of the worker.
50+
# @parameter state [Hash] Optional state to associate with this worker (e.g., service name).
4651
# @returns [Integer] The connection ID assigned to the worker.
47-
def register(worker, process_id:)
52+
def register(worker, process_id:, state: {})
4853
raise RuntimeError, "Already registered" if @id
4954

5055
@id = @server.next_id
5156
@process_id = process_id
5257
@worker = worker
58+
@state.merge!(state)
5359

5460
@server.add(self)
5561

lib/async/service/supervisor/worker.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ def self.run(process_id: Process.pid, endpoint: Supervisor.endpoint)
2525
#
2626
# @parameter process_id [Integer] The process ID to register with the supervisor.
2727
# @parameter endpoint [IO::Endpoint] The supervisor endpoint to connect to.
28-
def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint)
28+
# @parameter state [Hash] Optional state to associate with this worker (e.g., service name).
29+
def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {})
2930
super(endpoint: endpoint)
3031

3132
@id = nil
3233
@process_id = process_id
34+
@state = state
3335
end
3436

3537
# @attribute [Integer] The ID assigned by the supervisor.
@@ -38,6 +40,9 @@ def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint)
3840
# @attribute [Integer] The process ID of the worker.
3941
attr :process_id
4042

43+
# @attribute [Hash] State associated with this worker (e.g., service name).
44+
attr_accessor :state
45+
4146
protected def connected!(connection)
4247
super
4348

@@ -49,10 +54,9 @@ def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint)
4954
# The supervisor allocates a unique ID and returns it
5055
# This is a synchronous RPC call that will complete before returning
5156
supervisor = connection[:supervisor]
52-
@id = supervisor.register(worker_proxy, process_id: @process_id)
57+
@id = supervisor.register(worker_proxy, process_id: @process_id, state: @state)
5358
end
5459
end
5560
end
5661
end
5762
end
58-

releases.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Add `state` attribute to `SupervisorController` to store per-worker metadata (e.g., service name).
6+
- Add `state` parameter to `Worker#initialize` to allow workers to provide state during registration.
7+
- State is now accessible via `supervisor_controller.state` instead of `connection.state` (as it was in `Async::Container::Supervisor`).
8+
39
## v0.10.0
410

511
- Serialize `register`/`remove` and `check!` operations in `MemoryMonitor` to prevent race conditions.

test/async/service/supervised.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ def setup(container)
3333

3434
it "can define a supervised service" do
3535
environment = Async::Service::Environment.build(root: @root) do
36+
name "simple-service"
37+
3638
service_class {SimpleService}
3739

3840
include Async::Service::Supervisor::Supervised
@@ -48,6 +50,9 @@ def setup(container)
4850
supervisor_controller = event.supervisor_controller
4951

5052
expect(supervisor_controller.process_id).to be == ::Process.pid
53+
expect(supervisor_controller.state).to have_keys(
54+
name: be == "simple-service"
55+
)
5156
ensure
5257
worker_task&.stop
5358
end

test/async/service/supervisor.rb

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,67 @@
2323
worker_task&.stop
2424
end
2525

26+
with "state handling" do
27+
it "stores state on supervisor_controller during registration" do
28+
custom_state = {name: "test-service", version: "1.0"}
29+
worker = Async::Service::Supervisor::Worker.new(
30+
process_id: ::Process.pid,
31+
endpoint: endpoint,
32+
state: custom_state
33+
)
34+
worker_task = worker.run
35+
36+
# Wait for registration via the registration monitor
37+
event = registration_monitor.pop(timeout: 5)
38+
39+
expect(event).to be_truthy
40+
supervisor_controller = event.supervisor_controller
41+
42+
expect(supervisor_controller.state).to have_keys(
43+
name: be == "test-service",
44+
version: be == "1.0"
45+
)
46+
ensure
47+
worker_task&.stop
48+
end
49+
50+
it "defaults to empty state hash" do
51+
worker = Async::Service::Supervisor::Worker.new(process_id: ::Process.pid, endpoint: endpoint)
52+
worker_task = worker.run
53+
54+
# Wait for registration via the registration monitor
55+
event = registration_monitor.pop(timeout: 5)
56+
57+
expect(event).to be_truthy
58+
supervisor_controller = event.supervisor_controller
59+
60+
expect(supervisor_controller.state).to be == {}
61+
ensure
62+
worker_task&.stop
63+
end
64+
65+
it "allows monitors to access state" do
66+
custom_state = {name: "outbox"}
67+
worker = Async::Service::Supervisor::Worker.new(
68+
process_id: ::Process.pid,
69+
endpoint: endpoint,
70+
state: custom_state
71+
)
72+
worker_task = worker.run
73+
74+
# Wait for registration via the registration monitor
75+
event = registration_monitor.pop(timeout: 5)
76+
77+
expect(event).to be_truthy
78+
supervisor_controller = event.supervisor_controller
79+
80+
# Verify state can be accessed by monitors
81+
expect(supervisor_controller.state[:name]).to be == "outbox"
82+
ensure
83+
worker_task&.stop
84+
end
85+
end
86+
2687
with "memory_dump" do
2788
it "can dump memory" do
2889
worker = Async::Service::Supervisor::Worker.new(process_id: ::Process.pid, endpoint: endpoint)
@@ -113,5 +174,66 @@
113174
end
114175
end
115176

177+
with "state handling" do
178+
it "stores state on supervisor_controller during registration" do
179+
custom_state = {name: "test-service", version: "1.0"}
180+
worker = Async::Service::Supervisor::Worker.new(
181+
process_id: ::Process.pid,
182+
endpoint: endpoint,
183+
state: custom_state
184+
)
185+
worker_task = worker.run
186+
187+
# Wait for registration via the registration monitor
188+
event = registration_monitor.pop(timeout: 5)
189+
190+
expect(event).to be_truthy
191+
supervisor_controller = event.supervisor_controller
192+
193+
expect(supervisor_controller.state).to have_keys(
194+
name: be == "test-service",
195+
version: be == "1.0"
196+
)
197+
ensure
198+
worker_task&.stop
199+
end
200+
201+
it "defaults to empty state hash" do
202+
worker = Async::Service::Supervisor::Worker.new(process_id: ::Process.pid, endpoint: endpoint)
203+
worker_task = worker.run
204+
205+
# Wait for registration via the registration monitor
206+
event = registration_monitor.pop(timeout: 5)
207+
208+
expect(event).to be_truthy
209+
supervisor_controller = event.supervisor_controller
210+
211+
expect(supervisor_controller.state).to be == {}
212+
ensure
213+
worker_task&.stop
214+
end
215+
216+
it "allows monitors to access state" do
217+
custom_state = {name: "outbox"}
218+
worker = Async::Service::Supervisor::Worker.new(
219+
process_id: ::Process.pid,
220+
endpoint: endpoint,
221+
state: custom_state
222+
)
223+
worker_task = worker.run
224+
225+
# Wait for registration via the registration monitor
226+
event = registration_monitor.pop(timeout: 5)
227+
228+
expect(event).to be_truthy
229+
supervisor_controller = event.supervisor_controller
230+
231+
# Verify state can be accessed by monitors
232+
expect(supervisor_controller.state[:name]).to be == "outbox"
233+
ensure
234+
worker_task&.stop
235+
end
236+
end
237+
116238
end
117239

0 commit comments

Comments
 (0)