@@ -128,8 +128,56 @@ def assign(self, assignment, *_):
128128 assignment .assigned [self .rank ].append (element )
129129 return assignment
130130
131+ class IncreaseGranularity (io .PartialStrategy ):
131132
132- # Example how to implement a simple strategy in Python
133+ def __init__ (self ,
134+ granularity ,
135+ inner_distribution = io .ByHostname (io .RoundRobin ())):
136+ super ().__init__ ()
137+ self .inner_distribution = inner_distribution
138+ self .granularity = granularity
139+
140+ def assign (self , assignment , in_ranks , out_ranks ):
141+ if "in_ranks_inner" in dir (self ):
142+ return self .inner_distribution .assign (assignment ,
143+ self .in_ranks_inner ,
144+ self .out_ranks_inner )
145+
146+ hosts_in_order = []
147+ already_seen = set ()
148+ for (_ , hostname ) in in_ranks .items ():
149+ if hostname not in already_seen :
150+ already_seen .add (hostname )
151+ hosts_in_order .append (hostname )
152+ del already_seen
153+ hostname_to_hostgroup = {} # real host -> host group
154+ current_meta_host = 0
155+ granularity_counter = 0
156+ for host in hosts_in_order :
157+ hostname_to_hostgroup [host ] = str (current_meta_host )
158+ granularity_counter += 1
159+ if granularity_counter >= self .granularity :
160+ granularity_counter = 0
161+ current_meta_host += 1
162+ in_ranks_inner = {}
163+ for (rank , hostname ) in in_ranks .items ():
164+ in_ranks_inner [rank ] = hostname_to_hostgroup [hostname ]
165+ out_ranks_inner = {}
166+ for (rank , hostname ) in out_ranks .items ():
167+ try :
168+ out_ranks_inner [rank ] = hostname_to_hostgroup [hostname ]
169+ except KeyError :
170+ out_ranks_inner [rank ] = hostname
171+
172+ self .in_ranks_inner = in_ranks_inner
173+ self .out_ranks_inner = out_ranks_inner
174+
175+ return self .inner_distribution .assign (assignment , in_ranks_inner ,
176+ out_ranks_inner )
177+
178+
179+
180+ #Example how to implement a simple strategy in Python
133181class LoadAll (io .Strategy ):
134182
135183 def __init__ (self , rank ):
@@ -168,7 +216,7 @@ def distribution_strategy(dataset_extent,
168216 strategy_identifier = match .group (2 ))
169217 return io .FromPartialStrategy (io .ByHostname (inside_node ), second_phase )
170218 elif strategy_identifier == 'all' :
171- return io .FromPartialStrategy (LoadOne ( mpi_rank ), LoadAll (mpi_rank ))
219+ return io .FromPartialStrategy (IncreaseGranularity ( 5 ), LoadAll (mpi_rank ))
172220 elif strategy_identifier == 'roundrobin' :
173221 return io .RoundRobin ()
174222 elif strategy_identifier == 'binpacking' :
@@ -319,6 +367,7 @@ def __copy(self, src, dest, current_path="/data/"):
319367 dest .make_constant (src .get_attribute ("value" ))
320368 else :
321369 chunk_table = src .available_chunks ()
370+ # todo buffer the strategy
322371 strategy = distribution_strategy (shape , self .comm .rank ,
323372 self .comm .size )
324373 my_chunks = strategy .assign (chunk_table , self .inranks ,
0 commit comments