@@ -2,6 +2,35 @@ module Graphiti
22 class Scope
33 attr_accessor :object , :unpaginated_object
44 attr_reader :pagination
5+
6+ GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS = %i[
7+ length max_length queue_length max_queue completed_task_count largest_length scheduled_task_count synchronous
8+ ]
9+ GLOBAL_THREAD_POOL_EXECUTOR = Concurrent ::Promises . delay do
10+ if Graphiti . config . concurrency
11+ concurrency = Graphiti . config . concurrency_max_threads || 4
12+ Concurrent ::ThreadPoolExecutor . new (
13+ min_threads : 0 ,
14+ max_threads : concurrency ,
15+ max_queue : concurrency * 4 ,
16+ fallback_policy : :caller_runs
17+ )
18+ else
19+ Concurrent ::ThreadPoolExecutor . new ( max_threads : 0 , synchronous : true , fallback_policy : :caller_runs )
20+ end
21+ end
22+ private_constant :GLOBAL_THREAD_POOL_EXECUTOR , :GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS
23+
24+ def self . global_thread_pool_executor
25+ GLOBAL_THREAD_POOL_EXECUTOR . value!
26+ end
27+
28+ def self . global_thread_pool_stats
29+ GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS . each_with_object ( { } ) do |key , memo |
30+ memo [ key ] = global_thread_pool_executor . send ( key )
31+ end
32+ end
33+
534 def initialize ( object , resource , query , opts = { } )
635 @object = object
736 @resource = resource
@@ -14,57 +43,33 @@ def initialize(object, resource, query, opts = {})
1443 end
1544
1645 def resolve
17- if @query . zero_results?
18- [ ]
19- else
20- resolved = broadcast_data { |payload |
21- @object = @resource . before_resolve ( @object , @query )
22- payload [ :results ] = @resource . resolve ( @object )
23- payload [ :results ]
24- }
25- resolved . compact!
26- assign_serializer ( resolved )
27- yield resolved if block_given?
28- @opts [ :after_resolve ] &.call ( resolved )
29- resolve_sideloads ( resolved ) unless @query . sideloads . empty?
30- resolved
31- end
46+ future_resolve . value!
3247 end
3348
3449 def resolve_sideloads ( results )
35- return if results == [ ]
50+ future_resolve_sideloads ( results ) . value!
51+ end
3652
37- concurrent = Graphiti . config . concurrency
38- promises = [ ]
53+ def future_resolve
54+ return Concurrent :: Promises . fulfilled_future ( [ ] , self . class . global_thread_pool_executor ) if @query . zero_results?
3955
40- @query . sideloads . each_pair do |name , q |
41- sideload = @resource . class . sideload ( name )
42- next if sideload . nil? || sideload . shared_remote?
43- parent_resource = @resource
44- graphiti_context = Graphiti . context
45- resolve_sideload = -> {
46- Graphiti . config . before_sideload &.call ( graphiti_context )
47- Graphiti . context = graphiti_context
48- sideload . resolve ( results , q , parent_resource )
49- @resource . adapter . close if concurrent
50- }
51- if concurrent
52- promises << Concurrent ::Promise . execute ( &resolve_sideload )
53- else
54- resolve_sideload . call
55- end
56+ resolved = broadcast_data { |payload |
57+ @object = @resource . before_resolve ( @object , @query )
58+ payload [ :results ] = @resource . resolve ( @object )
59+ payload [ :results ]
60+ }
61+ resolved . compact!
62+ assign_serializer ( resolved )
63+ yield resolved if block_given?
64+ @opts [ :after_resolve ] &.call ( resolved )
65+ sideloaded = @query . parents . any?
66+ close_adapter = Graphiti . config . concurrency && sideloaded
67+ if close_adapter
68+ @resource . adapter . close
5669 end
5770
58- if concurrent
59- # Wait for all promises to finish
60- sleep 0.01 until promises . all? { |p | p . fulfilled? || p . rejected? }
61- # Re-raise the error with correct stacktrace
62- # OPTION** to avoid failing here?? if so need serializable patch
63- # to avoid loading data when association not loaded
64- if ( rejected = promises . find ( &:rejected? ) )
65- raise rejected . reason
66- end
67- end
71+ future_resolve_sideloads ( resolved )
72+ . then_on ( self . class . global_thread_pool_executor , resolved ) { resolved }
6873 end
6974
7075 def parent_resource
@@ -108,6 +113,74 @@ def updated_at
108113
109114 private
110115
116+ def future_resolve_sideloads ( results )
117+ return Concurrent ::Promises . fulfilled_future ( nil , self . class . global_thread_pool_executor ) if results == [ ]
118+
119+ sideload_promises = @query . sideloads . filter_map do |name , q |
120+ sideload = @resource . class . sideload ( name )
121+ next if sideload . nil? || sideload . shared_remote?
122+
123+ p = future_with_context ( results , q , @resource ) do |parent_results , sideload_query , parent_resource |
124+ Graphiti . config . before_sideload &.call ( Graphiti . context )
125+ sideload . future_resolve ( parent_results , sideload_query , parent_resource )
126+ end
127+ p . flat
128+ end
129+
130+ Concurrent ::Promises . zip_futures_on ( self . class . global_thread_pool_executor , *sideload_promises )
131+ . rescue_on ( self . class . global_thread_pool_executor ) do |*reasons |
132+ first_error = reasons . find { |r | r . is_a? ( Exception ) }
133+ raise first_error
134+ end
135+ end
136+
137+ def future_with_context ( *args )
138+ thread_storage = Thread . current . keys . each_with_object ( { } ) do |key , memo |
139+ memo [ key ] = Thread . current [ key ]
140+ end
141+ fiber_storage =
142+ if Fiber . current . respond_to? ( :storage )
143+ Fiber . current &.storage &.keys &.each_with_object ( { } ) do |key , memo |
144+ memo [ key ] = Fiber [ key ]
145+ end
146+ end
147+
148+ Concurrent ::Promises . future_on (
149+ self . class . global_thread_pool_executor , Thread . current . object_id , thread_storage , fiber_storage , *args
150+ ) do |thread_id , thread_storage , fiber_storage , *args |
151+ wrap_in_rails_executor do
152+ execution_context_changed = thread_id != Thread . current . object_id
153+ if execution_context_changed
154+ thread_storage &.keys &.each_with_object ( Thread . current ) do |key , thread_current |
155+ thread_current [ key ] = thread_storage [ key ]
156+ end
157+ fiber_storage &.keys &.each_with_object ( Fiber ) do |key , fiber_current |
158+ fiber_current [ key ] = fiber_storage [ key ]
159+ end
160+ end
161+
162+ result = Graphiti . broadcast ( :global_thread_pool_task_run , self . class . global_thread_pool_stats ) do
163+ yield ( *args )
164+ end
165+
166+ if execution_context_changed
167+ thread_storage &.keys &.each { |key | Thread . current [ key ] = nil }
168+ fiber_storage &.keys &.each { |key | Fiber [ key ] = nil }
169+ end
170+
171+ result
172+ end
173+ end
174+ end
175+
176+ def wrap_in_rails_executor ( &block )
177+ if defined? ( ::Rails . application . executor )
178+ ::Rails . application . executor . wrap ( &block )
179+ else
180+ yield
181+ end
182+ end
183+
111184 def sideload_resource_proxies
112185 @sideload_resource_proxies ||= begin
113186 @object = @resource . before_resolve ( @object , @query )
0 commit comments