Skip to content

Commit 4fca065

Browse files
Json
1 parent 040233d commit 4fca065

7 files changed

Lines changed: 119 additions & 25 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pin-project-lite = { version = "0.2.14" }
4141
prost = "0.14"
4242
ratatui = { optional = true, version = "0.28.0" }
4343
serde = { features = ["derive"], version = "1.0.197" }
44+
serde_json = "1.0"
4445
strum = "0.26.2"
4546
tokio = { features = [
4647
"macros",

crates/datafusion-app/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ opendal = { features = [
3737
parking_lot = "0.12.3"
3838
prost = { optional = true, version = "0.14" }
3939
serde = { features = ["derive"], version = "1.0.197" }
40+
serde_json = "1.0"
4041
tokio = { features = ["macros", "rt-multi-thread"], version = "1.36.0" }
4142
tokio-metrics = { features = [
4243
"metrics-rs-integration",

crates/datafusion-app/src/flightsql.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,10 +506,14 @@ impl FlightSQLContext {
506506
return Err(eyre::eyre!("Only a single SQL statement can be analyzed"));
507507
}
508508

509-
// 1. Create Action with type "analyze_query" and SQL in body
509+
// 1. Create JSON request and encode as Action body
510+
let request = crate::stats::AnalyzeQueryRequest::with_sql(query);
511+
let request_body = serde_json::to_vec(&request)
512+
.map_err(|e| eyre::eyre!("Failed to serialize request: {}", e))?;
513+
510514
let action = Action {
511515
r#type: "analyze_query".to_string(),
512-
body: query.as_bytes().to_vec().into(),
516+
body: request_body.into(),
513517
};
514518

515519
// 2. Call do_action on the FlightSQL service

crates/datafusion-app/src/stats.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,40 @@ use datafusion::{
3737
};
3838
use itertools::Itertools;
3939
use log::debug;
40+
use serde::{Deserialize, Serialize};
4041
use std::{collections::HashMap, sync::Arc, time::Duration};
4142

43+
/// Request structure for the analyze_query action
44+
#[derive(Debug, Clone, Serialize, Deserialize)]
45+
pub struct AnalyzeQueryRequest {
46+
/// SQL query to analyze (currently the only supported format)
47+
pub sql: Option<String>,
48+
49+
// Future extensibility fields (not yet implemented):
50+
// /// Substrait query plan (binary or JSON)
51+
// pub substrait: Option<Vec<u8>>,
52+
// /// Serialized logical plan
53+
// pub logical_plan: Option<String>,
54+
// /// Serialized physical plan
55+
// pub physical_plan: Option<String>,
56+
}
57+
58+
impl AnalyzeQueryRequest {
59+
/// Create a new request with a SQL query
60+
pub fn with_sql(sql: impl Into<String>) -> Self {
61+
Self {
62+
sql: Some(sql.into()),
63+
}
64+
}
65+
66+
/// Get the SQL query, returning an error if not present
67+
pub fn sql(&self) -> color_eyre::Result<&str> {
68+
self.sql
69+
.as_deref()
70+
.ok_or_else(|| color_eyre::eyre::eyre!("sql field is required"))
71+
}
72+
}
73+
4274
#[derive(Clone, Debug)]
4375
pub struct ExecutionStats {
4476
query: String,

docs/arrow_flight_analyze_protocol.md

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,42 @@ The examples in this specification use SQL for illustration, but the protocol wo
3737

3838
### Request Format
3939

40-
**Request Body**: UTF-8 encoded SQL query string
40+
**Request Body**: JSON-encoded query request structure
4141

42-
**Important**: The SQL query must contain exactly one SQL statement. Multiple statements (e.g., separated by semicolons) are not supported and will result in an error.
42+
The request body should be a JSON object with the following structure:
43+
44+
```json
45+
{
46+
"sql": "SELECT * FROM table WHERE id > 100"
47+
}
48+
```
49+
50+
**Current Fields**:
51+
- `sql` (string, required): The SQL query to analyze. Must contain exactly one SQL statement. Multiple statements (e.g., separated by semicolons) are not supported and will result in an error.
52+
53+
**Future Extensibility**:
54+
The protocol is designed to be extensible. Future versions may support additional query representation fields:
55+
- `substrait` (bytes): Substrait query plan (binary or JSON)
56+
- `logical_plan` (string): Serialized logical plan
57+
- `physical_plan` (string): Serialized physical plan
58+
59+
Servers should ignore unknown fields and clients should only send one query representation field at a time.
4360

4461
**Example**:
4562
```rust
63+
use serde_json::json;
64+
65+
let request_body = json!({
66+
"sql": "SELECT * FROM table WHERE id > 100"
67+
});
68+
4669
Action {
4770
r#type: "analyze_query".to_string(),
48-
body: "SELECT * FROM table WHERE id > 100".as_bytes().to_vec().into()
71+
body: serde_json::to_vec(&request_body)?.into()
4972
}
5073
```
5174

52-
**Request Encoding**: The SQL query string should be encoded as UTF-8 bytes in the `Action.body` field.
75+
**Request Encoding**: The JSON object should be serialized to UTF-8 bytes in the `Action.body` field.
5376

5477
### Response Format
5578

@@ -306,29 +329,46 @@ To implement this protocol in an Arrow Flight service:
306329

307330
**Pseudo-code**:
308331
```rust
332+
use serde::{Deserialize, Serialize};
333+
334+
#[derive(Debug, Deserialize, Serialize)]
335+
struct AnalyzeQueryRequest {
336+
sql: Option<String>,
337+
// Future fields:
338+
// substrait: Option<Vec<u8>>,
339+
// logical_plan: Option<String>,
340+
// physical_plan: Option<String>,
341+
}
342+
309343
async fn do_action_fallback(&self, request: Request<Action>) -> Result<Response<Stream>> {
310344
let action = request.into_inner();
311345

312346
if action.r#type == "analyze_query" {
313-
// 1. Extract query
314-
let query = String::from_utf8(action.body.to_vec())?;
347+
// 1. Parse JSON request
348+
let request: AnalyzeQueryRequest = serde_json::from_slice(&action.body)?;
349+
350+
// 2. Extract SQL query (only supported format for now)
351+
let query = request.sql.ok_or_else(||
352+
Status::invalid_argument("sql field is required")
353+
)?;
315354

316-
// 2. Execute with metrics
355+
// 3. Execute with metrics
317356
let stats = self.analyze_query(&query).await?;
318357

319-
// 3. Convert to metrics batch (8-field schema with namespaced metrics)
358+
// 4. Convert to metrics batch (8-field schema with namespaced metrics)
320359
let metrics_batch = stats.to_metrics_table()?;
321360

322-
// 4. Encode as FlightData (no metadata needed)
361+
// 5. Encode as FlightData (no metadata needed)
323362
let flight_data = batches_to_flight_data(&metrics_batch.schema(), vec![metrics_batch])?;
324363

325-
// 5. Serialize and wrap in Result messages
364+
// 6. Serialize and wrap in Result messages
365+
// Note: Query is NOT included in response; clients must retain the original request
326366
let results: Vec<arrow_flight::Result> = flight_data
327367
.into_iter()
328368
.map(|fd| arrow_flight::Result { body: fd.encode_to_vec().into() })
329369
.collect();
330370

331-
// 6. Return stream
371+
// 7. Return stream
332372
Ok(Response::new(stream::iter(results.into_iter().map(Ok))))
333373
} else {
334374
Err(Status::unimplemented("Unknown action"))
@@ -342,10 +382,18 @@ To consume this protocol:
342382

343383
1. **Send Request**
344384
```rust
385+
use serde_json::json;
386+
345387
let query = "SELECT * FROM table WHERE id > 100";
388+
389+
// Construct JSON request
390+
let request_body = json!({
391+
"sql": query
392+
});
393+
346394
let action = Action {
347395
r#type: "analyze_query".to_string(),
348-
body: query.as_bytes().to_vec().into(),
396+
body: serde_json::to_vec(&request_body)?.into(),
349397
};
350398
let stream = client.do_action(action).await?;
351399
```

src/server/flightsql/service.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -850,34 +850,41 @@ impl FlightSqlService for FlightSqlServiceImpl {
850850

851851
match action.r#type.as_str() {
852852
"analyze_query" => {
853-
// 1. Extract SQL query from action.body
854-
let sql = String::from_utf8(action.body.to_vec())
855-
.map_err(|e| Status::invalid_argument(format!("Invalid UTF-8: {}", e)))?;
853+
// 1. Parse JSON request body
854+
let request: datafusion_app::stats::AnalyzeQueryRequest =
855+
serde_json::from_slice(&action.body).map_err(|e| {
856+
Status::invalid_argument(format!("Invalid JSON request: {}", e))
857+
})?;
858+
859+
// 2. Extract SQL query (only supported format for now)
860+
let sql = request
861+
.sql()
862+
.map_err(|e| Status::invalid_argument(e.to_string()))?;
856863

857864
info!("Analyzing query via do_action: {}", sql);
858865

859-
// 2. Execute analyze_query on ExecutionContext
866+
// 3. Execute analyze_query on ExecutionContext
860867
let mut stats = self
861868
.execution
862-
.analyze_query(&sql)
869+
.analyze_query(sql)
863870
.await
864871
.map_err(|e| Status::internal(format!("Analyze failed: {}", e)))?;
865872

866873
stats.collect_stats(); // Collect IO and compute metrics from plan
867874

868-
// 3. Convert ExecutionStats to metrics table format
875+
// 4. Convert ExecutionStats to metrics table format
869876
let metrics_batch = stats.to_metrics_table().map_err(|e| {
870877
Status::internal(format!("Metrics serialization failed: {}", e))
871878
})?;
872879

873-
// 4. Encode metrics batch as FlightData
880+
// 5. Encode metrics batch as FlightData
874881
let flight_data =
875882
batches_to_flight_data(&metrics_batch.schema(), vec![metrics_batch]).map_err(
876883
|e| Status::internal(format!("Failed to encode metrics batch: {}", e)),
877884
)?;
878885

879-
// 5. Convert FlightData to arrow_flight::Result messages
880-
// Note: SQL query is NOT included in metadata; clients must retain the original query
886+
// 6. Convert FlightData to arrow_flight::Result messages
887+
// Note: Query is NOT included in response; clients must retain the original request
881888
let results: Vec<arrow_flight::Result> = flight_data
882889
.into_iter()
883890
.map(|fd| {
@@ -887,7 +894,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
887894
})
888895
.collect();
889896

890-
// 6. Create stream of Result messages
897+
// 7. Create stream of Result messages
891898
let stream = futures::stream::iter(results.into_iter().map(Ok)).boxed();
892899

893900
// Record metrics
@@ -899,7 +906,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
899906
let req = ObservabilityRequestDetails {
900907
request_id: None,
901908
path: "/do_action/analyze_query".to_string(),
902-
sql: Some(sql),
909+
sql: Some(sql.to_string()),
903910
start_ms: start.as_millisecond(),
904911
duration_ms: duration.get_milliseconds(),
905912
rows: None,

0 commit comments

Comments
 (0)