Skip to content

Commit 468503b

Browse files
FlightSQL for HTTP refactoring
1 parent a2a61d6 commit 468503b

6 files changed

Lines changed: 56 additions & 20 deletions

File tree

crates/datafusion-app/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ pub struct WasmUdfConfig {
239239
}
240240

241241
#[cfg(feature = "flightsql")]
242-
#[derive(Clone, Default)]
242+
#[derive(Clone, Debug, Default)]
243243
pub struct FlightSQLConfig {
244244
pub connection_url: String,
245245
pub benchmark_iterations: usize,

crates/datafusion-app/src/flightsql.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,25 @@ use crate::{
4242

4343
pub type FlightSQLClient = Arc<Mutex<Option<FlightSqlServiceClient<Channel>>>>;
4444

45-
#[derive(Clone, Default)]
45+
#[derive(Clone, Debug, Default)]
4646
pub struct FlightSQLContext {
4747
config: FlightSQLConfig,
48-
flightsql_client: FlightSQLClient,
48+
client: FlightSQLClient,
4949
}
5050

5151
impl FlightSQLContext {
5252
pub fn new(config: FlightSQLConfig) -> Self {
5353
Self {
5454
config,
55-
flightsql_client: Arc::new(Mutex::new(None)),
55+
client: Arc::new(Mutex::new(None)),
5656
}
5757
}
5858

5959
pub fn client(&self) -> &FlightSQLClient {
60-
&self.flightsql_client
60+
&self.client
6161
}
6262

63+
// TODO - Make this part of `new` method
6364
/// Create FlightSQL client from users FlightSQL config
6465
pub async fn create_client(&self, cli_host: Option<String>) -> Result<()> {
6566
let final_url = cli_host.unwrap_or(self.config.connection_url.clone());
@@ -74,6 +75,7 @@ impl FlightSQLContext {
7475
//
7576
// Although that is for HTTP/1.1 and GRPC uses HTTP/2 - so maybe it has changed.
7677
// To be tested later with the Tower auth layers to see what they support.
78+
// TODO - Do we need this feature block?
7779
#[cfg(feature = "flightsql")]
7880
{
7981
if let Some(token) = &self.config.auth.bearer_token {
@@ -85,7 +87,7 @@ impl FlightSQLContext {
8587
client.set_header("Authorization", format!("Basic {encoded_basic}"))
8688
}
8789
}
88-
let mut guard = self.flightsql_client.lock().await;
90+
let mut guard = self.client.lock().await;
8991
*guard = Some(client);
9092
Ok(())
9193
}
@@ -111,7 +113,7 @@ impl FlightSQLContext {
111113
let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
112114
let statements = DFParser::parse_sql_with_dialect(query, &dialect)?;
113115
if statements.len() == 1 {
114-
if let Some(ref mut client) = *self.flightsql_client.lock().await {
116+
if let Some(ref mut client) = *self.client.lock().await {
115117
for _ in 0..iterations {
116118
let mut rows = 0;
117119
let start = std::time::Instant::now();
@@ -171,7 +173,7 @@ impl FlightSQLContext {
171173
sql: &str,
172174
_opts: ExecOptions,
173175
) -> DFResult<ExecResult> {
174-
if let Some(ref mut client) = *self.flightsql_client.lock().await {
176+
if let Some(ref mut client) = *self.client.lock().await {
175177
let flight_info = client.execute(sql.to_string(), None).await?;
176178
if flight_info.endpoint.len() != 1 {
177179
return Err(DataFusionError::External("More than one endpoint".into()));

justfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ install-tools:
88

99
# Starts a debug HTTP server
1010
serve-http:
11-
RUST_LOG=info cargo r --features=http -- serve-http
11+
RUST_LOG=info cargo r --features=http,flightsql -- serve-http
1212

1313
# Starts a debug FlightSQL server
1414
serve-flight-sql:

src/execution.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use datafusion_app::{local::ExecutionContext, ExecOptions, ExecResult};
2525

2626
/// Provides all core execution functionality for execution queries from either a local
2727
/// `SessionContext` or a remote `FlightSQL` service
28-
#[derive(Clone)]
28+
#[derive(Clone, Debug)]
2929
pub struct AppExecution {
3030
local: ExecutionContext,
3131
#[cfg(feature = "flightsql")]
@@ -65,16 +65,19 @@ impl AppExecution {
6565
}
6666

6767
pub async fn execute_sql_with_opts(&self, sql: &str, opts: ExecOptions) -> Result<ExecResult> {
68-
if cfg!(feature = "flightsql") & opts.flightsql {
69-
self.flightsql
68+
#[cfg(feature = "flightsql")]
69+
if opts.flightsql {
70+
return self
71+
.flightsql
7072
.execute_sql_with_opts(sql, opts)
7173
.await
72-
.map_err(|e| e.into())
73-
} else {
74-
self.local
75-
.execute_sql_with_opts(sql, opts)
76-
.await
77-
.map_err(|e| e.into())
74+
.map_err(|e| e.into());
7875
}
76+
77+
// If flightsql is not enabled or `opts.flightsql` is false, fall back to local:
78+
self.local
79+
.execute_sql_with_opts(sql, opts)
80+
.await
81+
.map_err(|e| e.into())
7982
}
8083
}

src/server/http/mod.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ use crate::{args::DftArgs, config::AppConfig, execution::AppExecution};
2323
use axum::Router;
2424
use color_eyre::Result;
2525
use datafusion_app::{
26-
config::merge_configs, extensions::DftSessionStateBuilder, local::ExecutionContext,
26+
config::{merge_configs, AuthConfig, FlightSQLConfig},
27+
extensions::DftSessionStateBuilder,
28+
flightsql::FlightSQLContext,
29+
local::ExecutionContext,
2730
};
2831
use router::create_router;
2932
use tokio::{net::TcpListener, signal};
@@ -112,7 +115,28 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
112115
if cli.run_ddl {
113116
execution_ctx.execute_ddl().await;
114117
}
115-
let app_execution = AppExecution::new(execution_ctx);
118+
let mut app_execution = AppExecution::new(execution_ctx);
119+
#[cfg(feature = "flightsql")]
120+
{
121+
info!("Setting up FlightSQLContext");
122+
let auth = AuthConfig {
123+
basic_auth: config.flightsql_client.auth.basic_auth.clone(),
124+
bearer_token: config.flightsql_client.auth.bearer_token.clone(),
125+
};
126+
let flightsql_cfg = FlightSQLConfig::new(
127+
config.flightsql_client.connection_url.clone(),
128+
config.flightsql_client.benchmark_iterations,
129+
auth,
130+
);
131+
132+
let flightsql_context = FlightSQLContext::new(flightsql_cfg.clone());
133+
// TODO - Consider adding flag to allow startup even if FlightSQL initiation fails
134+
flightsql_context
135+
.create_client(Some(flightsql_cfg.connection_url))
136+
.await?;
137+
app_execution.with_flightsql_ctx(flightsql_context);
138+
}
139+
info!("Created AppExecution: {app_execution:?}");
116140
let app = HttpApp::try_new(
117141
app_execution,
118142
config.clone(),

src/server/http/router.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ struct PostSqlBody {
7878
}
7979

8080
async fn post_sql_handler(state: State<ExecutionState>, Json(body): Json<PostSqlBody>) -> Response {
81+
if body.flightsql && !cfg!(feature = "flightsql") {
82+
return (
83+
StatusCode::BAD_REQUEST,
84+
"FlightSQL is not enabled on this server",
85+
)
86+
.into_response();
87+
}
8188
let opts = ExecOptions::new(Some(state.config.result_limit), body.flightsql);
8289
execute_sql_with_opts(state, body.query, opts).await
8390
}

0 commit comments

Comments
 (0)