Skip to content

Commit 4439210

Browse files
committed
Do not flush if the backend does not support Span API
1 parent c128403 commit 4439210

3 files changed

Lines changed: 51 additions & 24 deletions

File tree

include/openPMD/IO/IOTask.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,7 @@ struct OPENPMDAPI_EXPORT
558558
}
559559

560560
// in parameters
561+
bool queryOnly = false; // query if the backend supports this
561562
Offset offset;
562563
Extent extent;
563564
Datatype dtype = Datatype::UNDEFINED;

include/openPMD/RecordComponent.tpp

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,6 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
8585
{
8686
verifyChunk<T>(o, e);
8787

88-
/*
89-
* The openPMD backend might not yet know about this dataset.
90-
* Flush the openPMD hierarchy to the backend without flushing any actual
91-
* data yet.
92-
*/
93-
seriesFlush_impl</* flush_entire_series = */ false>(
94-
{FlushLevel::SkeletonOnly});
95-
9688
size_t size = 1;
9789
for (auto ext : e)
9890
{
@@ -102,33 +94,61 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
10294
* Flushing the skeleton does not create datasets,
10395
* so we might need to do it now.
10496
*/
105-
if (!written())
97+
auto &rc = get();
98+
if (!rc.m_dataset.has_value())
10699
{
107-
auto &rc = get();
108-
if (!rc.m_dataset.has_value())
109-
{
110-
throw error::WrongAPIUsage(
111-
"[RecordComponent] Must specify dataset type and extent before "
112-
"using storeChunk() (see RecordComponent::resetDataset()).");
113-
}
114-
Parameter<Operation::CREATE_DATASET> dCreate(rc.m_dataset.value());
115-
dCreate.name = Attributable::get().m_writable.ownKeyWithinParent;
116-
IOHandler()->enqueue(IOTask(this, dCreate));
100+
throw error::WrongAPIUsage(
101+
"[RecordComponent] Must specify dataset type and extent before "
102+
"using storeChunk() (see RecordComponent::resetDataset()).");
117103
}
118104

105+
Parameter<Operation::GET_BUFFER_VIEW> query;
106+
query.queryOnly = true;
107+
IOHandler()->enqueue(IOTask(this, query));
108+
IOHandler()->flush(internal::defaultFlushParams);
109+
119110
Parameter<Operation::GET_BUFFER_VIEW> getBufferView;
120111
getBufferView.offset = o;
121112
getBufferView.extent = e;
122113
getBufferView.dtype = getDatatype();
123-
IOHandler()->enqueue(IOTask(this, getBufferView));
124-
IOHandler()->flush(internal::defaultFlushParams);
125-
auto &out = *getBufferView.out;
126-
if (!out.backendManagedBuffer)
114+
115+
if (query.out->backendManagedBuffer)
116+
{
117+
// Need to initialize the dataset for the Span API
118+
// But this is a non-collective call and initializing the dataset is
119+
// collective in HDF5 So we do this only in backends that actually
120+
// support the Span API (i.e. ADIOS2) which do not share this
121+
// restriction
122+
// TODO: Add some form of collective ::commitDefinitions() call to
123+
// RecordComponents to be called by users before the Span API
124+
if (!written())
125+
{
126+
/*
127+
* The openPMD backend might not yet know about this dataset.
128+
* Flush the openPMD hierarchy to the backend without flushing any
129+
* actual data yet.
130+
*/
131+
seriesFlush_impl</* flush_entire_series = */ false>(
132+
{FlushLevel::SkeletonOnly});
133+
Parameter<Operation::CREATE_DATASET> dCreate(rc.m_dataset.value());
134+
dCreate.name = Attributable::get().m_writable.ownKeyWithinParent;
135+
IOHandler()->enqueue(IOTask(this, dCreate));
136+
137+
setWritten(true, EnqueueAsynchronously::Yes);
138+
}
139+
140+
IOHandler()->enqueue(IOTask(this, getBufferView));
141+
IOHandler()->flush(internal::defaultFlushParams);
142+
}
143+
144+
// The backend might still refuse the operation even if backend managed
145+
// buffers are generally supported, so check again
146+
if (!getBufferView.out->backendManagedBuffer)
127147
{
128148
// note that data might have either
129149
// type shared_ptr<T> or shared_ptr<T[]>
130150
auto data = std::forward<F>(createBuffer)(size);
131-
out.ptr = static_cast<void *>(data.get());
151+
getBufferView.out->ptr = static_cast<void *>(data.get());
132152
if (size > 0)
133153
{
134154
storeChunk(std::move(data), std::move(o), std::move(e));

src/IO/ADIOS/ADIOS2IOHandler.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,6 +1316,12 @@ void ADIOS2IOHandlerImpl::getBufferView(
13161316
parameters.out->backendManagedBuffer = false;
13171317
return;
13181318
}
1319+
else if (parameters.queryOnly)
1320+
{
1321+
parameters.out->backendManagedBuffer = true;
1322+
return;
1323+
}
1324+
13191325
setAndGetFilePosition(writable);
13201326
auto file = refreshFileFromParent(writable, /* preferParentFile = */ false);
13211327
detail::ADIOS2File &ba = getFileData(file, IfFileNotOpen::ThrowError);

0 commit comments

Comments
 (0)