Skip to content

Commit 4fd0a3b

Browse files
authored
feat: support deterministic load generator values (#26174)
* simplify FieldValue types by making load generator functions should be generic over RngCore and passing the RNG in to methods rather than depending on it being available on every type instance that needs it * expose influxdb3_load_generator as library crate * export config, spec, and measurement types publicly to suppore use in the antithesis-e2e crate * fix bug that surfaced whenever the cardinality value was less than the lines per sample value by forcing LP lines in a set of samples to be distinct from one another with nanosecond increments
1 parent 7c02593 commit 4fd0a3b

7 files changed

Lines changed: 357 additions & 242 deletions

File tree

influxdb3_load_generator/src/commands/full.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::commands::{query::run_query_load, write::run_write_load};
88
use super::{common::InfluxDb3Config, query::QueryConfig, write::WriteConfig};
99

1010
#[derive(Debug, Parser)]
11-
pub(crate) struct Config {
11+
pub struct Config {
1212
/// Common InfluxDB 3 Core config
1313
#[clap(flatten)]
1414
common: InfluxDb3Config,
@@ -22,7 +22,7 @@ pub(crate) struct Config {
2222
write: WriteConfig,
2323
}
2424

25-
pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
25+
pub async fn command(mut config: Config) -> Result<(), anyhow::Error> {
2626
let (client, mut load_config) = config
2727
.common
2828
.initialize_full(

influxdb3_load_generator/src/commands/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use super::common::InfluxDb3Config;
1717

1818
#[derive(Debug, Parser)]
1919
#[clap(visible_alias = "q", trailing_var_arg = true)]
20-
pub(crate) struct Config {
20+
pub struct Config {
2121
/// Common InfluxDB 3 Core config
2222
#[clap(flatten)]
2323
common: InfluxDb3Config,
@@ -55,7 +55,7 @@ pub(crate) struct QueryConfig {
5555
query_response_format: Format,
5656
}
5757

58-
pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
58+
pub async fn command(mut config: Config) -> Result<(), anyhow::Error> {
5959
let (client, mut load_config) = config
6060
.common
6161
.initialize_query(config.query.querier_spec_path.take())

influxdb3_load_generator/src/commands/write.rs

Lines changed: 32 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
1-
use crate::line_protocol_generator::{Generator, create_generators};
1+
use crate::line_protocol_generator::{GeneratorRunner, create_generators};
22
use crate::report::WriteReporter;
33
use crate::specification::DataSpec;
44
use anyhow::Context;
5-
use chrono::{DateTime, Local};
5+
use chrono::{DateTime, Local, TimeZone};
66
use clap::Parser;
77
use futures::StreamExt;
88
use futures::stream::FuturesUnordered;
9-
use influxdb3_client::{Client, Precision};
10-
use std::ops::Add;
9+
use rand::SeedableRng;
10+
use rand::rngs::SmallRng;
1111
use std::path::PathBuf;
1212
use std::str::FromStr;
1313
use std::sync::Arc;
1414
use std::time::Duration;
15-
use tokio::time::Instant;
1615

1716
use super::common::InfluxDb3Config;
1817

1918
#[derive(Debug, Parser)]
2019
#[clap(visible_alias = "w", trailing_var_arg = true)]
21-
pub(crate) struct Config {
20+
pub struct Config {
2221
/// Common InfluxDB 3 Core config
2322
#[clap(flatten)]
2423
common: InfluxDb3Config,
@@ -29,7 +28,7 @@ pub(crate) struct Config {
2928
}
3029

3130
#[derive(Debug, Parser)]
32-
pub(crate) struct WriteConfig {
31+
pub struct WriteConfig {
3332
/// Sampling interval for the writers. They will generate data at this interval and
3433
/// sleep for the remainder of the interval. Writers stagger writes by this interval divided
3534
/// by the number of writers.
@@ -39,7 +38,7 @@ pub(crate) struct WriteConfig {
3938
env = "INFLUXDB3_LOAD_SAMPLING_INTERVAL",
4039
default_value = "1s"
4140
)]
42-
sampling_interval: SamplingInterval,
41+
pub sampling_interval: SamplingInterval,
4342

4443
/// Number of simultaneous writers. Each writer will generate data at the specified interval.
4544
#[clap(
@@ -48,30 +47,30 @@ pub(crate) struct WriteConfig {
4847
env = "INFLUXDB3_LOAD_WRITER_COUNT",
4948
default_value = "1"
5049
)]
51-
writer_count: usize,
50+
pub writer_count: usize,
5251

5352
/// The path to the writer spec file to use for this run.
5453
///
5554
/// Alternatively, specify a name of a builtin spec to use. If neither are specified, the
5655
/// generator will output a list of builtin specs along with help and an example for writing
5756
/// your own.
5857
#[clap(long = "writer-spec", env = "INFLUXDB3_LOAD_WRITER_SPEC_PATH")]
59-
pub(crate) writer_spec_path: Option<PathBuf>,
58+
pub writer_spec_path: Option<PathBuf>,
6059

6160
/// Tells the generator to run a single sample for each writer in `writer-count` and output the data to stdout.
6261
#[clap(long = "dry-run", default_value = "false")]
63-
dry_run: bool,
62+
pub dry_run: bool,
6463

6564
/// The date and time at which to start the timestamps of the generated data.
6665
///
6766
/// Can be an exact datetime like `2020-01-01T01:23:45-05:00` or a fuzzy
6867
/// specification like `1 hour` in the past. If not specified, defaults to now.
6968
#[clap(long = "start", action)]
70-
start_time: Option<String>,
69+
pub start_time: Option<String>,
7170
}
7271

7372
#[derive(Debug, Clone, Copy)]
74-
struct SamplingInterval(humantime::Duration);
73+
pub struct SamplingInterval(humantime::Duration);
7574

7675
impl FromStr for SamplingInterval {
7776
type Err = SamplingIntervalError;
@@ -99,14 +98,14 @@ impl From<SamplingInterval> for Duration {
9998
}
10099

101100
#[derive(Debug, thiserror::Error)]
102-
enum SamplingIntervalError {
101+
pub enum SamplingIntervalError {
103102
#[error("sampling interval must be greater than 0")]
104103
ZeroDuration,
105104
#[error(transparent)]
106105
Inner(#[from] humantime::DurationError),
107106
}
108107

109-
pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
108+
pub async fn command(mut config: Config) -> Result<(), anyhow::Error> {
110109
let (client, mut load_config) = config
111110
.common
112111
.initialize_write(config.write.writer_spec_path.take())
@@ -163,12 +162,13 @@ pub(crate) async fn run_write_load(
163162
let mut generators =
164163
create_generators(&spec, writer_count).context("failed to create generators")?;
165164

165+
let mut rng = SmallRng::from_entropy();
166166
// if dry run is set, output from each generator its id and then a single sample
167167
if dry_run {
168168
println!("running dry run for each writer\n");
169169
for g in &mut generators {
170170
let t = Local::now();
171-
let dry_run_output = g.dry_run(t.timestamp_millis());
171+
let dry_run_output = g.dry_run(t, &mut rng);
172172
println!("Writer {}:\n{}", g.writer_id, dry_run_output);
173173
}
174174
return Ok(());
@@ -191,15 +191,21 @@ pub(crate) async fn run_write_load(
191191
for generator in generators {
192192
let reporter = Arc::clone(&reporter);
193193
let sampling_interval = sampling_interval.into();
194-
let task = tokio::spawn(run_generator(
194+
let rng = SmallRng::from_entropy();
195+
let mut runner = GeneratorRunner::new(
195196
generator,
196197
client.clone(),
197198
database_name.clone(),
198-
reporter,
199199
sampling_interval,
200-
start_time,
201-
end_time,
202-
));
200+
)
201+
.with_reporter(reporter);
202+
if let Some(start_time) = start_time {
203+
runner = runner.with_start_time(start_time);
204+
}
205+
if let Some(end_time) = end_time {
206+
runner = runner.with_end_time(end_time);
207+
}
208+
let task = tokio::spawn(runner.run(rng));
203209
tasks.push(task);
204210
}
205211

@@ -212,7 +218,7 @@ pub(crate) async fn run_write_load(
212218
Ok(())
213219
}
214220

215-
fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
221+
pub fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
216222
humantime::parse_rfc3339(s)
217223
.map(Into::into)
218224
.unwrap_or_else(|_| {
@@ -223,148 +229,14 @@ fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
223229
})
224230
}
225231

226-
async fn run_generator(
227-
mut generator: Generator,
228-
client: Client,
229-
database_name: String,
230-
reporter: Arc<WriteReporter>,
231-
sampling_interval: Duration,
232-
start_time: Option<DateTime<Local>>,
233-
end_time: Option<DateTime<Local>>,
234-
) {
235-
// if not generator 1, pause for 100ms to let it start the run to create the schema
236-
if generator.writer_id != 1 {
237-
tokio::time::sleep(Duration::from_millis(100)).await;
238-
}
239-
240-
let mut sample_buffer = vec![];
241-
242-
// if the start time is set, load the historical samples as quickly as possible
243-
if let Some(mut start_time) = start_time {
244-
let mut sample_len = write_sample(
245-
&mut generator,
246-
sample_buffer,
247-
&client,
248-
&database_name,
249-
start_time,
250-
&reporter,
251-
true,
252-
)
253-
.await;
254-
255-
loop {
256-
start_time = start_time.add(sampling_interval);
257-
if start_time > Local::now()
258-
|| end_time
259-
.map(|end_time| start_time > end_time)
260-
.unwrap_or(false)
261-
{
262-
println!(
263-
"writer {} finished historical replay at: {:?}",
264-
generator.writer_id, start_time
265-
);
266-
break;
267-
}
268-
269-
sample_buffer = Vec::with_capacity(sample_len);
270-
sample_len = write_sample(
271-
&mut generator,
272-
sample_buffer,
273-
&client,
274-
&database_name,
275-
start_time,
276-
&reporter,
277-
false,
278-
)
279-
.await;
280-
}
281-
}
282-
283-
// write data until end time or forever
284-
let mut interval = tokio::time::interval(sampling_interval);
285-
let mut sample_len = 1024 * 1024 * 1024;
286-
287-
// we only want to print the error the very first time it happens
288-
let mut print_err = false;
289-
290-
loop {
291-
interval.tick().await;
292-
let now = Local::now();
293-
if let Some(end_time) = end_time {
294-
if now > end_time {
295-
println!("writer {} completed at {}", generator.writer_id, end_time);
296-
return;
297-
}
298-
}
299-
300-
sample_buffer = Vec::with_capacity(sample_len);
301-
sample_len = write_sample(
302-
&mut generator,
303-
sample_buffer,
304-
&client,
305-
&database_name,
306-
now,
307-
&reporter,
308-
print_err,
309-
)
310-
.await;
311-
print_err = true;
312-
}
313-
}
314-
315-
async fn write_sample(
316-
generator: &mut Generator,
317-
mut buffer: Vec<u8>,
318-
client: &Client,
319-
database_name: &String,
320-
sample_time: DateTime<Local>,
321-
reporter: &Arc<WriteReporter>,
322-
print_err: bool,
323-
) -> usize {
324-
// generate the sample, and keep track of the length to set the buffer size for the next loop
325-
let summary = generator
326-
.write_sample_to(sample_time.timestamp_millis(), &mut buffer)
327-
.expect("failed to write sample");
328-
let sample_len = buffer.len();
329-
330-
// time and send the write request
331-
let start_request = Instant::now();
332-
let res = client
333-
.api_v3_write_lp(database_name)
334-
.precision(Precision::Millisecond)
335-
.accept_partial(false)
336-
.body(buffer)
337-
.send()
338-
.await;
339-
let response_time = start_request.elapsed().as_millis() as u64;
340-
341-
// log the report
342-
match res {
343-
Ok(_) => {
344-
reporter.report_write(generator.writer_id, summary, response_time, Local::now());
345-
}
346-
Err(e) => {
347-
// if it's the first error, print the details
348-
if print_err {
349-
eprintln!(
350-
"Error on writer {} writing to server: {:?}",
351-
generator.writer_id, e
352-
);
353-
}
354-
reporter.report_failure(generator.writer_id, response_time, Local::now());
355-
}
356-
}
357-
358-
sample_len
359-
}
360-
361232
pub(crate) fn print_help() {
362233
let built_in_specs = crate::specs::built_in_specs();
363234
let example = built_in_specs.first().unwrap();
364235
let mut generators = create_generators(&example.write_spec, 2).unwrap();
365-
let t = 123;
366-
let dry_run_output_1 = generators.get_mut(0).unwrap().dry_run(t);
367-
let dry_run_output_2 = generators.get_mut(1).unwrap().dry_run(t);
236+
let t = Local.timestamp_millis_opt(123).unwrap();
237+
let mut rng = SmallRng::from_entropy();
238+
let dry_run_output_1 = generators.get_mut(0).unwrap().dry_run(t, &mut rng);
239+
let dry_run_output_2 = generators.get_mut(1).unwrap().dry_run(t, &mut rng);
368240

369241
let builtin_help = built_in_specs
370242
.iter()
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
pub mod line_protocol_generator;
2+
pub mod query_generator;
3+
pub mod report;
4+
pub mod specification;
5+
pub mod specs;
6+
7+
pub mod commands {
8+
pub mod common;
9+
pub mod full;
10+
pub mod query;
11+
pub mod write;
12+
}

0 commit comments

Comments
 (0)