Skip to content

Commit be465ec

Browse files
committed
openpmd-pipe: Collect sizes, only load particles
1 parent 2cf1dc7 commit be465ec

1 file changed

Lines changed: 24 additions & 11 deletions

File tree

src/binding/python/openpmd_api/pipe/__main__.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
License: LGPLv3+
1010
"""
1111
import argparse
12+
import math
1213
import os # os.path.basename
1314
import re
1415
import sys # sys.stderr.write
@@ -434,15 +435,16 @@ def __copy(self, src, dest, dump_times, current_path="/data/"):
434435
self.inranks = {}
435436
out_iteration = write_iterations[in_iteration.iteration_index]
436437
sys.stdout.flush()
437-
self.__copy(
438+
loadedbytes = self.__copy(
438439
in_iteration, out_iteration, dump_times,
439440
current_path + str(in_iteration.iteration_index) + "/")
440441
for deferred in self.loads:
441442
deferred.source.load_chunk(
442443
deferred.dynamicView.current_buffer(), deferred.offset,
443444
deferred.extent)
444-
dump_times.now("Closing incoming iteration {}".format(
445-
in_iteration.iteration_index))
445+
dump_times.now(
446+
"Closing incoming iteration {} to load {} bytes".format(
447+
in_iteration.iteration_index, loadedbytes))
446448
dump_times.flush()
447449
in_iteration.close()
448450
dump_times.now("Closing outgoing iteration {}".format(
@@ -462,16 +464,18 @@ def __copy(self, src, dest, dump_times, current_path="/data/"):
462464
if src.empty:
463465
# empty record component automatically created by
464466
# dest.reset_dataset()
465-
pass
467+
return 0
466468
elif src.constant:
467469
dest.make_constant(src.get_attribute("value"))
470+
return 0
468471
else:
469472
chunk_table = src.available_chunks()
470473
# todo buffer the strategy
471474
strategy = distribution_strategy(shape)
472475
my_chunks = strategy.assign(chunk_table, self.inranks,
473476
self.outranks,
474477
self.comm.rank, self.comm.size)
478+
accum = 0
475479
for chunk in my_chunks[
476480
self.comm.rank] if self.comm.rank in my_chunks else []:
477481
if debug:
@@ -481,21 +485,30 @@ def __copy(self, src, dest, dump_times, current_path="/data/"):
481485
print("{}\t{}/{}:\t{} -- {}".format(
482486
current_path, self.comm.rank, self.comm.size,
483487
chunk.offset, end))
488+
accum += math.prod(chunk.extent)
484489
span = dest.store_chunk(chunk.offset, chunk.extent)
485490
self.loads.append(
486491
deferred_load(src, span, chunk.offset, chunk.extent))
492+
493+
accum *= dtype.itemsize
494+
print(accum, "Bytes for", current_path)
495+
return accum
496+
487497
elif isinstance(src, io.Iteration):
488-
self.__copy(src.meshes, dest.meshes, dump_times,
489-
current_path + "meshes/")
490-
self.__copy(src.particles, dest.particles, dump_times,
498+
# m = self.__copy(src.meshes, dest.meshes, dump_times,
499+
# current_path + "meshes/")
500+
p = self.__copy(src.particles, dest.particles, dump_times,
491501
current_path + "particles/")
502+
return p
492503
elif is_container:
504+
acc = 0
493505
for key in src:
494-
self.__copy(src[key], dest[key], dump_times,
506+
acc += self.__copy(src[key], dest[key], dump_times,
495507
current_path + key + "/")
496-
if isinstance(src, io.ParticleSpecies):
497-
self.__copy(src.particle_patches, dest.particle_patches,
498-
dump_times)
508+
# if isinstance(src, io.ParticleSpecies):
509+
# self.__copy(src.particle_patches, dest.particle_patches,
510+
# dump_times)
511+
return acc
499512
else:
500513
raise RuntimeError("Unknown openPMD class: " + str(src))
501514

0 commit comments

Comments
 (0)