Skip to content

Commit e176483

Browse files
authored
Merge pull request #626 from influxdata/ww/oss-sync
feat: support deterministic load generator values (#26174)
2 parents 2390ef1 + ade46d4 commit e176483

7 files changed

Lines changed: 357 additions & 242 deletions

File tree

influxdb3_load_generator/src/commands/full.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::commands::{query::run_query_load, write::run_write_load};
88
use super::{common::InfluxDb3Config, query::QueryConfig, write::WriteConfig};
99

1010
#[derive(Debug, Parser)]
11-
pub(crate) struct Config {
11+
pub struct Config {
1212
/// Common InfluxDB 3 Enterprise config
1313
#[clap(flatten)]
1414
common: InfluxDb3Config,
@@ -22,7 +22,7 @@ pub(crate) struct Config {
2222
write: WriteConfig,
2323
}
2424

25-
pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
25+
pub async fn command(mut config: Config) -> Result<(), anyhow::Error> {
2626
let (client, mut load_config) = config
2727
.common
2828
.initialize_full(

influxdb3_load_generator/src/commands/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use super::common::InfluxDb3Config;
1717

1818
#[derive(Debug, Parser)]
1919
#[clap(visible_alias = "q", trailing_var_arg = true)]
20-
pub(crate) struct Config {
20+
pub struct Config {
2121
/// Common InfluxDB 3 Enterprise config
2222
#[clap(flatten)]
2323
common: InfluxDb3Config,
@@ -55,7 +55,7 @@ pub(crate) struct QueryConfig {
5555
query_response_format: Format,
5656
}
5757

58-
pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
58+
pub async fn command(mut config: Config) -> Result<(), anyhow::Error> {
5959
let (client, mut load_config) = config
6060
.common
6161
.initialize_query(config.query.querier_spec_path.take())

influxdb3_load_generator/src/commands/write.rs

Lines changed: 32 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
1-
use crate::line_protocol_generator::{Generator, create_generators};
1+
use crate::line_protocol_generator::{GeneratorRunner, create_generators};
22
use crate::report::WriteReporter;
33
use crate::specification::DataSpec;
44
use anyhow::Context;
5-
use chrono::{DateTime, Local};
5+
use chrono::{DateTime, Local, TimeZone};
66
use clap::Parser;
77
use futures::StreamExt;
88
use futures::stream::FuturesUnordered;
9-
use influxdb3_client::{Client, Precision};
10-
use std::ops::Add;
9+
use rand::SeedableRng;
10+
use rand::rngs::SmallRng;
1111
use std::path::PathBuf;
1212
use std::str::FromStr;
1313
use std::sync::Arc;
1414
use std::time::Duration;
15-
use tokio::time::Instant;
1615

1716
use super::common::InfluxDb3Config;
1817

1918
#[derive(Debug, Parser)]
2019
#[clap(visible_alias = "w", trailing_var_arg = true)]
21-
pub(crate) struct Config {
20+
pub struct Config {
2221
/// Common InfluxDB 3 Enterprise config
2322
#[clap(flatten)]
2423
common: InfluxDb3Config,
@@ -29,7 +28,7 @@ pub(crate) struct Config {
2928
}
3029

3130
#[derive(Debug, Parser)]
32-
pub(crate) struct WriteConfig {
31+
pub struct WriteConfig {
3332
/// Sampling interval for the writers. They will generate data at this interval and
3433
/// sleep for the remainder of the interval. Writers stagger writes by this interval divided
3534
/// by the number of writers.
@@ -39,7 +38,7 @@ pub(crate) struct WriteConfig {
3938
env = "INFLUXDB3_LOAD_SAMPLING_INTERVAL",
4039
default_value = "1s"
4140
)]
42-
sampling_interval: SamplingInterval,
41+
pub sampling_interval: SamplingInterval,
4342

4443
/// Number of simultaneous writers. Each writer will generate data at the specified interval.
4544
#[clap(
@@ -48,30 +47,30 @@ pub(crate) struct WriteConfig {
4847
env = "INFLUXDB3_LOAD_WRITER_COUNT",
4948
default_value = "1"
5049
)]
51-
writer_count: usize,
50+
pub writer_count: usize,
5251

5352
/// The path to the writer spec file to use for this run.
5453
///
5554
/// Alternatively, specify a name of a builtin spec to use. If neither are specified, the
5655
/// generator will output a list of builtin specs along with help and an example for writing
5756
/// your own.
5857
#[clap(long = "writer-spec", env = "INFLUXDB3_LOAD_WRITER_SPEC_PATH")]
59-
pub(crate) writer_spec_path: Option<PathBuf>,
58+
pub writer_spec_path: Option<PathBuf>,
6059

6160
/// Tells the generator to run a single sample for each writer in `writer-count` and output the data to stdout.
6261
#[clap(long = "dry-run", default_value = "false")]
63-
dry_run: bool,
62+
pub dry_run: bool,
6463

6564
/// The date and time at which to start the timestamps of the generated data.
6665
///
6766
/// Can be an exact datetime like `2020-01-01T01:23:45-05:00` or a fuzzy
6867
/// specification like `1 hour` in the past. If not specified, defaults to now.
6968
#[clap(long = "start", action)]
70-
start_time: Option<String>,
69+
pub start_time: Option<String>,
7170
}
7271

7372
#[derive(Debug, Clone, Copy)]
74-
struct SamplingInterval(humantime::Duration);
73+
pub struct SamplingInterval(humantime::Duration);
7574

7675
impl FromStr for SamplingInterval {
7776
type Err = SamplingIntervalError;
@@ -99,14 +98,14 @@ impl From<SamplingInterval> for Duration {
9998
}
10099

101100
#[derive(Debug, thiserror::Error)]
102-
enum SamplingIntervalError {
101+
pub enum SamplingIntervalError {
103102
#[error("sampling interval must be greater than 0")]
104103
ZeroDuration,
105104
#[error(transparent)]
106105
Inner(#[from] humantime::DurationError),
107106
}
108107

109-
pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
108+
pub async fn command(mut config: Config) -> Result<(), anyhow::Error> {
110109
let (client, mut load_config) = config
111110
.common
112111
.initialize_write(config.write.writer_spec_path.take())
@@ -163,12 +162,13 @@ pub(crate) async fn run_write_load(
163162
let mut generators =
164163
create_generators(&spec, writer_count).context("failed to create generators")?;
165164

165+
let mut rng = SmallRng::from_entropy();
166166
// if dry run is set, output from each generator its id and then a single sample
167167
if dry_run {
168168
println!("running dry run for each writer\n");
169169
for g in &mut generators {
170170
let t = Local::now();
171-
let dry_run_output = g.dry_run(t.timestamp_millis());
171+
let dry_run_output = g.dry_run(t, &mut rng);
172172
println!("Writer {}:\n{}", g.writer_id, dry_run_output);
173173
}
174174
return Ok(());
@@ -191,15 +191,21 @@ pub(crate) async fn run_write_load(
191191
for generator in generators {
192192
let reporter = Arc::clone(&reporter);
193193
let sampling_interval = sampling_interval.into();
194-
let task = tokio::spawn(run_generator(
194+
let rng = SmallRng::from_entropy();
195+
let mut runner = GeneratorRunner::new(
195196
generator,
196197
client.clone(),
197198
database_name.clone(),
198-
reporter,
199199
sampling_interval,
200-
start_time,
201-
end_time,
202-
));
200+
)
201+
.with_reporter(reporter);
202+
if let Some(start_time) = start_time {
203+
runner = runner.with_start_time(start_time);
204+
}
205+
if let Some(end_time) = end_time {
206+
runner = runner.with_end_time(end_time);
207+
}
208+
let task = tokio::spawn(runner.run(rng));
203209
tasks.push(task);
204210
}
205211

@@ -212,7 +218,7 @@ pub(crate) async fn run_write_load(
212218
Ok(())
213219
}
214220

215-
fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
221+
pub fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
216222
humantime::parse_rfc3339(s)
217223
.map(Into::into)
218224
.unwrap_or_else(|_| {
@@ -223,148 +229,14 @@ fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
223229
})
224230
}
225231

226-
async fn run_generator(
227-
mut generator: Generator,
228-
client: Client,
229-
database_name: String,
230-
reporter: Arc<WriteReporter>,
231-
sampling_interval: Duration,
232-
start_time: Option<DateTime<Local>>,
233-
end_time: Option<DateTime<Local>>,
234-
) {
235-
// if not generator 1, pause for 100ms to let it start the run to create the schema
236-
if generator.writer_id != 1 {
237-
tokio::time::sleep(Duration::from_millis(100)).await;
238-
}
239-
240-
let mut sample_buffer = vec![];
241-
242-
// if the start time is set, load the historical samples as quickly as possible
243-
if let Some(mut start_time) = start_time {
244-
let mut sample_len = write_sample(
245-
&mut generator,
246-
sample_buffer,
247-
&client,
248-
&database_name,
249-
start_time,
250-
&reporter,
251-
true,
252-
)
253-
.await;
254-
255-
loop {
256-
start_time = start_time.add(sampling_interval);
257-
if start_time > Local::now()
258-
|| end_time
259-
.map(|end_time| start_time > end_time)
260-
.unwrap_or(false)
261-
{
262-
println!(
263-
"writer {} finished historical replay at: {:?}",
264-
generator.writer_id, start_time
265-
);
266-
break;
267-
}
268-
269-
sample_buffer = Vec::with_capacity(sample_len);
270-
sample_len = write_sample(
271-
&mut generator,
272-
sample_buffer,
273-
&client,
274-
&database_name,
275-
start_time,
276-
&reporter,
277-
false,
278-
)
279-
.await;
280-
}
281-
}
282-
283-
// write data until end time or forever
284-
let mut interval = tokio::time::interval(sampling_interval);
285-
let mut sample_len = 1024 * 1024 * 1024;
286-
287-
// we only want to print the error the very first time it happens
288-
let mut print_err = false;
289-
290-
loop {
291-
interval.tick().await;
292-
let now = Local::now();
293-
if let Some(end_time) = end_time {
294-
if now > end_time {
295-
println!("writer {} completed at {}", generator.writer_id, end_time);
296-
return;
297-
}
298-
}
299-
300-
sample_buffer = Vec::with_capacity(sample_len);
301-
sample_len = write_sample(
302-
&mut generator,
303-
sample_buffer,
304-
&client,
305-
&database_name,
306-
now,
307-
&reporter,
308-
print_err,
309-
)
310-
.await;
311-
print_err = true;
312-
}
313-
}
314-
315-
async fn write_sample(
316-
generator: &mut Generator,
317-
mut buffer: Vec<u8>,
318-
client: &Client,
319-
database_name: &String,
320-
sample_time: DateTime<Local>,
321-
reporter: &Arc<WriteReporter>,
322-
print_err: bool,
323-
) -> usize {
324-
// generate the sample, and keep track of the length to set the buffer size for the next loop
325-
let summary = generator
326-
.write_sample_to(sample_time.timestamp_millis(), &mut buffer)
327-
.expect("failed to write sample");
328-
let sample_len = buffer.len();
329-
330-
// time and send the write request
331-
let start_request = Instant::now();
332-
let res = client
333-
.api_v3_write_lp(database_name)
334-
.precision(Precision::Millisecond)
335-
.accept_partial(false)
336-
.body(buffer)
337-
.send()
338-
.await;
339-
let response_time = start_request.elapsed().as_millis() as u64;
340-
341-
// log the report
342-
match res {
343-
Ok(_) => {
344-
reporter.report_write(generator.writer_id, summary, response_time, Local::now());
345-
}
346-
Err(e) => {
347-
// if it's the first error, print the details
348-
if print_err {
349-
eprintln!(
350-
"Error on writer {} writing to server: {:?}",
351-
generator.writer_id, e
352-
);
353-
}
354-
reporter.report_failure(generator.writer_id, response_time, Local::now());
355-
}
356-
}
357-
358-
sample_len
359-
}
360-
361232
pub(crate) fn print_help() {
362233
let built_in_specs = crate::specs::built_in_specs();
363234
let example = built_in_specs.first().unwrap();
364235
let mut generators = create_generators(&example.write_spec, 2).unwrap();
365-
let t = 123;
366-
let dry_run_output_1 = generators.get_mut(0).unwrap().dry_run(t);
367-
let dry_run_output_2 = generators.get_mut(1).unwrap().dry_run(t);
236+
let t = Local.timestamp_millis_opt(123).unwrap();
237+
let mut rng = SmallRng::from_entropy();
238+
let dry_run_output_1 = generators.get_mut(0).unwrap().dry_run(t, &mut rng);
239+
let dry_run_output_2 = generators.get_mut(1).unwrap().dry_run(t, &mut rng);
368240

369241
let builtin_help = built_in_specs
370242
.iter()
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
pub mod line_protocol_generator;
2+
pub mod query_generator;
3+
pub mod report;
4+
pub mod specification;
5+
pub mod specs;
6+
7+
pub mod commands {
8+
pub mod common;
9+
pub mod full;
10+
pub mod query;
11+
pub mod write;
12+
}

0 commit comments

Comments
 (0)