Skip to content

Commit 43005e1

Browse files
committed
WIP Runtime verification of flush level
1 parent b6ea4b4 commit 43005e1

15 files changed

Lines changed: 205 additions & 18 deletions

include/openPMD/IO/AbstractIOHandler.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "openPMD/IterationEncoding.hpp"
2727
#include "openPMD/config.hpp"
2828
#include "openPMD/version.hpp"
29+
#include <ostream>
2930

3031
#if openPMD_HAVE_MPI
3132
#include <mpi.h>
@@ -81,6 +82,8 @@ enum class FlushLevel
8182
CreateOrOpenFiles
8283
};
8384

85+
std::ostream &operator<<(std::ostream &, FlushLevel);
86+
8487
namespace flush_level
8588
{
8689
inline constexpr auto global_flushpoint(FlushLevel fl)

include/openPMD/IO/AbstractIOHandlerImpl.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class AbstractIOHandlerImpl
3939

4040
virtual ~AbstractIOHandlerImpl() = default;
4141

42-
std::future<void> flush();
42+
std::future<void> flush(FlushLevel);
4343

4444
/**
4545
* Close the file corresponding with the writable and release file handles.

include/openPMD/IO/IOTask.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <cstddef>
3636
#include <memory>
3737
#include <optional>
38+
#include <ostream>
3839
#include <string>
3940
#include <utility>
4041
#include <variant>
@@ -89,6 +90,8 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation){
8990
}; // note: if you change the enum members here, please update
9091
// docs/source/dev/design.rst
9192

93+
std::ostream &operator<<(std::ostream &os, Operation op);
94+
9295
namespace internal
9396
{
9497
/*

include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl
241241

242242
void touch(Writable *, Parameter<Operation::TOUCH> const &) override;
243243

244-
std::future<void> flush();
244+
std::future<void> flush(internal::ParsedFlushParams &params);
245245

246246
private:
247247
#if openPMD_HAVE_MPI

include/openPMD/RecordComponent.tpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,7 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
109109
dCreate.name = Attributable::get().m_writable.ownKeyWithinParent;
110110
IOHandler()->enqueue(IOTask(this, dCreate));
111111

112-
Parameter<Operation::SET_WRITTEN> sWritten;
113-
sWritten.target_status = true;
114-
IOHandler()->enqueue(IOTask(this, sWritten));
112+
setWritten(true, EnqueueAsynchronously::Yes);
115113
}
116114

117115
Parameter<Operation::GET_BUFFER_VIEW> getBufferView;

include/openPMD/Series.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -982,7 +982,7 @@ OPENPMD_private
982982
*
983983
* @param doFlush If true, flush the IO handler.
984984
*/
985-
void flushStep(bool doFlush);
985+
void flushStep(bool doFlush, FlushLevel l);
986986

987987
/*
988988
* setIterationEncoding() should only be called by users of our public API,

src/IO/ADIOS/ADIOS2IOHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ overrideFlushTarget(FlushTarget &inplace, FlushTarget new_val)
582582
std::future<void>
583583
ADIOS2IOHandlerImpl::flush(internal::ParsedFlushParams &flushParams)
584584
{
585-
auto res = AbstractIOHandlerImpl::flush();
585+
auto res = AbstractIOHandlerImpl::flush(flushParams.flushLevel);
586586

587587
detail::ADIOS2File::ADIOS2FlushParams adios2FlushParams{
588588
flushParams.flushLevel, m_flushTarget};

src/IO/AbstractIOHandler.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,29 @@
2727

2828
#include <utility>
2929

30+
namespace openPMD
31+
{
32+
std::ostream &operator<<(std::ostream &os, FlushLevel l)
33+
{
34+
switch (l)
35+
{
36+
case FlushLevel::UserFlush:
37+
os << "UserFlush";
38+
break;
39+
case FlushLevel::InternalFlush:
40+
os << "InternalFlush";
41+
break;
42+
case FlushLevel::SkeletonOnly:
43+
os << "SkeletonOnly";
44+
break;
45+
case FlushLevel::CreateOrOpenFiles:
46+
os << "CreateOrOpenFiles";
47+
break;
48+
}
49+
return os;
50+
}
51+
} // namespace openPMD
52+
3053
namespace openPMD::auxiliary
3154
{
3255
using pair_t = std::pair<OpenpmdStandard, char const *>;

src/IO/AbstractIOHandlerImpl.cpp

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
#include "openPMD/IO/AbstractIOHandlerImpl.hpp"
2323

24+
#include "openPMD/Error.hpp"
25+
#include "openPMD/IO/AbstractIOHandler.hpp"
2426
#include "openPMD/IO/IOTask.hpp"
2527
#include "openPMD/Streaming.hpp"
2628
#include "openPMD/auxiliary/Environment.hpp"
@@ -87,13 +89,74 @@ void AbstractIOHandlerImpl::writeToStderr([[maybe_unused]] Args &&...args) const
8789
}
8890
}
8991

90-
std::future<void> AbstractIOHandlerImpl::flush()
92+
namespace
93+
{
94+
void verifyFlushType(Operation op, FlushLevel l)
95+
{
96+
auto do_throw = [&](char const *least_flush_level) {
97+
std::stringstream err;
98+
err << "Operation " << op << " is not allowed below flush level "
99+
<< least_flush_level << ", but flush level was " << l << ".";
100+
throw error::Internal(err.str());
101+
};
102+
switch (op)
103+
{
104+
case Operation::ADVANCE:
105+
case Operation::CREATE_FILE:
106+
case Operation::CHECK_FILE:
107+
case Operation::OPEN_FILE:
108+
case Operation::CLOSE_FILE:
109+
case Operation::DELETE_FILE:
110+
case Operation::DEREGISTER:
111+
case Operation::TOUCH:
112+
case Operation::LIST_ATTS:
113+
case Operation::LIST_PATHS:
114+
case Operation::OPEN_PATH:
115+
case Operation::SET_WRITTEN:
116+
case Operation::CREATE_PATH:
117+
break;
118+
case Operation::CLOSE_PATH:
119+
case Operation::DELETE_PATH:
120+
case Operation::CREATE_DATASET:
121+
case Operation::EXTEND_DATASET:
122+
case Operation::OPEN_DATASET:
123+
case Operation::DELETE_DATASET:
124+
case Operation::LIST_DATASETS:
125+
if (!flush_level::flush_hierarchy(l))
126+
{
127+
do_throw("SkeletonOnly (hierarchy operations)");
128+
}
129+
break;
130+
case Operation::GET_BUFFER_VIEW:
131+
case Operation::DELETE_ATT:
132+
case Operation::WRITE_ATT:
133+
case Operation::READ_ATT:
134+
case Operation::READ_ATT_ALLSTEPS:
135+
case Operation::AVAILABLE_CHUNKS:
136+
if (!flush_level::write_attributes(l))
137+
{
138+
do_throw("InternalFlush (metadata operations)");
139+
}
140+
break;
141+
case Operation::WRITE_DATASET:
142+
case Operation::READ_DATASET:
143+
if (!flush_level::write_datasets(l))
144+
{
145+
do_throw("UserFlush (flushpoint operations)");
146+
}
147+
break;
148+
}
149+
}
150+
} // namespace
151+
152+
std::future<void> AbstractIOHandlerImpl::flush(FlushLevel l)
91153
{
92154
using namespace auxiliary;
93155

94156
while (!(*m_handler).m_work.empty())
95157
{
96158
IOTask &i = (*m_handler).m_work.front();
159+
// verifyFlushType(i.operation, l);
97160
try
98161
{
99162
switch (i.operation)
@@ -467,6 +530,7 @@ std::future<void> AbstractIOHandlerImpl::flush()
467530
break;
468531
}
469532
}
533+
verifyFlushType(i.operation, l);
470534
}
471535
catch (...)
472536
{

src/IO/HDF5/HDF5IOHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3379,7 +3379,7 @@ auto HDF5IOHandlerImpl::requireFile(
33793379

33803380
std::future<void> HDF5IOHandlerImpl::flush(internal::ParsedFlushParams &params)
33813381
{
3382-
auto res = AbstractIOHandlerImpl::flush();
3382+
auto res = AbstractIOHandlerImpl::flush(params.flushLevel);
33833383

33843384
if (params.backendConfig.json().contains("hdf5"))
33853385
{

0 commit comments

Comments
 (0)