Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.
//! [`CliApp`]: Command Line User Interface

use crate::config::AppConfig;
use crate::{args::DftArgs, execution::AppExecution};
use color_eyre::eyre::eyre;
use color_eyre::Result;
Expand All @@ -24,6 +25,9 @@ use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::arrow::{csv, json};
use datafusion::sql::parser::DFParser;
use datafusion_app::config::merge_configs;
use datafusion_app::extensions::DftSessionStateBuilder;
use datafusion_app::local::ExecutionContext;
use datafusion_app::local_benchmarks::LocalBenchmarkStats;
use futures::{Stream, StreamExt};
use log::info;
Expand All @@ -33,7 +37,14 @@ use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
#[cfg(feature = "flightsql")]
use {datafusion_app::flightsql_benchmarks::FlightSQLBenchmarkStats, tonic::IntoRequest};
use {
datafusion_app::{
config::{AuthConfig, FlightSQLConfig},
flightsql::FlightSQLContext,
flightsql_benchmarks::FlightSQLBenchmarkStats,
},
tonic::IntoRequest,
};

const LOCAL_BENCHMARK_HEADER_ROW: &str =
"query,runs,logical_planning_min,logical_planning_max,logical_planning_mean,logical_planning_median,logical_planning_percent_of_total,physical_planning_min,physical_planning_max,physical_planning,mean,physical_planning_median,physical_planning_percent_of_total,execution_min,execution_max,execution_execution_mean,execution_median,execution_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total";
Expand Down Expand Up @@ -565,3 +576,38 @@ fn path_to_writer(path: &Path, schema: SchemaRef) -> Result<AnyWriter> {
}
Err(eyre!("Unable to parse extension"))
}

pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
let merged_exec_config = merge_configs(config.shared.clone(), config.cli.execution.clone());
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
.with_extensions()
.await?;

// CLI mode: executing commands from files or CLI arguments
let session_state = session_state_builder.build()?;
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
#[allow(unused_mut)]
let mut app_execution = AppExecution::new(execution_ctx);
#[cfg(feature = "flightsql")]
{
if cli.flightsql {
let auth = AuthConfig {
basic_auth: config.flightsql_client.auth.basic_auth,
bearer_token: config.flightsql_client.auth.bearer_token,
};
let flightsql_cfg = FlightSQLConfig::new(
config.flightsql_client.connection_url,
config.flightsql_client.benchmark_iterations,
auth,
);
let flightsql_ctx = FlightSQLContext::new(flightsql_cfg);
flightsql_ctx
.create_client(cli.flightsql_host.clone())
.await?;
app_execution.with_flightsql_ctx(flightsql_ctx);
}
}
let app = CliApp::new(app_execution, cli.clone());
app.execute_files_or_commands().await?;
Ok(())
}
49 changes: 27 additions & 22 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::path::PathBuf;
use datafusion_app::config::ExecutionConfig;
use directories::{ProjectDirs, UserDirs};
use lazy_static::lazy_static;
use log::{debug, error};
use serde::Deserialize;

#[cfg(feature = "flightsql")]
Expand Down Expand Up @@ -188,28 +189,6 @@ fn default_paste() -> bool {
false
}

// #[cfg(feature = "flightsql")]
// #[derive(Clone, Debug, Deserialize)]
// pub struct FlightSQLConfig {
// #[serde(default = "default_connection_url")]
// pub connection_url: String,
// #[serde(default = "default_benchmark_iterations")]
// pub benchmark_iterations: usize,
// #[serde(default = "default_server_metrics_port")]
// pub server_metrics_port: String,
// }
//
// #[cfg(feature = "flightsql")]
// impl Default for FlightSQLConfig {
// fn default() -> Self {
// Self {
// connection_url: default_connection_url(),
// benchmark_iterations: default_benchmark_iterations(),
// server_metrics_port: default_server_metrics_port(),
// }
// }
// }

#[cfg(feature = "flightsql")]
pub fn default_connection_url() -> String {
"http://localhost:50051".to_string()
Expand All @@ -233,3 +212,29 @@ fn default_editor_config() -> EditorConfig {
fn default_auth_config() -> AuthConfig {
AuthConfig::default()
}

pub fn create_config(config_path: PathBuf) -> AppConfig {
if config_path.exists() {
debug!("Config exists");
let maybe_config_contents = std::fs::read_to_string(config_path);
if let Ok(config_contents) = maybe_config_contents {
let maybe_parsed_config: std::result::Result<AppConfig, toml::de::Error> =
toml::from_str(&config_contents);
match maybe_parsed_config {
Ok(parsed_config) => {
debug!("Parsed config: {:?}", parsed_config);
parsed_config
}
Err(err) => {
error!("Error parsing config: {:?}", err);
AppConfig::default()
}
}
} else {
AppConfig::default()
}
} else {
debug!("No config, using default");
AppConfig::default()
}
}
33 changes: 33 additions & 0 deletions src/server/mod.rs → src/flightsql_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

pub mod services;

use crate::args::DftArgs;
use crate::config::AppConfig;
use crate::execution::AppExecution;
use color_eyre::{eyre::eyre, Result};
use datafusion_app::config::merge_configs;
use datafusion_app::extensions::DftSessionStateBuilder;
use datafusion_app::local::ExecutionContext;
use log::info;
use metrics::{describe_counter, describe_histogram};
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};
Expand Down Expand Up @@ -195,3 +199,32 @@ impl FlightSqlApp {
}
}
}

pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
let merged_exec_config = merge_configs(
config.shared.clone(),
config.flightsql_server.execution.clone(),
);
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
.with_extensions()
.await?;
// FlightSQL Server mode: start a FlightSQL server
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
let session_state = session_state_builder.build()?;
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
if cli.run_ddl {
execution_ctx.execute_ddl().await;
}
let app_execution = AppExecution::new(execution_ctx);
let app = FlightSqlApp::try_new(
app_execution,
&config,
&cli.flightsql_host
.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
&config.flightsql_server.server_metrics_port,
)
.await?;
app.run_app().await;
Ok(())
}
File renamed without changes.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod cli;
pub mod config;
pub mod execution;
#[cfg(feature = "flightsql")]
pub mod server;
pub mod flightsql_server;
pub mod telemetry;
pub mod test_utils;
pub mod tui;
105 changes: 7 additions & 98 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,9 @@

use clap::Parser;
use color_eyre::Result;
use datafusion_app::local::ExecutionContext;
use datafusion_app::{config::merge_configs, extensions::DftSessionStateBuilder};
use datafusion_dft::args::Command;
use datafusion_dft::{
args::DftArgs,
cli::CliApp,
execution::AppExecution,
telemetry,
tui::{state, App},
};
#[cfg(feature = "flightsql")]
use {
datafusion_app::config::{AuthConfig, FlightSQLConfig},
datafusion_app::flightsql::FlightSQLContext,
datafusion_dft::server::FlightSqlApp,
log::info,
};
use datafusion_dft::{args::Command, flightsql_server};
use datafusion_dft::{args::DftArgs, cli, config::create_config, tui};

fn main() -> Result<()> {
let cli = DftArgs::parse();
Expand Down Expand Up @@ -66,93 +52,16 @@ async fn app_entry_point(cli: DftArgs) -> Result<()> {
if should_init_env_logger(&cli) {
env_logger::init();
}
let state = state::initialize(cli.config_path());
let cfg = create_config(cli.config_path());

#[cfg(feature = "flightsql")]
if let Some(Command::ServeFlightSql { .. }) = cli.command {
let merged_exec_config = merge_configs(
state.config.shared.clone(),
state.config.flightsql_server.execution.clone(),
);
let session_state_builder =
DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
.with_extensions()
.await?;
// FlightSQL Server mode: start a FlightSQL server
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
let session_state = session_state_builder
// .with_app_type(AppType::FlightSQLServer)
.build()?;
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
if cli.run_ddl {
execution_ctx.execute_ddl().await;
}
let app_execution = AppExecution::new(execution_ctx);
let app = FlightSqlApp::try_new(
app_execution,
&state.config,
&cli.flightsql_host
.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
&state.config.flightsql_server.server_metrics_port,
)
.await?;
app.run_app().await;
return Ok(());
flightsql_server::try_run(cli.clone(), cfg.clone()).await?;
}
if !cli.files.is_empty() || !cli.commands.is_empty() {
let merged_exec_config = merge_configs(
state.config.shared.clone(),
state.config.cli.execution.clone(),
);
let session_state_builder =
DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
.with_extensions()
.await?;

// CLI mode: executing commands from files or CLI arguments
let session_state = session_state_builder.build()?;
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
#[allow(unused_mut)]
let mut app_execution = AppExecution::new(execution_ctx);
#[cfg(feature = "flightsql")]
{
if cli.flightsql {
let auth = AuthConfig {
basic_auth: state.config.flightsql_client.auth.basic_auth,
bearer_token: state.config.flightsql_client.auth.bearer_token,
};
let flightsql_cfg = FlightSQLConfig::new(
state.config.flightsql_client.connection_url,
state.config.flightsql_client.benchmark_iterations,
auth,
);
let flightsql_ctx = FlightSQLContext::new(flightsql_cfg);
flightsql_ctx
.create_client(cli.flightsql_host.clone())
.await?;
app_execution.with_flightsql_ctx(flightsql_ctx);
}
}
let app = CliApp::new(app_execution, cli.clone());
app.execute_files_or_commands().await?;
cli::try_run(cli, cfg).await?;
} else {
let merged_exec_config = merge_configs(
state.config.shared.clone(),
state.config.tui.execution.clone(),
);
let session_state_builder =
DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
.with_extensions()
.await?;
let session_state = session_state_builder.build()?;

// TUI mode: running the TUI
telemetry::initialize_logs()?; // use alternate logging for TUI
let state = state::initialize(cli.config_path());
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
let app_execution = AppExecution::new(execution_ctx);
let app = App::new(state, cli, app_execution);
app.run_app().await?;
tui::try_run(cli, cfg).await?;
}

Ok(())
Expand Down
23 changes: 23 additions & 0 deletions src/tui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub mod ui;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use crossterm::event as ct;
use datafusion_app::config::merge_configs;
use datafusion_app::extensions::DftSessionStateBuilder;
use datafusion_app::local::ExecutionContext;
use futures::FutureExt;
use log::{debug, error, info, trace};
use ratatui::backend::CrosstermBackend;
Expand All @@ -31,6 +34,7 @@ use ratatui::crossterm::{
terminal::{EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{prelude::*, style::palette::tailwind, widgets::*};
use state::AppState;
use std::sync::Arc;
use strum::IntoEnumIterator;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
Expand All @@ -40,6 +44,8 @@ use tokio_util::sync::CancellationToken;

use self::execution::{ExecutionError, ExecutionResultsBatch, TuiExecution};
use self::handlers::{app_event_handler, crossterm_event_handler};
use crate::config::AppConfig;
use crate::telemetry;
use crate::{args::DftArgs, execution::AppExecution};
use datafusion_app::sql_utils::clean_sql;

Expand Down Expand Up @@ -357,3 +363,20 @@ impl App<'_> {
app.exit()
}
}

pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
let merged_exec_config = merge_configs(config.shared.clone(), config.tui.execution.clone());
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
.with_extensions()
.await?;
let session_state = session_state_builder.build()?;

// TUI mode: running the TUI
telemetry::initialize_logs()?; // use alternate logging for TUI
let state = AppState::new(config);
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
let app_execution = AppExecution::new(execution_ctx);
let app = App::new(state, cli, app_execution);
app.run_app().await?;
Ok(())
}
Loading