Skip to content

Commit 1ac06b6

Browse files
Finish refactoring app entry points
1 parent ede5c37 commit 1ac06b6

5 files changed

Lines changed: 104 additions & 87 deletions

File tree

src/cli/mod.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717
//! [`CliApp`]: Command Line User Interface
1818
19+
use crate::config::AppConfig;
1920
use crate::{args::DftArgs, execution::AppExecution};
2021
use color_eyre::eyre::eyre;
2122
use color_eyre::Result;
@@ -24,6 +25,10 @@ use datafusion::arrow::datatypes::SchemaRef;
2425
use datafusion::arrow::util::pretty::pretty_format_batches;
2526
use datafusion::arrow::{csv, json};
2627
use datafusion::sql::parser::DFParser;
28+
use datafusion_app::config::{merge_configs, AuthConfig, FlightSQLConfig};
29+
use datafusion_app::extensions::DftSessionStateBuilder;
30+
use datafusion_app::flightsql::FlightSQLContext;
31+
use datafusion_app::local::ExecutionContext;
2732
use datafusion_app::local_benchmarks::LocalBenchmarkStats;
2833
use futures::{Stream, StreamExt};
2934
use log::info;
@@ -565,3 +570,38 @@ fn path_to_writer(path: &Path, schema: SchemaRef) -> Result<AnyWriter> {
565570
}
566571
Err(eyre!("Unable to parse extension"))
567572
}
573+
574+
pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
575+
let merged_exec_config = merge_configs(config.shared.clone(), config.cli.execution.clone());
576+
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
577+
.with_extensions()
578+
.await?;
579+
580+
// CLI mode: executing commands from files or CLI arguments
581+
let session_state = session_state_builder.build()?;
582+
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
583+
#[allow(unused_mut)]
584+
let mut app_execution = AppExecution::new(execution_ctx);
585+
#[cfg(feature = "flightsql")]
586+
{
587+
if cli.flightsql {
588+
let auth = AuthConfig {
589+
basic_auth: config.flightsql_client.auth.basic_auth,
590+
bearer_token: config.flightsql_client.auth.bearer_token,
591+
};
592+
let flightsql_cfg = FlightSQLConfig::new(
593+
config.flightsql_client.connection_url,
594+
config.flightsql_client.benchmark_iterations,
595+
auth,
596+
);
597+
let flightsql_ctx = FlightSQLContext::new(flightsql_cfg);
598+
flightsql_ctx
599+
.create_client(cli.flightsql_host.clone())
600+
.await?;
601+
app_execution.with_flightsql_ctx(flightsql_ctx);
602+
}
603+
}
604+
let app = CliApp::new(app_execution, cli.clone());
605+
app.execute_files_or_commands().await?;
606+
Ok(())
607+
}

src/flightsql_server/mod.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717

1818
pub mod services;
1919

20+
use crate::args::DftArgs;
2021
use crate::config::AppConfig;
2122
use crate::execution::AppExecution;
2223
use color_eyre::{eyre::eyre, Result};
24+
use datafusion_app::config::merge_configs;
25+
use datafusion_app::extensions::DftSessionStateBuilder;
26+
use datafusion_app::local::ExecutionContext;
2327
use log::info;
2428
use metrics::{describe_counter, describe_histogram};
2529
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};
@@ -196,4 +200,31 @@ impl FlightSqlApp {
196200
}
197201
}
198202

199-
// pub fn run_flightsql_server(state: AppState) {}
203+
pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
204+
let merged_exec_config = merge_configs(
205+
config.shared.clone(),
206+
config.flightsql_server.execution.clone(),
207+
);
208+
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
209+
.with_extensions()
210+
.await?;
211+
// FlightSQL Server mode: start a FlightSQL server
212+
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
213+
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
214+
let session_state = session_state_builder.build()?;
215+
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
216+
if cli.run_ddl {
217+
execution_ctx.execute_ddl().await;
218+
}
219+
let app_execution = AppExecution::new(execution_ctx);
220+
let app = FlightSqlApp::try_new(
221+
app_execution,
222+
&config,
223+
&cli.flightsql_host
224+
.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
225+
&config.flightsql_server.server_metrics_port,
226+
)
227+
.await?;
228+
app.run_app().await;
229+
Ok(())
230+
}

src/main.rs

Lines changed: 5 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,8 @@
1717

1818
use clap::Parser;
1919
use color_eyre::Result;
20-
use datafusion_app::local::ExecutionContext;
21-
use datafusion_app::{config::merge_configs, extensions::DftSessionStateBuilder};
2220
use datafusion_dft::args::Command;
23-
use datafusion_dft::tui::state::AppState;
24-
use datafusion_dft::{
25-
args::DftArgs, cli::CliApp, config::create_config, execution::AppExecution, telemetry, tui::App,
26-
};
27-
#[cfg(feature = "flightsql")]
28-
use {
29-
datafusion_app::config::{AuthConfig, FlightSQLConfig},
30-
datafusion_app::flightsql::FlightSQLContext,
31-
datafusion_dft::flightsql_server::FlightSqlApp,
32-
log::info,
33-
};
21+
use datafusion_dft::{args::DftArgs, cli, config::create_config, flightsql_server, tui};
3422

3523
fn main() -> Result<()> {
3624
let cli = DftArgs::parse();
@@ -64,82 +52,15 @@ async fn app_entry_point(cli: DftArgs) -> Result<()> {
6452
env_logger::init();
6553
}
6654
let cfg = create_config(cli.config_path());
55+
6756
#[cfg(feature = "flightsql")]
6857
if let Some(Command::ServeFlightSql { .. }) = cli.command {
69-
let merged_exec_config =
70-
merge_configs(cfg.shared.clone(), cfg.flightsql_server.execution.clone());
71-
let session_state_builder =
72-
DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
73-
.with_extensions()
74-
.await?;
75-
// FlightSQL Server mode: start a FlightSQL server
76-
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
77-
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
78-
let session_state = session_state_builder.build()?;
79-
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
80-
if cli.run_ddl {
81-
execution_ctx.execute_ddl().await;
82-
}
83-
let app_execution = AppExecution::new(execution_ctx);
84-
let app = FlightSqlApp::try_new(
85-
app_execution,
86-
&cfg,
87-
&cli.flightsql_host
88-
.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
89-
&cfg.flightsql_server.server_metrics_port,
90-
)
91-
.await?;
92-
app.run_app().await;
93-
return Ok(());
58+
flightsql_server::try_run(cli.clone(), cfg.clone()).await?;
9459
}
9560
if !cli.files.is_empty() || !cli.commands.is_empty() {
96-
let merged_exec_config = merge_configs(cfg.shared.clone(), cfg.cli.execution.clone());
97-
let session_state_builder =
98-
DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
99-
.with_extensions()
100-
.await?;
101-
102-
// CLI mode: executing commands from files or CLI arguments
103-
let session_state = session_state_builder.build()?;
104-
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
105-
#[allow(unused_mut)]
106-
let mut app_execution = AppExecution::new(execution_ctx);
107-
#[cfg(feature = "flightsql")]
108-
{
109-
if cli.flightsql {
110-
let auth = AuthConfig {
111-
basic_auth: cfg.flightsql_client.auth.basic_auth,
112-
bearer_token: cfg.flightsql_client.auth.bearer_token,
113-
};
114-
let flightsql_cfg = FlightSQLConfig::new(
115-
cfg.flightsql_client.connection_url,
116-
cfg.flightsql_client.benchmark_iterations,
117-
auth,
118-
);
119-
let flightsql_ctx = FlightSQLContext::new(flightsql_cfg);
120-
flightsql_ctx
121-
.create_client(cli.flightsql_host.clone())
122-
.await?;
123-
app_execution.with_flightsql_ctx(flightsql_ctx);
124-
}
125-
}
126-
let app = CliApp::new(app_execution, cli.clone());
127-
app.execute_files_or_commands().await?;
61+
cli::try_run(cli, cfg).await?;
12862
} else {
129-
let merged_exec_config = merge_configs(cfg.shared.clone(), cfg.tui.execution.clone());
130-
let session_state_builder =
131-
DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
132-
.with_extensions()
133-
.await?;
134-
let session_state = session_state_builder.build()?;
135-
136-
// TUI mode: running the TUI
137-
telemetry::initialize_logs()?; // use alternate logging for TUI
138-
let state = AppState::new(cfg);
139-
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
140-
let app_execution = AppExecution::new(execution_ctx);
141-
let app = App::new(state, cli, app_execution);
142-
app.run_app().await?;
63+
tui::try_run(cli, cfg).await?;
14364
}
14465

14566
Ok(())

src/tui/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub mod ui;
2323
use color_eyre::eyre::eyre;
2424
use color_eyre::Result;
2525
use crossterm::event as ct;
26+
use datafusion_app::config::merge_configs;
27+
use datafusion_app::extensions::DftSessionStateBuilder;
28+
use datafusion_app::local::ExecutionContext;
2629
use futures::FutureExt;
2730
use log::{debug, error, info, trace};
2831
use ratatui::backend::CrosstermBackend;
@@ -31,6 +34,7 @@ use ratatui::crossterm::{
3134
terminal::{EnterAlternateScreen, LeaveAlternateScreen},
3235
};
3336
use ratatui::{prelude::*, style::palette::tailwind, widgets::*};
37+
use state::AppState;
3438
use std::sync::Arc;
3539
use strum::IntoEnumIterator;
3640
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
@@ -40,6 +44,8 @@ use tokio_util::sync::CancellationToken;
4044

4145
use self::execution::{ExecutionError, ExecutionResultsBatch, TuiExecution};
4246
use self::handlers::{app_event_handler, crossterm_event_handler};
47+
use crate::config::AppConfig;
48+
use crate::telemetry;
4349
use crate::{args::DftArgs, execution::AppExecution};
4450
use datafusion_app::sql_utils::clean_sql;
4551

@@ -357,3 +363,20 @@ impl App<'_> {
357363
app.exit()
358364
}
359365
}
366+
367+
pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
368+
let merged_exec_config = merge_configs(config.shared.clone(), config.tui.execution.clone());
369+
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
370+
.with_extensions()
371+
.await?;
372+
let session_state = session_state_builder.build()?;
373+
374+
// TUI mode: running the TUI
375+
telemetry::initialize_logs()?; // use alternate logging for TUI
376+
let state = AppState::new(config);
377+
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
378+
let app_execution = AppExecution::new(execution_ctx);
379+
let app = App::new(state, cli, app_execution);
380+
app.run_app().await?;
381+
Ok(())
382+
}

tests/tui_cases/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ use datafusion::common::Result;
2626
use datafusion_app::{extensions::DftSessionStateBuilder, local::ExecutionContext};
2727
use datafusion_dft::{
2828
args::DftArgs,
29+
config::create_config,
2930
execution::AppExecution,
30-
tui::{state::initialize, App, AppEvent},
31+
tui::{state::AppState, App, AppEvent},
3132
};
3233
use tempfile::{tempdir, TempDir};
3334

@@ -49,7 +50,8 @@ impl TestApp<'_> {
4950
/// Create a new [`TestApp`] instance configured with a temporary directory
5051
async fn new() -> Self {
5152
let config_path = tempdir().unwrap();
52-
let state = initialize(config_path.path().to_path_buf());
53+
let config = create_config(config_path.path().to_path_buf());
54+
let state = AppState::new(config);
5355
let session_state =
5456
DftSessionStateBuilder::try_new(Some(state.config.tui.execution.clone()))
5557
.unwrap()

0 commit comments

Comments
 (0)