Skip to content

Commit ed4df10

Browse files
Start adding execution
1 parent cae037b commit ed4df10

15 files changed

Lines changed: 553 additions & 91 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ flightsql = [
7575
]
7676
functions-json = ["datafusion-app/functions-json"]
7777
functions-parquet = ["datafusion-app/functions-parquet"]
78-
http = ["axum"]
78+
http = ["axum", "dep:metrics", "dep:metrics-exporter-prometheus", "dep:tower-http"]
7979
hudi = ["datafusion-app/hudi"]
8080
huggingface = ["datafusion-app/huggingface"]
8181
iceberg = ["datafusion-app/iceberg"]

crates/datafusion-app/src/config.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::path::PathBuf;
2121

2222
#[cfg(feature = "udfs-wasm")]
2323
use datafusion_udfs_wasm::WasmInputDataType;
24-
use log::info;
2524
use serde::Deserialize;
2625
use std::collections::HashMap;
2726

@@ -109,7 +108,6 @@ impl Default for ExecutionConfig {
109108
}
110109

111110
fn default_ddl_path() -> Option<PathBuf> {
112-
info!("Creating default ExecutionConfig");
113111
if let Some(user_dirs) = directories::UserDirs::new() {
114112
let ddl_path = user_dirs
115113
.home_dir()
@@ -241,7 +239,7 @@ pub struct WasmUdfConfig {
241239
}
242240

243241
#[cfg(feature = "flightsql")]
244-
#[derive(Default)]
242+
#[derive(Clone, Default)]
245243
pub struct FlightSQLConfig {
246244
pub connection_url: String,
247245
pub benchmark_iterations: usize,
@@ -259,14 +257,12 @@ impl FlightSQLConfig {
259257
}
260258
}
261259

262-
#[cfg(feature = "flightsql")]
263260
#[derive(Clone, Debug, Default, Deserialize)]
264261
pub struct AuthConfig {
265262
pub basic_auth: Option<BasicAuth>,
266263
pub bearer_token: Option<String>,
267264
}
268265

269-
#[cfg(feature = "flightsql")]
270266
#[derive(Clone, Debug, Default, Deserialize)]
271267
pub struct BasicAuth {
272268
pub username: String,

crates/datafusion-app/src/flightsql.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::{config::FlightSQLConfig, flightsql_benchmarks::FlightSQLBenchmarkSta
3434

3535
pub type FlightSQLClient = Mutex<Option<FlightSqlServiceClient<Channel>>>;
3636

37-
#[derive(Default)]
37+
#[derive(Clone, Default)]
3838
pub struct FlightSQLContext {
3939
config: FlightSQLConfig,
4040
flightsql_client: FlightSQLClient,

src/args.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ pub struct DftArgs {
8888
#[clap(short = 'n', help = "Set the number of benchmark iterations to run")]
8989
pub benchmark_iterations: Option<usize>,
9090

91-
#[cfg(any(feature = "flightsql", feature = "flightsql"))]
92-
#[clap(long, help = "Set the host and port to be used for FlightSQL")]
93-
pub flightsql_host: Option<String>,
91+
#[cfg(any(feature = "flightsql", feature = "http"))]
92+
#[clap(long, help = "Set the host and port to be used for server")]
93+
pub host: Option<String>,
9494

9595
#[clap(
9696
long,

src/cli/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,9 +601,7 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
601601
auth,
602602
);
603603
let flightsql_ctx = FlightSQLContext::new(flightsql_cfg);
604-
flightsql_ctx
605-
.create_client(cli.flightsql_host.clone())
606-
.await?;
604+
flightsql_ctx.create_client(cli.host.clone()).await?;
607605
app_execution.with_flightsql_ctx(flightsql_ctx);
608606
}
609607
}

src/config.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use lazy_static::lazy_static;
2525
use log::{debug, error};
2626
use serde::Deserialize;
2727

28-
#[cfg(feature = "flightsql")]
28+
#[cfg(any(feature = "flightsql", feature = "http"))]
2929
use datafusion_app::config::AuthConfig;
3030

3131
lazy_static! {
@@ -125,8 +125,29 @@ impl Default for FlightSQLClientConfig {
125125
}
126126

127127
#[cfg(feature = "http")]
128-
#[derive(Clone, Debug, Default, Deserialize)]
129-
pub struct HttpServerConfig {}
128+
#[derive(Clone, Debug, Deserialize)]
129+
pub struct HttpServerConfig {
130+
#[serde(default = "default_execution_config")]
131+
pub execution: ExecutionConfig,
132+
#[serde(default = "default_connection_url")]
133+
pub connection_url: String,
134+
#[serde(default = "default_server_metrics_port")]
135+
pub server_metrics_port: String,
136+
#[serde(default = "default_auth_config")]
137+
pub auth: AuthConfig,
138+
}
139+
140+
#[cfg(feature = "http")]
141+
impl Default for HttpServerConfig {
142+
fn default() -> Self {
143+
Self {
144+
execution: default_execution_config(),
145+
connection_url: default_connection_url(),
146+
server_metrics_port: default_server_metrics_port(),
147+
auth: default_auth_config(),
148+
}
149+
}
150+
}
130151

131152
#[derive(Clone, Debug, Default, Deserialize)]
132153
pub struct AppConfig {
@@ -196,12 +217,12 @@ fn default_paste() -> bool {
196217
false
197218
}
198219

199-
#[cfg(feature = "flightsql")]
220+
#[cfg(any(feature = "flightsql", feature = "http"))]
200221
pub fn default_connection_url() -> String {
201222
"http://localhost:50051".to_string()
202223
}
203224

204-
#[cfg(feature = "flightsql")]
225+
#[cfg(any(feature = "flightsql", feature = "http"))]
205226
fn default_server_metrics_port() -> String {
206227
"0.0.0.0:9000".to_string()
207228
}
@@ -215,7 +236,7 @@ fn default_editor_config() -> EditorConfig {
215236
EditorConfig::default()
216237
}
217238

218-
#[cfg(feature = "flightsql")]
239+
#[cfg(any(feature = "flightsql", feature = "http"))]
219240
fn default_auth_config() -> AuthConfig {
220241
AuthConfig::default()
221242
}

src/execution.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,6 @@ use datafusion::prelude::*;
2222
use datafusion_app::flightsql::{FlightSQLClient, FlightSQLContext};
2323
use datafusion_app::local::ExecutionContext;
2424

25-
pub enum AppType {
26-
Cli,
27-
Tui,
28-
FlightSQLServer,
29-
}
30-
3125
/// Provides all core execution functionality for execution queries from either a local
3226
/// `SessionContext` or a remote `FlightSQL` service
3327
pub struct AppExecution {

src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ pub mod args;
22
pub mod cli;
33
pub mod config;
44
pub mod execution;
5-
#[cfg(feature = "flightsql")]
6-
pub mod flightsql_server;
7-
#[cfg(feature = "http")]
8-
pub mod http_server;
5+
pub mod server;
96
pub mod telemetry;
107
pub mod test_utils;
118
pub mod tui;

src/main.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
use clap::Parser;
1919
use color_eyre::Result;
20-
#[cfg(feature = "flightsql")]
21-
use datafusion_dft::{args::Command, flightsql_server, http_server};
20+
#[cfg(any(feature = "flightsql", feature = "http"))]
21+
use datafusion_dft::{args::Command, server};
2222
use datafusion_dft::{args::DftArgs, cli, config::create_config, tui};
2323

2424
fn main() -> Result<()> {
@@ -42,6 +42,11 @@ fn should_init_env_logger(cli: &DftArgs) -> bool {
4242
if let Some(Command::ServeFlightSql { .. }) = cli.command {
4343
return true;
4444
}
45+
#[cfg(feature = "http")]
46+
if let Some(Command::ServeHttp { .. }) = cli.command {
47+
return true;
48+
}
49+
4550
if !cli.files.is_empty() || !cli.commands.is_empty() {
4651
return true;
4752
}
@@ -56,11 +61,13 @@ async fn app_entry_point(cli: DftArgs) -> Result<()> {
5661

5762
#[cfg(feature = "flightsql")]
5863
if let Some(Command::ServeFlightSql { .. }) = cli.command {
59-
flightsql_server::try_run(cli.clone(), cfg.clone()).await?;
64+
server::flightsql::try_run(cli.clone(), cfg.clone()).await?;
65+
return Ok(());
6066
}
6167
#[cfg(feature = "http")]
6268
if let Some(Command::ServeHttp { .. }) = cli.command {
63-
http_server::try_run(cli.clone(), cfg.clone()).await?;
69+
server::http::try_run(cli.clone(), cfg.clone()).await?;
70+
return Ok(());
6471
}
6572

6673
if !cli.files.is_empty() || !cli.commands.is_empty() {
Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
pub mod services;
18+
pub mod service;
1919

2020
use crate::args::DftArgs;
2121
use crate::config::AppConfig;
@@ -25,9 +25,7 @@ use datafusion_app::config::merge_configs;
2525
use datafusion_app::extensions::DftSessionStateBuilder;
2626
use datafusion_app::local::ExecutionContext;
2727
use log::info;
28-
use metrics::{describe_counter, describe_histogram};
29-
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};
30-
use services::flightsql::FlightSqlServiceImpl;
28+
use service::FlightSqlServiceImpl;
3129
use std::net::SocketAddr;
3230
use std::time::Duration;
3331
use tokio::net::TcpListener;
@@ -37,25 +35,12 @@ use tonic::transport::Server;
3735
#[cfg(feature = "flightsql")]
3836
use tower_http::validate_request::ValidateRequestHeaderLayer;
3937

40-
const DEFAULT_TIMEOUT_SECONDS: u64 = 60;
41-
42-
fn initialize_metrics() {
43-
describe_counter!("requests", "Incoming requests by FlightSQL endpoint");
38+
use super::try_start_metrics_server;
4439

45-
describe_histogram!(
46-
"get_flight_info_latency_ms",
47-
metrics::Unit::Milliseconds,
48-
"Get flight info latency ms"
49-
);
50-
51-
describe_histogram!(
52-
"do_get_fallback_latency_ms",
53-
metrics::Unit::Milliseconds,
54-
"Do get fallback latency ms"
55-
)
56-
}
40+
const DEFAULT_TIMEOUT_SECONDS: u64 = 60;
41+
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
5742

58-
fn create_server_handle(
43+
pub fn create_server_handle(
5944
config: &AppConfig,
6045
flightsql: FlightSqlServiceImpl,
6146
listener: TcpListener,
@@ -139,37 +124,20 @@ impl FlightSqlApp {
139124
addr: &str,
140125
metrics_addr: &str,
141126
) -> Result<Self> {
142-
let flightsql = services::flightsql::FlightSqlServiceImpl::new(app_execution);
143-
// let OS choose a free port
127+
info!("Listening to FlightSQL on {addr}");
128+
let flightsql = service::FlightSqlServiceImpl::new(app_execution);
144129
let listener = TcpListener::bind(addr).await.unwrap();
145-
let addr = listener.local_addr().unwrap();
146130

147131
// prepare the shutdown channel
148132
let (tx, rx) = tokio::sync::oneshot::channel();
149133
let handle = create_server_handle(config, flightsql, listener, rx)?;
150134

151-
{
152-
let builder = PrometheusBuilder::new();
153-
let addr: SocketAddr = metrics_addr.parse()?;
154-
info!("Listening to metrics on {addr}");
155-
builder
156-
.with_http_listener(addr)
157-
.set_buckets_for_metric(
158-
Matcher::Suffix("latency_ms".to_string()),
159-
&[
160-
1.0, 3.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 1000.0, 2500.0,
161-
5000.0, 10000.0, 20000.0,
162-
],
163-
)?
164-
.install()
165-
.expect("failed to install metrics recorder/exporter");
166-
167-
initialize_metrics();
168-
}
135+
let metrics_addr: SocketAddr = metrics_addr.parse()?;
136+
try_start_metrics_server(metrics_addr)?;
169137

170138
let app = Self {
171139
shutdown: Some(tx),
172-
addr,
140+
addr: metrics_addr,
173141
handle: Some(handle),
174142
};
175143
Ok(app)
@@ -188,7 +156,7 @@ impl FlightSqlApp {
188156
}
189157
}
190158

191-
pub async fn run_app(self) {
159+
pub async fn run(self) {
192160
if let Some(handle) = self.handle {
193161
handle
194162
.await
@@ -208,10 +176,8 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
208176
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
209177
.with_extensions()
210178
.await?;
211-
// FlightSQL Server mode: start a FlightSQL server
212-
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
213-
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
214179
let session_state = session_state_builder.build()?;
180+
// FlightSQL Server mode: start a FlightSQL server
215181
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
216182
if cli.run_ddl {
217183
execution_ctx.execute_ddl().await;
@@ -220,11 +186,10 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
220186
let app = FlightSqlApp::try_new(
221187
app_execution,
222188
&config,
223-
&cli.flightsql_host
224-
.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
189+
&cli.host.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
225190
&config.flightsql_server.server_metrics_port,
226191
)
227192
.await?;
228-
app.run_app().await;
193+
app.run().await;
229194
Ok(())
230195
}

0 commit comments

Comments
 (0)