Skip to content

Commit 994d0a5

Browse files
Feat/reorg app entry (#287)
1 parent 42fbe78 commit 994d0a5

11 files changed

Lines changed: 147 additions & 157 deletions

File tree

src/cli/mod.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717
//! [`CliApp`]: Command Line User Interface
1818
19+
use crate::config::AppConfig;
1920
use crate::{args::DftArgs, execution::AppExecution};
2021
use color_eyre::eyre::eyre;
2122
use color_eyre::Result;
@@ -24,6 +25,9 @@ use datafusion::arrow::datatypes::SchemaRef;
2425
use datafusion::arrow::util::pretty::pretty_format_batches;
2526
use datafusion::arrow::{csv, json};
2627
use datafusion::sql::parser::DFParser;
28+
use datafusion_app::config::merge_configs;
29+
use datafusion_app::extensions::DftSessionStateBuilder;
30+
use datafusion_app::local::ExecutionContext;
2731
use datafusion_app::local_benchmarks::LocalBenchmarkStats;
2832
use futures::{Stream, StreamExt};
2933
use log::info;
@@ -33,7 +37,14 @@ use std::fs::File;
3337
use std::io::Write;
3438
use std::path::{Path, PathBuf};
3539
#[cfg(feature = "flightsql")]
36-
use {datafusion_app::flightsql_benchmarks::FlightSQLBenchmarkStats, tonic::IntoRequest};
40+
use {
41+
datafusion_app::{
42+
config::{AuthConfig, FlightSQLConfig},
43+
flightsql::FlightSQLContext,
44+
flightsql_benchmarks::FlightSQLBenchmarkStats,
45+
},
46+
tonic::IntoRequest,
47+
};
3748

3849
const LOCAL_BENCHMARK_HEADER_ROW: &str =
3950
"query,runs,logical_planning_min,logical_planning_max,logical_planning_mean,logical_planning_median,logical_planning_percent_of_total,physical_planning_min,physical_planning_max,physical_planning,mean,physical_planning_median,physical_planning_percent_of_total,execution_min,execution_max,execution_execution_mean,execution_median,execution_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total";
@@ -565,3 +576,38 @@ fn path_to_writer(path: &Path, schema: SchemaRef) -> Result<AnyWriter> {
565576
}
566577
Err(eyre!("Unable to parse extension"))
567578
}
579+
580+
pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
581+
let merged_exec_config = merge_configs(config.shared.clone(), config.cli.execution.clone());
582+
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
583+
.with_extensions()
584+
.await?;
585+
586+
// CLI mode: executing commands from files or CLI arguments
587+
let session_state = session_state_builder.build()?;
588+
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
589+
#[allow(unused_mut)]
590+
let mut app_execution = AppExecution::new(execution_ctx);
591+
#[cfg(feature = "flightsql")]
592+
{
593+
if cli.flightsql {
594+
let auth = AuthConfig {
595+
basic_auth: config.flightsql_client.auth.basic_auth,
596+
bearer_token: config.flightsql_client.auth.bearer_token,
597+
};
598+
let flightsql_cfg = FlightSQLConfig::new(
599+
config.flightsql_client.connection_url,
600+
config.flightsql_client.benchmark_iterations,
601+
auth,
602+
);
603+
let flightsql_ctx = FlightSQLContext::new(flightsql_cfg);
604+
flightsql_ctx
605+
.create_client(cli.flightsql_host.clone())
606+
.await?;
607+
app_execution.with_flightsql_ctx(flightsql_ctx);
608+
}
609+
}
610+
let app = CliApp::new(app_execution, cli.clone());
611+
app.execute_files_or_commands().await?;
612+
Ok(())
613+
}

src/config.rs

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::path::PathBuf;
2222
use datafusion_app::config::ExecutionConfig;
2323
use directories::{ProjectDirs, UserDirs};
2424
use lazy_static::lazy_static;
25+
use log::{debug, error};
2526
use serde::Deserialize;
2627

2728
#[cfg(feature = "flightsql")]
@@ -188,28 +189,6 @@ fn default_paste() -> bool {
188189
false
189190
}
190191

191-
// #[cfg(feature = "flightsql")]
192-
// #[derive(Clone, Debug, Deserialize)]
193-
// pub struct FlightSQLConfig {
194-
// #[serde(default = "default_connection_url")]
195-
// pub connection_url: String,
196-
// #[serde(default = "default_benchmark_iterations")]
197-
// pub benchmark_iterations: usize,
198-
// #[serde(default = "default_server_metrics_port")]
199-
// pub server_metrics_port: String,
200-
// }
201-
//
202-
// #[cfg(feature = "flightsql")]
203-
// impl Default for FlightSQLConfig {
204-
// fn default() -> Self {
205-
// Self {
206-
// connection_url: default_connection_url(),
207-
// benchmark_iterations: default_benchmark_iterations(),
208-
// server_metrics_port: default_server_metrics_port(),
209-
// }
210-
// }
211-
// }
212-
213192
#[cfg(feature = "flightsql")]
214193
pub fn default_connection_url() -> String {
215194
"http://localhost:50051".to_string()
@@ -233,3 +212,29 @@ fn default_editor_config() -> EditorConfig {
233212
fn default_auth_config() -> AuthConfig {
234213
AuthConfig::default()
235214
}
215+
216+
pub fn create_config(config_path: PathBuf) -> AppConfig {
217+
if config_path.exists() {
218+
debug!("Config exists");
219+
let maybe_config_contents = std::fs::read_to_string(config_path);
220+
if let Ok(config_contents) = maybe_config_contents {
221+
let maybe_parsed_config: std::result::Result<AppConfig, toml::de::Error> =
222+
toml::from_str(&config_contents);
223+
match maybe_parsed_config {
224+
Ok(parsed_config) => {
225+
debug!("Parsed config: {:?}", parsed_config);
226+
parsed_config
227+
}
228+
Err(err) => {
229+
error!("Error parsing config: {:?}", err);
230+
AppConfig::default()
231+
}
232+
}
233+
} else {
234+
AppConfig::default()
235+
}
236+
} else {
237+
debug!("No config, using default");
238+
AppConfig::default()
239+
}
240+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717

1818
pub mod services;
1919

20+
use crate::args::DftArgs;
2021
use crate::config::AppConfig;
2122
use crate::execution::AppExecution;
2223
use color_eyre::{eyre::eyre, Result};
24+
use datafusion_app::config::merge_configs;
25+
use datafusion_app::extensions::DftSessionStateBuilder;
26+
use datafusion_app::local::ExecutionContext;
2327
use log::info;
2428
use metrics::{describe_counter, describe_histogram};
2529
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};
@@ -195,3 +199,32 @@ impl FlightSqlApp {
195199
}
196200
}
197201
}
202+
203+
pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
204+
let merged_exec_config = merge_configs(
205+
config.shared.clone(),
206+
config.flightsql_server.execution.clone(),
207+
);
208+
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
209+
.with_extensions()
210+
.await?;
211+
// FlightSQL Server mode: start a FlightSQL server
212+
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
213+
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
214+
let session_state = session_state_builder.build()?;
215+
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
216+
if cli.run_ddl {
217+
execution_ctx.execute_ddl().await;
218+
}
219+
let app_execution = AppExecution::new(execution_ctx);
220+
let app = FlightSqlApp::try_new(
221+
app_execution,
222+
&config,
223+
&cli.flightsql_host
224+
.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
225+
&config.flightsql_server.server_metrics_port,
226+
)
227+
.await?;
228+
app.run_app().await;
229+
Ok(())
230+
}
File renamed without changes.

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pub mod cli;
33
pub mod config;
44
pub mod execution;
55
#[cfg(feature = "flightsql")]
6-
pub mod server;
6+
pub mod flightsql_server;
77
pub mod telemetry;
88
pub mod test_utils;
99
pub mod tui;

src/main.rs

Lines changed: 7 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,9 @@
1717

1818
use clap::Parser;
1919
use color_eyre::Result;
20-
use datafusion_app::local::ExecutionContext;
21-
use datafusion_app::{config::merge_configs, extensions::DftSessionStateBuilder};
22-
use datafusion_dft::args::Command;
23-
use datafusion_dft::{
24-
args::DftArgs,
25-
cli::CliApp,
26-
execution::AppExecution,
27-
telemetry,
28-
tui::{state, App},
29-
};
3020
#[cfg(feature = "flightsql")]
31-
use {
32-
datafusion_app::config::{AuthConfig, FlightSQLConfig},
33-
datafusion_app::flightsql::FlightSQLContext,
34-
datafusion_dft::server::FlightSqlApp,
35-
log::info,
36-
};
21+
use datafusion_dft::{args::Command, flightsql_server};
22+
use datafusion_dft::{args::DftArgs, cli, config::create_config, tui};
3723

3824
fn main() -> Result<()> {
3925
let cli = DftArgs::parse();
@@ -66,93 +52,16 @@ async fn app_entry_point(cli: DftArgs) -> Result<()> {
6652
if should_init_env_logger(&cli) {
6753
env_logger::init();
6854
}
69-
let state = state::initialize(cli.config_path());
55+
let cfg = create_config(cli.config_path());
56+
7057
#[cfg(feature = "flightsql")]
7158
if let Some(Command::ServeFlightSql { .. }) = cli.command {
72-
let merged_exec_config = merge_configs(
73-
state.config.shared.clone(),
74-
state.config.flightsql_server.execution.clone(),
75-
);
76-
let session_state_builder =
77-
DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
78-
.with_extensions()
79-
.await?;
80-
// FlightSQL Server mode: start a FlightSQL server
81-
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
82-
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
83-
let session_state = session_state_builder
84-
// .with_app_type(AppType::FlightSQLServer)
85-
.build()?;
86-
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
87-
if cli.run_ddl {
88-
execution_ctx.execute_ddl().await;
89-
}
90-
let app_execution = AppExecution::new(execution_ctx);
91-
let app = FlightSqlApp::try_new(
92-
app_execution,
93-
&state.config,
94-
&cli.flightsql_host
95-
.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
96-
&state.config.flightsql_server.server_metrics_port,
97-
)
98-
.await?;
99-
app.run_app().await;
100-
return Ok(());
59+
flightsql_server::try_run(cli.clone(), cfg.clone()).await?;
10160
}
10261
if !cli.files.is_empty() || !cli.commands.is_empty() {
103-
let merged_exec_config = merge_configs(
104-
state.config.shared.clone(),
105-
state.config.cli.execution.clone(),
106-
);
107-
let session_state_builder =
108-
DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
109-
.with_extensions()
110-
.await?;
111-
112-
// CLI mode: executing commands from files or CLI arguments
113-
let session_state = session_state_builder.build()?;
114-
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
115-
#[allow(unused_mut)]
116-
let mut app_execution = AppExecution::new(execution_ctx);
117-
#[cfg(feature = "flightsql")]
118-
{
119-
if cli.flightsql {
120-
let auth = AuthConfig {
121-
basic_auth: state.config.flightsql_client.auth.basic_auth,
122-
bearer_token: state.config.flightsql_client.auth.bearer_token,
123-
};
124-
let flightsql_cfg = FlightSQLConfig::new(
125-
state.config.flightsql_client.connection_url,
126-
state.config.flightsql_client.benchmark_iterations,
127-
auth,
128-
);
129-
let flightsql_ctx = FlightSQLContext::new(flightsql_cfg);
130-
flightsql_ctx
131-
.create_client(cli.flightsql_host.clone())
132-
.await?;
133-
app_execution.with_flightsql_ctx(flightsql_ctx);
134-
}
135-
}
136-
let app = CliApp::new(app_execution, cli.clone());
137-
app.execute_files_or_commands().await?;
62+
cli::try_run(cli, cfg).await?;
13863
} else {
139-
let merged_exec_config = merge_configs(
140-
state.config.shared.clone(),
141-
state.config.tui.execution.clone(),
142-
);
143-
let session_state_builder =
144-
DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
145-
.with_extensions()
146-
.await?;
147-
let session_state = session_state_builder.build()?;
148-
149-
// TUI mode: running the TUI
150-
telemetry::initialize_logs()?; // use alternate logging for TUI
151-
let state = state::initialize(cli.config_path());
152-
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
153-
let app_execution = AppExecution::new(execution_ctx);
154-
let app = App::new(state, cli, app_execution);
155-
app.run_app().await?;
64+
tui::try_run(cli, cfg).await?;
15665
}
15766

15867
Ok(())

src/tui/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub mod ui;
2323
use color_eyre::eyre::eyre;
2424
use color_eyre::Result;
2525
use crossterm::event as ct;
26+
use datafusion_app::config::merge_configs;
27+
use datafusion_app::extensions::DftSessionStateBuilder;
28+
use datafusion_app::local::ExecutionContext;
2629
use futures::FutureExt;
2730
use log::{debug, error, info, trace};
2831
use ratatui::backend::CrosstermBackend;
@@ -31,6 +34,7 @@ use ratatui::crossterm::{
3134
terminal::{EnterAlternateScreen, LeaveAlternateScreen},
3235
};
3336
use ratatui::{prelude::*, style::palette::tailwind, widgets::*};
37+
use state::AppState;
3438
use std::sync::Arc;
3539
use strum::IntoEnumIterator;
3640
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
@@ -40,6 +44,8 @@ use tokio_util::sync::CancellationToken;
4044

4145
use self::execution::{ExecutionError, ExecutionResultsBatch, TuiExecution};
4246
use self::handlers::{app_event_handler, crossterm_event_handler};
47+
use crate::config::AppConfig;
48+
use crate::telemetry;
4349
use crate::{args::DftArgs, execution::AppExecution};
4450
use datafusion_app::sql_utils::clean_sql;
4551

@@ -357,3 +363,20 @@ impl App<'_> {
357363
app.exit()
358364
}
359365
}
366+
367+
pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
368+
let merged_exec_config = merge_configs(config.shared.clone(), config.tui.execution.clone());
369+
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
370+
.with_extensions()
371+
.await?;
372+
let session_state = session_state_builder.build()?;
373+
374+
// TUI mode: running the TUI
375+
telemetry::initialize_logs()?; // use alternate logging for TUI
376+
let state = AppState::new(config);
377+
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
378+
let app_execution = AppExecution::new(execution_ctx);
379+
let app = App::new(state, cli, app_execution);
380+
app.run_app().await?;
381+
Ok(())
382+
}

0 commit comments

Comments
 (0)