Skip to content

Commit 34a1d3e

Browse files
Cleanup
1 parent 8d07ef3 commit 34a1d3e

2 files changed

Lines changed: 44 additions & 48 deletions

File tree

crates/datafusion-app/src/stats.rs

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ pub struct ExecutionStats {
8383
compute: Option<ExecutionComputeStats>,
8484
plan: Arc<dyn ExecutionPlan>,
8585
/// Maps operator name to (parent_name, child_index)
86-
#[allow(dead_code)]
87-
// TODO: Use for populating operator_parent/operator_index in to_metrics_table
8886
operator_hierarchy: HashMap<String, (Option<String>, i32)>,
8987
}
9088

@@ -1249,13 +1247,19 @@ impl ExecutionStats {
12491247
);
12501248

12511249
// Add IO metrics if present with namespacing
1252-
// TODO: Populate operator_parent and operator_index from execution plan hierarchy
12531250
if let Some(io) = &self.io {
12541251
// Determine the appropriate namespace and operator name based on format type
12551252
let format = io.format_type.unwrap_or(IOFormatType::Unknown);
12561253
let namespace = format.namespace_prefix();
12571254
let operator_name = format.operator_name();
12581255

1256+
// Look up hierarchy info for this operator
1257+
let (parent, index) = self
1258+
.operator_hierarchy
1259+
.get(operator_name)
1260+
.map(|(p, i)| (p.as_deref(), if *i == -1 { None } else { Some(*i) }))
1261+
.unwrap_or((None, None));
1262+
12591263
if let Some(bytes) = &io.bytes_scanned {
12601264
rows.add(
12611265
&format!("{}.bytes_scanned", namespace),
@@ -1264,8 +1268,8 @@ impl ExecutionStats {
12641268
Some(operator_name),
12651269
None,
12661270
Some("io"),
1267-
None, // operator_parent - will be populated with hierarchy collection
1268-
None, // operator_index - will be populated with hierarchy collection
1271+
parent,
1272+
index,
12691273
);
12701274
}
12711275
if let Some(time) = &io.time_opening {
@@ -1276,8 +1280,8 @@ impl ExecutionStats {
12761280
Some(operator_name),
12771281
None,
12781282
Some("io"),
1279-
None,
1280-
None,
1283+
parent,
1284+
index,
12811285
);
12821286
}
12831287
if let Some(time) = &io.time_scanning {
@@ -1288,8 +1292,8 @@ impl ExecutionStats {
12881292
Some(operator_name),
12891293
None,
12901294
Some("io"),
1291-
None,
1292-
None,
1295+
parent,
1296+
index,
12931297
);
12941298
}
12951299

@@ -1303,8 +1307,8 @@ impl ExecutionStats {
13031307
Some(operator_name),
13041308
None,
13051309
Some("io"),
1306-
None,
1307-
None,
1310+
parent,
1311+
index,
13081312
);
13091313
}
13101314
if let Some(pruned) = &io.parquet_rg_pruned_stats {
@@ -1315,8 +1319,8 @@ impl ExecutionStats {
13151319
Some(operator_name),
13161320
None,
13171321
Some("io"),
1318-
None,
1319-
None,
1322+
parent,
1323+
index,
13201324
);
13211325
}
13221326
if let Some(matched) = &io.parquet_rg_matched_stats {
@@ -1327,8 +1331,8 @@ impl ExecutionStats {
13271331
Some(operator_name),
13281332
None,
13291333
Some("io"),
1330-
None,
1331-
None,
1334+
parent,
1335+
index,
13321336
);
13331337
}
13341338
if let Some(pruned) = &io.parquet_rg_pruned_bloom_filter {
@@ -1339,8 +1343,8 @@ impl ExecutionStats {
13391343
Some(operator_name),
13401344
None,
13411345
Some("io"),
1342-
None,
1343-
None,
1346+
parent,
1347+
index,
13441348
);
13451349
}
13461350
if let Some(matched) = &io.parquet_rg_matched_bloom_filter {
@@ -1351,8 +1355,8 @@ impl ExecutionStats {
13511355
Some(operator_name),
13521356
None,
13531357
Some("io"),
1354-
None,
1355-
None,
1358+
parent,
1359+
index,
13561360
);
13571361
}
13581362
if let Some(pruned) = &io.parquet_pruned_page_index {
@@ -1363,8 +1367,8 @@ impl ExecutionStats {
13631367
Some(operator_name),
13641368
None,
13651369
Some("io"),
1366-
None,
1367-
None,
1370+
parent,
1371+
index,
13681372
);
13691373
}
13701374
if let Some(matched) = &io.parquet_matched_page_index {
@@ -1375,15 +1379,14 @@ impl ExecutionStats {
13751379
Some(operator_name),
13761380
None,
13771381
Some("io"),
1378-
None,
1379-
None,
1382+
parent,
1383+
index,
13801384
);
13811385
}
13821386
} // End of Parquet-specific metrics block
13831387
} // End of IO metrics block
13841388

13851389
// Add compute metrics if present with namespacing
1386-
// TODO: Populate operator_parent and operator_index from execution plan hierarchy
13871390
if let Some(compute) = &self.compute {
13881391
if let Some(elapsed) = compute.elapsed_compute {
13891392
rows.add(
@@ -1399,11 +1402,18 @@ impl ExecutionStats {
13991402
}
14001403

14011404
// Helper to add compute metrics for a category
1405+
let hierarchy = &self.operator_hierarchy;
14021406
let add_compute_category = |rows: &mut MetricsTableBuilder,
14031407
compute_stats: &Option<Vec<PartitionsComputeStats>>,
14041408
category: &str| {
14051409
if let Some(stats) = compute_stats {
14061410
for stat in stats {
1411+
// Look up hierarchy info for this operator
1412+
let (parent, index) = hierarchy
1413+
.get(&stat.name)
1414+
.map(|(p, i)| (p.as_deref(), if *i == -1 { None } else { Some(*i) }))
1415+
.unwrap_or((None, None));
1416+
14071417
for (partition_id, elapsed) in stat.elapsed_computes.iter().enumerate() {
14081418
rows.add(
14091419
"compute.elapsed_compute",
@@ -1412,8 +1422,8 @@ impl ExecutionStats {
14121422
Some(&stat.name),
14131423
Some(partition_id as i32),
14141424
Some(category),
1415-
None, // operator_parent - will be populated with hierarchy collection
1416-
None, // operator_index - will be populated with hierarchy collection
1425+
parent,
1426+
index,
14171427
);
14181428
}
14191429
}

docs/arrow_flight_analyze_protocol.md

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ The request body should be a JSON object with the following structure:
5151
- `sql` (string, required): The SQL query to analyze. Must contain exactly one SQL statement. Multiple statements (e.g., separated by semicolons) are not supported and will result in an error.
5252

5353
**Future Extensibility**:
54-
The protocol is designed to be extensible. Future versions may support additional query representation fields:
54+
The protocol is designed to be extensible. Additional query representation fields may be supported in the future:
5555
- `substrait` (bytes): Substrait query plan (binary or JSON)
5656
- `logical_plan` (string): Serialized logical plan
5757
- `physical_plan` (string): Serialized physical plan
@@ -131,8 +131,6 @@ Metric names use a hierarchical namespace structure to prevent collisions and pr
131131

132132
**Important**: There is no generic `io.*` namespace. Each file format reports its own complete set of I/O metrics under its specific namespace (e.g., `io.parquet.*`, `io.csv.*`). This prevents mixing aggregated and raw data.
133133

134-
**Backward Compatibility**: Legacy metric names without namespaces (e.g., `rows` instead of `query.rows`) may be supported by implementations for transition purposes, but new implementations should use namespaced names.
135-
136134
## Standard Metrics
137135

138136
### Query-Level Metrics
@@ -420,21 +418,23 @@ To consume this protocol:
420418
5. **Reconstruct Statistics**
421419
- Use the original query string retained by the client
422420
- Parse metrics batch (8-field schema) to reconstruct execution statistics
423-
- Support both namespaced (e.g., `query.rows`) and legacy (e.g., `rows`) metric names for backward compatibility
424421
- Extract `operator_parent` and `operator_index` to reconstruct execution plan hierarchy if needed
425422

426423
### Error Handling
427424

428-
**Server Errors**:
425+
**Server Behavior**:
426+
Any error during request parsing, query execution, metrics collection, or response serialization results in complete failure. No partial metrics are returned.
427+
428+
**Error Codes**:
429429
- `Status::unimplemented` - Server doesn't support analyze protocol
430430
- `Status::invalid_argument` - Invalid SQL, malformed request, or multiple SQL statements provided
431-
- `Status::internal` - Query execution or serialization failure
431+
- `Status::internal` - Query execution, metrics collection, or serialization failure
432432

433433
**Client Handling**:
434-
- Gracefully handle `unimplemented` with clear user message
434+
- Handle `unimplemented` gracefully with clear user message
435435
- Retry transient errors as appropriate
436436
- Validate response format (expect at least one batch with 8-field schema)
437-
- Fail fast if server sends old 6-field schema (schema version mismatch)
437+
- Any error response means no metrics were collected
438438

439439
## Extensibility
440440

@@ -496,20 +496,6 @@ Clients should handle metrics according to these principles:
496496

497497
6. **Do not fail** on missing optional metrics (format-specific, compute per-partition, etc.)
498498

499-
7. **Support backward compatibility** by accepting both namespaced (`query.rows`) and legacy (`rows`) metric names during transition periods
500-
501-
## Version History
502-
503-
**Version 1.0** (2026-02):
504-
- Initial specification with Stage 1 improvements
505-
- Namespaced metric names (query.*, stage.*, io.{format}.*, compute.*)
506-
- 8-field schema with operator_parent and operator_index for execution plan hierarchy
507-
- No SQL in response metadata (client retains query)
508-
- Extended operator categories: filter, sort, projection, join, aggregate, window, distinct, limit, union, io, other
509-
- Clarified as Arrow Flight extension (not FlightSQL-specific)
510-
- Client guidance to display unknown metrics with valid categories
511-
- Standard metric definitions for Parquet, CSV, JSON formats
512-
513499
## References
514500

515501
- [Apache Arrow Flight SQL Protocol](https://arrow.apache.org/docs/format/FlightSql.html)

0 commit comments

Comments
 (0)