@@ -72,9 +72,11 @@ void DoWriter(adios2::IO io)
7272 engine.Put (var, data_forward.data (), sync);
7373 engine.Put (stepvar, step);
7474 engine.EndStep ();
75+
76+ // Pace the writer so readers have time to connect and create
77+ // realistic demand patterns (queue sometimes empty, sometimes not)
78+ std::this_thread::sleep_for (std::chrono::milliseconds (15 ));
7579 }
76- // Close the file
77- std::this_thread::sleep_for (std::chrono::milliseconds (200 ));
7880 engine.Close ();
7981 int steps = 0 ;
8082 int received_steps = 0 ;
@@ -102,10 +104,22 @@ void DoReader(adios2::IO io, int Rank)
102104 size_t writerStep;
103105 sstReader.Get (floatVar, myFloats);
104106 sstReader.Get (stepVar, writerStep);
105- // std::cout << "Reader " << Rank << " got writerStep " <<
106- // writerStep << std::endl;
107107 sstReader.EndStep ();
108108 steps += 1 ;
109+
110+ // Simulate varying workloads per reader. Each reader "processes"
111+ // for a duration proportional to its rank, so faster readers drain
112+ // more steps while slower ones let the queue build up. After 3
113+ // steps every reader pauses for 500ms, guaranteeing the writer-side
114+ // request queue drains completely and steps buffer on the writer.
115+ if (steps == 3 )
116+ {
117+ std::this_thread::sleep_for (std::chrono::milliseconds (500 ));
118+ }
119+ else
120+ {
121+ std::this_thread::sleep_for (std::chrono::milliseconds (Rank * 20 ));
122+ }
109123 }
110124 sstReader.Close ();
111125 std::cout << " Reader " << Rank << " got " << steps << " steps " << std::endl;
@@ -123,6 +137,10 @@ TEST_F(TestOnDemandMPI, ADIOS2OnDemandMPI)
123137 MPI_Comm_size (MPI_COMM_WORLD, &mpiSize);
124138#endif
125139
140+ // Limit to 4 readers (ranks 1-4) so demand patterns are more
141+ // interesting — with too many readers the queue is never empty.
142+ int maxReaders = 4 ;
143+
126144 // Init without MPI. MPI only used to coordinate non-MPI actors
127145 adios2::ADIOS adios;
128146
@@ -132,11 +150,17 @@ TEST_F(TestOnDemandMPI, ADIOS2OnDemandMPI)
132150 {
133151 DoWriter (io);
134152 }
135- else
153+ else if (mpiRank <= maxReaders)
136154 {
137-
138155 DoReader (io, mpiRank);
139156 }
157+ else
158+ {
159+ // Extra ranks just participate in the MPI_Reduce
160+ int steps = 0 ;
161+ int get_count;
162+ MPI_Reduce (&steps, &get_count, 1 , MPI_INT, MPI_SUM, 0 , MPI_COMM_WORLD);
163+ }
140164}
141165
142166int main (int argc, char **argv)
@@ -146,7 +170,7 @@ int main(int argc, char **argv)
146170 int result;
147171 ::testing::InitGoogleTest (&argc, argv);
148172
149- NSteps = 100 ;
173+ NSteps = 50 ;
150174
151175 ParseArgs (argc, argv);
152176
0 commit comments