Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions influxdb3_load_generator/src/commands/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::commands::{query::run_query_load, write::run_write_load};
use super::{common::InfluxDb3Config, query::QueryConfig, write::WriteConfig};

#[derive(Debug, Parser)]
pub(crate) struct Config {
pub struct Config {
/// Common InfluxDB 3 Core config
#[clap(flatten)]
common: InfluxDb3Config,
Expand All @@ -22,7 +22,7 @@ pub(crate) struct Config {
write: WriteConfig,
}

pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
pub async fn command(mut config: Config) -> Result<(), anyhow::Error> {
let (client, mut load_config) = config
.common
.initialize_full(
Expand Down
4 changes: 2 additions & 2 deletions influxdb3_load_generator/src/commands/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use super::common::InfluxDb3Config;

#[derive(Debug, Parser)]
#[clap(visible_alias = "q", trailing_var_arg = true)]
pub(crate) struct Config {
pub struct Config {
/// Common InfluxDB 3 Core config
#[clap(flatten)]
common: InfluxDb3Config,
Expand Down Expand Up @@ -55,7 +55,7 @@ pub(crate) struct QueryConfig {
query_response_format: Format,
}

pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
pub async fn command(mut config: Config) -> Result<(), anyhow::Error> {
let (client, mut load_config) = config
.common
.initialize_query(config.query.querier_spec_path.take())
Expand Down
192 changes: 32 additions & 160 deletions influxdb3_load_generator/src/commands/write.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
use crate::line_protocol_generator::{Generator, create_generators};
use crate::line_protocol_generator::{GeneratorRunner, create_generators};
use crate::report::WriteReporter;
use crate::specification::DataSpec;
use anyhow::Context;
use chrono::{DateTime, Local};
use chrono::{DateTime, Local, TimeZone};
use clap::Parser;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use influxdb3_client::{Client, Precision};
use std::ops::Add;
use rand::SeedableRng;
use rand::rngs::SmallRng;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;

use super::common::InfluxDb3Config;

#[derive(Debug, Parser)]
#[clap(visible_alias = "w", trailing_var_arg = true)]
pub(crate) struct Config {
pub struct Config {
/// Common InfluxDB 3 Core config
#[clap(flatten)]
common: InfluxDb3Config,
Expand All @@ -29,7 +28,7 @@ pub(crate) struct Config {
}

#[derive(Debug, Parser)]
pub(crate) struct WriteConfig {
pub struct WriteConfig {
/// Sampling interval for the writers. They will generate data at this interval and
/// sleep for the remainder of the interval. Writers stagger writes by this interval divided
/// by the number of writers.
Expand All @@ -39,7 +38,7 @@ pub(crate) struct WriteConfig {
env = "INFLUXDB3_LOAD_SAMPLING_INTERVAL",
default_value = "1s"
)]
sampling_interval: SamplingInterval,
pub sampling_interval: SamplingInterval,

/// Number of simultaneous writers. Each writer will generate data at the specified interval.
#[clap(
Expand All @@ -48,30 +47,30 @@ pub(crate) struct WriteConfig {
env = "INFLUXDB3_LOAD_WRITER_COUNT",
default_value = "1"
)]
writer_count: usize,
pub writer_count: usize,

/// The path to the writer spec file to use for this run.
///
/// Alternatively, specify a name of a builtin spec to use. If neither are specified, the
/// generator will output a list of builtin specs along with help and an example for writing
/// your own.
#[clap(long = "writer-spec", env = "INFLUXDB3_LOAD_WRITER_SPEC_PATH")]
pub(crate) writer_spec_path: Option<PathBuf>,
pub writer_spec_path: Option<PathBuf>,

/// Tells the generator to run a single sample for each writer in `writer-count` and output the data to stdout.
#[clap(long = "dry-run", default_value = "false")]
dry_run: bool,
pub dry_run: bool,

/// The date and time at which to start the timestamps of the generated data.
///
/// Can be an exact datetime like `2020-01-01T01:23:45-05:00` or a fuzzy
/// specification like `1 hour` in the past. If not specified, defaults to now.
#[clap(long = "start", action)]
start_time: Option<String>,
pub start_time: Option<String>,
}

#[derive(Debug, Clone, Copy)]
struct SamplingInterval(humantime::Duration);
pub struct SamplingInterval(humantime::Duration);

impl FromStr for SamplingInterval {
type Err = SamplingIntervalError;
Expand Down Expand Up @@ -99,14 +98,14 @@ impl From<SamplingInterval> for Duration {
}

#[derive(Debug, thiserror::Error)]
enum SamplingIntervalError {
pub enum SamplingIntervalError {
#[error("sampling interval must be greater than 0")]
ZeroDuration,
#[error(transparent)]
Inner(#[from] humantime::DurationError),
}

pub(crate) async fn command(mut config: Config) -> Result<(), anyhow::Error> {
pub async fn command(mut config: Config) -> Result<(), anyhow::Error> {
let (client, mut load_config) = config
.common
.initialize_write(config.write.writer_spec_path.take())
Expand Down Expand Up @@ -163,12 +162,13 @@ pub(crate) async fn run_write_load(
let mut generators =
create_generators(&spec, writer_count).context("failed to create generators")?;

let mut rng = SmallRng::from_entropy();
// if dry run is set, output from each generator its id and then a single sample
if dry_run {
println!("running dry run for each writer\n");
for g in &mut generators {
let t = Local::now();
let dry_run_output = g.dry_run(t.timestamp_millis());
let dry_run_output = g.dry_run(t, &mut rng);
println!("Writer {}:\n{}", g.writer_id, dry_run_output);
}
return Ok(());
Expand All @@ -191,15 +191,21 @@ pub(crate) async fn run_write_load(
for generator in generators {
let reporter = Arc::clone(&reporter);
let sampling_interval = sampling_interval.into();
let task = tokio::spawn(run_generator(
let rng = SmallRng::from_entropy();
let mut runner = GeneratorRunner::new(
generator,
client.clone(),
database_name.clone(),
reporter,
sampling_interval,
start_time,
end_time,
));
)
.with_reporter(reporter);
if let Some(start_time) = start_time {
runner = runner.with_start_time(start_time);
}
if let Some(end_time) = end_time {
runner = runner.with_end_time(end_time);
}
let task = tokio::spawn(runner.run(rng));
tasks.push(task);
}

Expand All @@ -212,7 +218,7 @@ pub(crate) async fn run_write_load(
Ok(())
}

fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
pub fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
humantime::parse_rfc3339(s)
.map(Into::into)
.unwrap_or_else(|_| {
Expand All @@ -223,148 +229,14 @@ fn parse_time_offset(s: &str, now: DateTime<Local>) -> DateTime<Local> {
})
}

async fn run_generator(
mut generator: Generator,
client: Client,
database_name: String,
reporter: Arc<WriteReporter>,
sampling_interval: Duration,
start_time: Option<DateTime<Local>>,
end_time: Option<DateTime<Local>>,
) {
// if not generator 1, pause for 100ms to let it start the run to create the schema
if generator.writer_id != 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}

let mut sample_buffer = vec![];

// if the start time is set, load the historical samples as quickly as possible
if let Some(mut start_time) = start_time {
let mut sample_len = write_sample(
&mut generator,
sample_buffer,
&client,
&database_name,
start_time,
&reporter,
true,
)
.await;

loop {
start_time = start_time.add(sampling_interval);
if start_time > Local::now()
|| end_time
.map(|end_time| start_time > end_time)
.unwrap_or(false)
{
println!(
"writer {} finished historical replay at: {:?}",
generator.writer_id, start_time
);
break;
}

sample_buffer = Vec::with_capacity(sample_len);
sample_len = write_sample(
&mut generator,
sample_buffer,
&client,
&database_name,
start_time,
&reporter,
false,
)
.await;
}
}

// write data until end time or forever
let mut interval = tokio::time::interval(sampling_interval);
let mut sample_len = 1024 * 1024 * 1024;

// we only want to print the error the very first time it happens
let mut print_err = false;

loop {
interval.tick().await;
let now = Local::now();
if let Some(end_time) = end_time {
if now > end_time {
println!("writer {} completed at {}", generator.writer_id, end_time);
return;
}
}

sample_buffer = Vec::with_capacity(sample_len);
sample_len = write_sample(
&mut generator,
sample_buffer,
&client,
&database_name,
now,
&reporter,
print_err,
)
.await;
print_err = true;
}
}

async fn write_sample(
generator: &mut Generator,
mut buffer: Vec<u8>,
client: &Client,
database_name: &String,
sample_time: DateTime<Local>,
reporter: &Arc<WriteReporter>,
print_err: bool,
) -> usize {
// generate the sample, and keep track of the length to set the buffer size for the next loop
let summary = generator
.write_sample_to(sample_time.timestamp_millis(), &mut buffer)
.expect("failed to write sample");
let sample_len = buffer.len();

// time and send the write request
let start_request = Instant::now();
let res = client
.api_v3_write_lp(database_name)
.precision(Precision::Millisecond)
.accept_partial(false)
.body(buffer)
.send()
.await;
let response_time = start_request.elapsed().as_millis() as u64;

// log the report
match res {
Ok(_) => {
reporter.report_write(generator.writer_id, summary, response_time, Local::now());
}
Err(e) => {
// if it's the first error, print the details
if print_err {
eprintln!(
"Error on writer {} writing to server: {:?}",
generator.writer_id, e
);
}
reporter.report_failure(generator.writer_id, response_time, Local::now());
}
}

sample_len
}

pub(crate) fn print_help() {
let built_in_specs = crate::specs::built_in_specs();
let example = built_in_specs.first().unwrap();
let mut generators = create_generators(&example.write_spec, 2).unwrap();
let t = 123;
let dry_run_output_1 = generators.get_mut(0).unwrap().dry_run(t);
let dry_run_output_2 = generators.get_mut(1).unwrap().dry_run(t);
let t = Local.timestamp_millis_opt(123).unwrap();
let mut rng = SmallRng::from_entropy();
let dry_run_output_1 = generators.get_mut(0).unwrap().dry_run(t, &mut rng);
let dry_run_output_2 = generators.get_mut(1).unwrap().dry_run(t, &mut rng);

let builtin_help = built_in_specs
.iter()
Expand Down
12 changes: 12 additions & 0 deletions influxdb3_load_generator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
pub mod line_protocol_generator;
pub mod query_generator;
pub mod report;
pub mod specification;
pub mod specs;

pub mod commands {
pub mod common;
pub mod full;
pub mod query;
pub mod write;
}
Loading