Skip to content

Commit 10d97aa

Browse files
Add execution opts
1 parent 55bd6dc commit 10d97aa

3 files changed

Lines changed: 111 additions & 52 deletions

File tree

crates/datafusion-app/src/local.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use log::{debug, error, info};
2828

2929
use crate::config::ExecutionConfig;
3030
use color_eyre::eyre::{self, Result};
31+
use datafusion::common::Result as DFResult;
3132
use datafusion::execution::{SendableRecordBatchStream, SessionState};
3233
use datafusion::physical_plan::{execute_stream, ExecutionPlan};
3334
use datafusion::prelude::*;
@@ -399,7 +400,35 @@ impl ExecutionContext {
399400
} else {
400401
Err(eyre::eyre!("Only a single statement can be benchmarked"))
401402
}
403+
}
404+
405+
pub async fn execute_sql_with_opts(
406+
&self,
407+
sql: &str,
408+
opts: ExecutionOptions,
409+
) -> DFResult<ExecutionResult> {
410+
let df = self.session_ctx.sql(sql).await?;
411+
let df = if let Some(limit) = opts.limit {
412+
df.limit(0, Some(limit))?
413+
} else {
414+
df
415+
};
416+
Ok(ExecutionResult::RecordBatchStream(
417+
df.execute_stream().await,
418+
))
419+
}
420+
}
402421

403-
// Ok(())
422+
pub struct ExecutionOptions {
423+
limit: Option<usize>,
424+
}
425+
426+
impl ExecutionOptions {
427+
pub fn new(limit: Option<usize>) -> Self {
428+
Self { limit }
404429
}
405430
}
431+
432+
pub enum ExecutionResult {
433+
RecordBatchStream(DFResult<SendableRecordBatchStream>),
434+
}

src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ pub struct HttpServerConfig {
137137
pub auth: AuthConfig,
138138
#[serde(default = "default_timeout_seconds")]
139139
pub timeout_seconds: u64,
140+
#[serde(default = "default_result_limit")]
141+
pub result_limit: usize,
140142
}
141143

142144
#[cfg(feature = "http")]
@@ -148,6 +150,7 @@ impl Default for HttpServerConfig {
148150
server_metrics_port: default_server_metrics_port(),
149151
auth: default_auth_config(),
150152
timeout_seconds: default_timeout_seconds(),
153+
result_limit: default_result_limit(),
151154
}
152155
}
153156
}
@@ -249,6 +252,11 @@ fn default_timeout_seconds() -> u64 {
249252
10
250253
}
251254

255+
#[cfg(feature = "http")]
256+
fn default_result_limit() -> usize {
257+
1000
258+
}
259+
252260
pub fn create_config(config_path: PathBuf) -> AppConfig {
253261
if config_path.exists() {
254262
debug!("Config exists");

src/server/http/router.rs

Lines changed: 73 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use axum::{
2424
routing::{get, post},
2525
Router,
2626
};
27-
use datafusion::arrow::json::ArrayWriter;
28-
use datafusion_app::local::ExecutionContext;
27+
use datafusion::{arrow::json::ArrayWriter, execution::SendableRecordBatchStream};
28+
use datafusion_app::local::{ExecutionContext, ExecutionOptions, ExecutionResult};
2929
use http::{HeaderValue, StatusCode};
3030
use log::error;
3131
use serde::Deserialize;
@@ -35,11 +35,24 @@ use tracing::info;
3535

3636
use crate::config::HttpServerConfig;
3737

38+
#[derive(Clone)]
39+
struct ExecutionState {
40+
execution: ExecutionContext,
41+
config: HttpServerConfig,
42+
}
43+
44+
impl ExecutionState {
45+
pub fn new(execution: ExecutionContext, config: HttpServerConfig) -> Self {
46+
Self { execution, config }
47+
}
48+
}
49+
3850
pub fn create_router(execution: ExecutionContext, config: HttpServerConfig) -> Router {
51+
let state = ExecutionState::new(execution, config);
3952
Router::new()
4053
.route(
4154
"/",
42-
get(|State(_): State<ExecutionContext>| async { "Hello, from DFT!" }),
55+
get(|State(_): State<ExecutionState>| async { "Hello, from DFT!" }),
4356
)
4457
.route("/sql", post(post_sql_handler))
4558
.route("/catalog", get(get_catalog_handler))
@@ -48,17 +61,19 @@ pub fn create_router(execution: ExecutionContext, config: HttpServerConfig) -> R
4861
TraceLayer::new_for_http(),
4962
// Graceful shutdown will wait for outstanding requests to complete. Add a timeout so
5063
// requests don't hang forever.
51-
TimeoutLayer::new(Duration::from_secs(config.timeout_seconds)),
64+
TimeoutLayer::new(Duration::from_secs(state.config.timeout_seconds)),
5265
))
53-
.with_state(execution)
66+
.with_state(state)
5467
}
5568

56-
async fn post_sql_handler(state: State<ExecutionContext>, query: String) -> Response {
57-
execute_sql(state, query).await
69+
async fn post_sql_handler(state: State<ExecutionState>, query: String) -> Response {
70+
let opts = ExecutionOptions::new(Some(state.config.result_limit));
71+
execute_sql_with_opts(state, query, opts).await
5872
}
5973

60-
async fn get_catalog_handler(state: State<ExecutionContext>) -> Response {
61-
execute_sql(state, "SHOW TABLES".to_string()).await
74+
async fn get_catalog_handler(state: State<ExecutionState>) -> Response {
75+
let opts = ExecutionOptions::new(None);
76+
execute_sql_with_opts(state, "SHOW TABLES".to_string(), opts).await
6277
}
6378

6479
#[derive(Deserialize)]
@@ -69,7 +84,7 @@ struct GetTableParams {
6984
}
7085

7186
async fn get_table_handler(
72-
state: State<ExecutionContext>,
87+
state: State<ExecutionState>,
7388
Path(params): Path<GetTableParams>,
7489
) -> Response {
7590
let GetTableParams {
@@ -78,52 +93,22 @@ async fn get_table_handler(
7893
table,
7994
} = params;
8095
let sql = format!("SELECT * FROM \"{catalog}\".\"{schema}\".\"{table}\"");
81-
execute_sql(state, sql).await
96+
let opts = ExecutionOptions::new(Some(state.config.result_limit));
97+
execute_sql_with_opts(state, sql, opts).await
8298
}
8399

84-
async fn execute_sql(State(state): State<ExecutionContext>, sql: String) -> Response {
100+
async fn execute_sql_with_opts(
101+
State(state): State<ExecutionState>,
102+
sql: String,
103+
opts: ExecutionOptions,
104+
) -> Response {
85105
info!("Executing sql: {sql}");
86-
let results = state.execute_sql(&sql).await;
106+
let results = state.execution.execute_sql_with_opts(&sql, opts).await;
87107
match results {
88-
Ok(mut batch_stream) => {
89-
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
90-
let mut writer = ArrayWriter::new(&mut buf);
91-
92-
while let Some(maybe_batch) = batch_stream.next().await {
93-
match maybe_batch {
94-
Ok(batch) => {
95-
if let Err(e) = writer.write(&batch) {
96-
error!("Error serializing result batches: {}", e);
97-
return (StatusCode::INTERNAL_SERVER_ERROR, "Serialization error")
98-
.into_response();
99-
}
100-
}
101-
Err(e) => {
102-
error!("Error executing query: {}", e);
103-
return (StatusCode::INTERNAL_SERVER_ERROR, "Query execution error")
104-
.into_response();
105-
}
106-
}
107-
}
108-
109-
if let Err(e) = writer.finish() {
110-
error!("Error finalizing JSON writer: {}", e);
111-
return (StatusCode::INTERNAL_SERVER_ERROR, "Finalization error").into_response();
112-
}
113-
114-
match String::from_utf8(buf.into_inner()) {
115-
Ok(json) => {
116-
let mut res = Response::new(Body::new(json));
117-
res.headers_mut()
118-
.insert("content-type", HeaderValue::from_static("application/json"));
119-
res
120-
}
121-
Err(_) => {
122-
(StatusCode::INTERNAL_SERVER_ERROR, "UTF-8 conversion error").into_response()
123-
}
124-
}
108+
Ok(ExecutionResult::RecordBatchStream(Ok(batch_stream))) => {
109+
batch_stream_to_response(batch_stream).await
125110
}
126-
Err(e) => {
111+
Err(e) | Ok(ExecutionResult::RecordBatchStream(Err(e))) => {
127112
error!("Error executing SQL: {}", e);
128113
(
129114
StatusCode::BAD_REQUEST,
@@ -133,3 +118,40 @@ async fn execute_sql(State(state): State<ExecutionContext>, sql: String) -> Resp
133118
}
134119
}
135120
}
121+
122+
async fn batch_stream_to_response(batch_stream: SendableRecordBatchStream) -> Response {
123+
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
124+
let mut writer = ArrayWriter::new(&mut buf);
125+
let mut batch_stream = batch_stream;
126+
while let Some(maybe_batch) = batch_stream.next().await {
127+
match maybe_batch {
128+
Ok(batch) => {
129+
if let Err(e) = writer.write(&batch) {
130+
error!("Error serializing result batches: {}", e);
131+
return (StatusCode::INTERNAL_SERVER_ERROR, "Serialization error")
132+
.into_response();
133+
}
134+
}
135+
Err(e) => {
136+
error!("Error executing query: {}", e);
137+
return (StatusCode::INTERNAL_SERVER_ERROR, "Query execution error")
138+
.into_response();
139+
}
140+
}
141+
}
142+
143+
if let Err(e) = writer.finish() {
144+
error!("Error finalizing JSON writer: {}", e);
145+
return (StatusCode::INTERNAL_SERVER_ERROR, "Finalization error").into_response();
146+
}
147+
148+
match String::from_utf8(buf.into_inner()) {
149+
Ok(json) => {
150+
let mut res = Response::new(Body::new(json));
151+
res.headers_mut()
152+
.insert("content-type", HeaderValue::from_static("application/json"));
153+
res
154+
}
155+
Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "UTF-8 conversion error").into_response(),
156+
}
157+
}

0 commit comments

Comments
 (0)