Skip to content

Commit aa24505

Browse files
Cleanup
1 parent 185d136 commit aa24505

9 files changed

Lines changed: 676 additions & 155 deletions

File tree

.claude/settings.local.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
"Bash(/Users/matth/OpenSource/datafusion-tui/target/debug/dft:*)",
3333
"Bash(./target/debug/dft:*)",
3434
"Bash(xargs rg:*)",
35-
"Bash(rg:*)"
35+
"Bash(rg:*)",
36+
"Bash(pkill:*)"
3637
],
3738
"deny": [],
3839
"ask": []

crates/datafusion-app/src/flightsql.rs

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -473,42 +473,39 @@ impl FlightSQLContext {
473473
}
474474
}
475475

476-
/// Get raw metrics batches without reconstruction (for --analyze-raw)
476+
/// Get raw metrics batch without reconstruction (for --analyze-raw)
477477
pub async fn analyze_query_raw(
478478
&self,
479479
query: &str,
480-
) -> Result<(datafusion::arrow::array::RecordBatch, datafusion::arrow::array::RecordBatch)> {
480+
) -> Result<(String, datafusion::arrow::array::RecordBatch)> {
481481
self.fetch_analyze_batches(query).await
482482
}
483483

484484
/// Reconstruct ExecutionStats from metrics (for --analyze)
485485
pub async fn analyze_query(&self, query: &str) -> Result<crate::stats::ExecutionStats> {
486-
use datafusion::arrow::array::StringArray;
487-
488-
let (queries_batch, metrics_batch) = self.fetch_analyze_batches(query).await?;
489-
490-
// Extract query string from queries batch
491-
let query_array = queries_batch
492-
.column(0)
493-
.as_any()
494-
.downcast_ref::<StringArray>()
495-
.ok_or_else(|| eyre::eyre!("Invalid queries batch schema"))?;
496-
let query_str = query_array.value(0).to_string();
486+
let (query_str, metrics_batch) = self.fetch_analyze_batches(query).await?;
497487

498488
// Reconstruct ExecutionStats from metrics table
499489
let stats = crate::stats::ExecutionStats::from_metrics_table(metrics_batch, query_str)?;
500490

501491
Ok(stats)
502492
}
503493

504-
/// Shared logic to fetch analyze batches from server
494+
/// Shared logic to fetch analyze batch and query from server
505495
async fn fetch_analyze_batches(
506496
&self,
507497
query: &str,
508-
) -> Result<(datafusion::arrow::array::RecordBatch, datafusion::arrow::array::RecordBatch)> {
498+
) -> Result<(String, datafusion::arrow::array::RecordBatch)> {
509499
use arrow_flight::utils::flight_data_to_batches;
510500
use arrow_flight::{Action, FlightData};
511501

502+
// Validate that query contains only a single statement
503+
let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
504+
let statements = DFParser::parse_sql_with_dialect(query, &dialect)?;
505+
if statements.len() != 1 {
506+
return Err(eyre::eyre!("Only a single SQL statement can be analyzed"));
507+
}
508+
512509
// 1. Create Action with type "analyze_query" and SQL in body
513510
let action = Action {
514511
r#type: "analyze_query".to_string(),
@@ -533,35 +530,46 @@ impl FlightSQLContext {
533530
result_messages.push(result);
534531
}
535532

536-
// 4. Decode each Result message to FlightData
533+
// 4. Decode each Result message to FlightData and extract query from metadata
537534
let mut all_flight_data = Vec::new();
535+
let mut sql_query = None;
538536

539537
for result in result_messages {
540538
// Deserialize the FlightData from the Result.body bytes using prost
541-
// Note: FlightData implements prost::Message
542539
let flight_data = <FlightData as prost::Message>::decode(result.body.as_ref())
543540
.map_err(|e| eyre::eyre!("Failed to decode FlightData: {}", e))?;
544541

542+
// Extract SQL from schema message (first message) metadata
543+
if sql_query.is_none() && !flight_data.app_metadata.is_empty() {
544+
sql_query = Some(
545+
String::from_utf8(flight_data.app_metadata.to_vec())
546+
.map_err(|e| eyre::eyre!("Invalid UTF-8 in metadata: {}", e))?,
547+
);
548+
}
549+
545550
all_flight_data.push(flight_data);
546551
}
547552

548-
// 5. Convert all FlightData to RecordBatches using flight_data_to_batches
549-
let batches = flight_data_to_batches(&all_flight_data)
550-
.map_err(|e| eyre::eyre!("Failed to decode batches: {}", e))?;
551-
552-
// 6. Split batches - first set is queries (1 batch with schema), second set is metrics
553-
// The batches_to_flight_data function generates: [schema, data batch] for each table
554-
// So we expect: [queries_schema, queries_batch, metrics_schema, metrics_batch]
555-
// But flight_data_to_batches returns just the data batches, not schemas
556-
// So we expect: [queries_batch, metrics_batch]
553+
// 5. Validate we got the SQL query in metadata
554+
let query_str = sql_query
555+
.ok_or_else(|| eyre::eyre!("SQL query not found in response metadata"))?;
557556

558-
if batches.len() < 2 {
557+
// 6. Decode metrics batch
558+
// batches_to_flight_data creates [schema, data] for the batch
559+
if all_flight_data.len() < 2 {
559560
return Err(eyre::eyre!(
560-
"Invalid analyze response: expected at least 2 batches, got {}",
561-
batches.len()
561+
"Invalid analyze response: expected at least 2 FlightData messages (schema + data), got {}",
562+
all_flight_data.len()
562563
));
563564
}
564565

565-
Ok((batches[0].clone(), batches[1].clone()))
566+
let metrics_batches = flight_data_to_batches(&all_flight_data)
567+
.map_err(|e| eyre::eyre!("Failed to decode metrics batch: {}", e))?;
568+
569+
if metrics_batches.is_empty() {
570+
return Err(eyre::eyre!("No metrics batch found in response"));
571+
}
572+
573+
Ok((query_str, metrics_batches[0].clone()))
566574
}
567575
}

crates/datafusion-app/src/stats.rs

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -450,39 +450,42 @@ impl ExecutionComputeStats {
450450
compute: &Option<Vec<PartitionsComputeStats>>,
451451
label: &str,
452452
) -> std::fmt::Result {
453-
if let (Some(filter_compute), Some(elapsed_compute)) = (compute, &self.elapsed_compute) {
454-
let partitions = filter_compute.iter().fold(0, |acc, c| acc + c.partitions());
455-
writeln!(
456-
f,
457-
"{label} Stats ({} nodes, {} partitions)",
458-
filter_compute.len(),
459-
partitions
460-
)?;
461-
writeln!(
462-
f,
463-
"{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
464-
"Node(Partitions)", "Min", "Median", "Mean", "Max", "Total (%)"
465-
)?;
466-
filter_compute.iter().try_for_each(|node| {
467-
let (min, median, mean, max, total) = node.summary_stats();
468-
let total = format!(
469-
"{} ({:.2}%)",
470-
total,
471-
(total as f32 / *elapsed_compute as f32) * 100.0
472-
);
453+
match (compute, &self.elapsed_compute) {
454+
(Some(filter_compute), Some(elapsed_compute)) if !filter_compute.is_empty() => {
455+
let partitions = filter_compute.iter().fold(0, |acc, c| acc + c.partitions());
456+
writeln!(
457+
f,
458+
"{label}: {} nodes, {} partitions",
459+
filter_compute.len(),
460+
partitions
461+
)?;
473462
writeln!(
474463
f,
475464
"{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
476-
format!("{}({})", node.name, node.elapsed_computes.len()),
477-
min,
478-
median,
479-
mean,
480-
max,
481-
total,
482-
)
483-
})
484-
} else {
485-
writeln!(f, "No {label} Stats")
465+
"Node(Partitions)", "Min", "Median", "Mean", "Max", "Total (%)"
466+
)?;
467+
filter_compute.iter().try_for_each(|node| {
468+
let (min, median, mean, max, total) = node.summary_stats();
469+
let total = format!(
470+
"{} ({:.2}%)",
471+
total,
472+
(total as f32 / *elapsed_compute as f32) * 100.0
473+
);
474+
writeln!(
475+
f,
476+
"{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}",
477+
format!("{}({})", node.name, node.elapsed_computes.len()),
478+
min,
479+
median,
480+
mean,
481+
max,
482+
total,
483+
)
484+
})
485+
}
486+
_ => {
487+
writeln!(f, "{label}: No data")
488+
}
486489
}
487490
}
488491
}
@@ -503,16 +506,19 @@ impl std::fmt::Display for ExecutionComputeStats {
503506
.unwrap_or("None".to_string()),
504507
)?;
505508
writeln!(f)?;
509+
510+
// Always display all categories in the same order as FlightSQL protocol:
511+
// Projection, Filter, Sort, Aggregate, Join, Other
506512
self.display_compute(f, &self.projection_compute, "Projection")?;
507513
writeln!(f)?;
508514
self.display_compute(f, &self.filter_compute, "Filter")?;
509515
writeln!(f)?;
510516
self.display_compute(f, &self.sort_compute, "Sort")?;
511517
writeln!(f)?;
512-
self.display_compute(f, &self.join_compute, "Join")?;
513-
writeln!(f)?;
514518
self.display_compute(f, &self.aggregate_compute, "Aggregate")?;
515519
writeln!(f)?;
520+
self.display_compute(f, &self.join_compute, "Join")?;
521+
writeln!(f)?;
516522
self.display_compute(f, &self.other_compute, "Other")?;
517523
writeln!(f)
518524
}

docs/cli.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ The output from `EXPLAIN ANALYZE` provides a wealth of information on a queries
133133

134134
To help with this, the `--analyze` flag can be used to generate a summary of the underlying `ExecutionPlan` `MetricSet`s. The summary presents the information in a way that is hopefully easier to understand and easier to draw conclusions on a query's performance.
135135

136+
**Important**: The analyze feature only supports a single SQL statement. If you provide multiple statements (e.g., separated by semicolons) or multiple files/commands, an error will be returned.
137+
136138
This feature is still in its early stages and is expected to evolve. Once it has gone through enough real-world testing and it has been confirmed that the metrics make sense, documentation will be added on the exact calculations — until then the source will need to be inspected to see the calculations.
137139

138140
### Local Analyze

docs/flightsql_analyze_protocol.md

Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ The FlightSQL Analyze Protocol enables clients to retrieve detailed execution me
2727

2828
**Request Body**: UTF-8 encoded SQL query string
2929

30+
**Important**: The SQL query must contain exactly one SQL statement. Multiple statements (e.g., separated by semicolons) are not supported and will result in an error.
31+
3032
**Example**:
3133
```rust
3234
Action {
@@ -39,29 +41,20 @@ Action {
3941

4042
### Response Format
4143

42-
The response is a stream of `arrow_flight::Result` messages. Each `Result.body` contains serialized `FlightData` messages. The stream provides two Arrow RecordBatches:
43-
44-
#### Batch 1: Queries Batch
44+
The response is a stream of `arrow_flight::Result` messages. Each `Result.body` contains serialized `FlightData` messages.
4545

46-
**Purpose**: Contains the analyzed query text
46+
#### Response Metadata
4747

48-
**Schema**:
49-
| Column | Type | Nullable | Description |
50-
|--------|------|----------|-------------|
51-
| query | Utf8 | false | The SQL query that was analyzed |
48+
The first `FlightData` message (schema message) contains the query text in its metadata:
5249

53-
**Cardinality**: Exactly 1 row
50+
**Metadata Key**: `"sql_query"`
51+
**Metadata Value**: UTF-8 encoded SQL query string
5452

55-
**Example**:
56-
```
57-
query
58-
--------------------------------------------------
59-
SELECT * FROM table WHERE id > 100
60-
```
53+
This allows the client to correlate the metrics with the original query without requiring a separate record batch.
6154

62-
#### Batch 2: Metrics Batch
55+
#### Metrics Batch
6356

64-
**Purpose**: Flat table where each row represents a single metric
57+
**Purpose**: Single Arrow RecordBatch containing a flat table where each row represents a single metric
6558

6659
**Schema**:
6760
| Column | Type | Nullable | Description |
@@ -183,11 +176,10 @@ Detailed breakdown by operator and partition:
183176

184177
## Example Response
185178

186-
### Queries Batch
179+
### Response Metadata
187180
```
188-
query
189-
--------------------------------------------------
190-
SELECT * FROM table WHERE id > 100
181+
Metadata in schema FlightData message:
182+
sql_query: "SELECT * FROM table WHERE id > 100"
191183
```
192184

193185
### Metrics Batch
@@ -243,8 +235,9 @@ To implement this protocol in a FlightSQL server:
243235
- Emit one row per metric value
244236

245237
5. **Build Response**
246-
- Create two RecordBatches (queries + metrics)
238+
- Create metrics RecordBatch
247239
- Encode as FlightData using `batches_to_flight_data()` or equivalent
240+
- Add SQL query to the schema FlightData message metadata with key `"sql_query"`
248241
- Serialize each FlightData to bytes (protobuf encoding)
249242
- Wrap each serialized FlightData in `arrow_flight::Result { body: bytes }`
250243
- Stream Result messages to client
@@ -261,18 +254,20 @@ async fn do_action_fallback(&self, request: Request<Action>) -> Result<Response<
261254
// 2. Execute with metrics
262255
let stats = self.analyze_query(&sql).await?;
263256

264-
// 3. Convert to batches
265-
let queries_batch = create_queries_batch(vec![sql])?;
257+
// 3. Convert to metrics batch
266258
let metrics_batch = stats.to_metrics_table()?;
267259

268-
// 4. Encode as FlightData
269-
let queries_flight_data = batches_to_flight_data(&queries_batch.schema(), vec![queries_batch])?;
270-
let metrics_flight_data = batches_to_flight_data(&metrics_batch.schema(), vec![metrics_batch])?;
260+
// 4. Encode as FlightData with SQL in metadata
261+
let mut flight_data = batches_to_flight_data(&metrics_batch.schema(), vec![metrics_batch])?;
262+
263+
// Add SQL query to schema message metadata
264+
if let Some(schema_msg) = flight_data.first_mut() {
265+
schema_msg.app_metadata = sql.as_bytes().to_vec().into();
266+
}
271267

272268
// 5. Serialize and wrap in Result messages
273-
let results: Vec<arrow_flight::Result> = queries_flight_data
269+
let results: Vec<arrow_flight::Result> = flight_data
274270
.into_iter()
275-
.chain(metrics_flight_data.into_iter())
276271
.map(|fd| arrow_flight::Result { body: fd.encode_to_vec().into() })
277272
.collect();
278273

@@ -300,37 +295,46 @@ To consume this protocol:
300295
2. **Receive Stream**
301296
- Collect `arrow_flight::Result` messages from stream
302297

303-
3. **Decode FlightData**
298+
3. **Decode FlightData and Extract Metadata**
304299
```rust
305300
let mut flight_data_vec = Vec::new();
301+
let mut sql_query = None;
302+
306303
for result in result_messages {
307304
let flight_data = FlightData::decode(result.body.as_ref())?;
305+
306+
// Extract SQL from first message (schema) metadata
307+
if sql_query.is_none() && !flight_data.app_metadata.is_empty() {
308+
sql_query = Some(String::from_utf8(flight_data.app_metadata.to_vec())?);
309+
}
310+
308311
flight_data_vec.push(flight_data);
309312
}
310313
```
311314

312-
4. **Convert to RecordBatches**
315+
4. **Convert to RecordBatch**
313316
```rust
314317
let batches = flight_data_to_batches(&flight_data_vec)?;
315-
let queries_batch = batches[0].clone();
316-
let metrics_batch = batches[1].clone();
318+
let metrics_batch = batches[0].clone();
319+
let query_text = sql_query.expect("SQL query not found in metadata");
317320
```
318321

319-
5. **Extract Data**
320-
- Parse queries batch to get SQL text
322+
5. **Reconstruct Statistics**
323+
- Use query text from metadata
321324
- Parse metrics batch to reconstruct execution statistics
322325

323326
### Error Handling
324327

325328
**Server Errors**:
326329
- `Status::unimplemented` - Server doesn't support analyze protocol
327-
- `Status::invalid_argument` - Invalid SQL or malformed request
330+
- `Status::invalid_argument` - Invalid SQL, malformed request, or multiple SQL statements provided
328331
- `Status::internal` - Query execution or serialization failure
329332

330333
**Client Handling**:
331334
- Gracefully handle `unimplemented` with clear user message
332335
- Retry transient errors as appropriate
333-
- Validate response format (expect 2+ batches)
336+
- Validate response format (expect metadata with `sql_query` and at least one batch)
337+
- Handle missing metadata gracefully (older protocol versions)
334338

335339
## Extensibility
336340

0 commit comments

Comments
 (0)