Skip to content

Commit ffecb80

Browse files
Fix inversion of control.
1 parent 9bc2643 commit ffecb80

File tree

2 files changed

+14
-17
lines changed

2 files changed

+14
-17
lines changed

lib/async/service/supervisor/worker.rb

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,24 @@ def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}
5656

5757
# Setup utilization observer for this worker.
5858
#
59-
# Delegates to WorkerController to set up the shared memory observer.
60-
# This method is called by the supervisor to inform the worker of the shared memory
61-
# file path and allocated offset.
59+
# Maps the shared memory file and configures the utilization registry to write
60+
# metrics to it. Called by the supervisor (via WorkerController) to inform the
61+
# worker of the shared memory file path and allocated offset.
6262
#
6363
# @parameter path [String] Path to the shared memory file that the worker should map.
6464
# @parameter size [Integer] Size of the shared memory region to map.
6565
# @parameter offset [Integer] Offset into the shared memory buffer allocated for this worker.
6666
# @returns [Array] Array of [key, type, offset] tuples describing the utilization schema.
6767
# Returns empty array if no utilization schema is configured.
6868
def setup_utilization_observer(path, size, offset)
69-
controller = WorkerController.new(self)
70-
controller.setup_utilization_observer(path, size, offset)
69+
return [] unless @utilization_schema
70+
71+
schema = Async::Utilization::Schema.build(@utilization_schema)
72+
observer = Async::Utilization::Observer.open(schema, path, size, offset)
73+
@utilization_registry.observer = observer
74+
75+
# Pass the schema back to the supervisor so it can be used to aggregate the metrics:
76+
observer.schema.to_a
7177
end
7278

7379
protected def connected!(connection)

lib/async/service/supervisor/worker_controller.rb

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,24 +104,15 @@ def garbage_profile_stop(path: nil)
104104

105105
# Setup utilization observer for this worker.
106106
#
107-
# Called by the supervisor to inform the worker of the shared memory file path
108-
# and allocated offset. Uses the utilization schema from the worker if available.
107+
# Delegates to the worker to map the shared memory file and configure its
108+
# utilization registry. Called by the supervisor via RPC.
109109
#
110110
# @parameter path [String] Path to the shared memory file that the worker should map.
111111
# @parameter size [Integer] Size of the shared memory region to map.
112112
# @parameter offset [Integer] Offset into the shared memory buffer allocated for this worker.
113113
# @returns [Array] Array of [key, type, offset] tuples describing the utilization schema.
114-
# Returns empty array if no utilization schema is configured.
115114
def setup_utilization_observer(path, size, offset)
116-
utilization_schema = @worker.utilization_schema
117-
return [] unless utilization_schema
118-
119-
schema = Async::Utilization::Schema.build(utilization_schema)
120-
observer = Async::Utilization::Observer.open(schema, path, size, offset)
121-
@worker.utilization_registry.observer = observer
122-
123-
# Pass the schema back to the supervisor so it can be used to aggregate the metrics:
124-
return observer.schema.to_a
115+
@worker.setup_utilization_observer(path, size, offset)
125116
end
126117
end
127118
end

0 commit comments

Comments
 (0)