Skip to content

Commit 040233d

Browse files
More iterate
1 parent 5b58dc5 commit 040233d

1 file changed

Lines changed: 83 additions & 15 deletions

File tree

crates/datafusion-app/src/stats.rs

Lines changed: 83 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -50,6 +50,8 @@ pub struct ExecutionStats {
5050
compute: Option<ExecutionComputeStats>,
5151
plan: Arc<dyn ExecutionPlan>,
5252
/// Maps operator name to (parent_name, child_index)
53+
#[allow(dead_code)]
54+
// TODO: Use for populating operator_parent/operator_index in to_metrics_table
5355
operator_hierarchy: HashMap<String, (Option<String>, i32)>,
5456
}
5557

@@ -446,6 +448,10 @@ pub struct ExecutionComputeStats {
446448
sort_compute: Option<Vec<PartitionsComputeStats>>,
447449
join_compute: Option<Vec<PartitionsComputeStats>>,
448450
aggregate_compute: Option<Vec<PartitionsComputeStats>>,
451+
window_compute: Option<Vec<PartitionsComputeStats>>,
452+
distinct_compute: Option<Vec<PartitionsComputeStats>>,
453+
limit_compute: Option<Vec<PartitionsComputeStats>>,
454+
union_compute: Option<Vec<PartitionsComputeStats>>,
449455
other_compute: Option<Vec<PartitionsComputeStats>>,
450456
}
451457

@@ -513,8 +519,7 @@ impl std::fmt::Display for ExecutionComputeStats {
513519
)?;
514520
writeln!(f)?;
515521

516-
// Always display all categories in the same order as FlightSQL protocol:
517-
// Projection, Filter, Sort, Aggregate, Join, Other
522+
// Display all categories in order
518523
self.display_compute(f, &self.projection_compute, "Projection")?;
519524
writeln!(f)?;
520525
self.display_compute(f, &self.filter_compute, "Filter")?;
@@ -525,6 +530,14 @@ impl std::fmt::Display for ExecutionComputeStats {
525530
writeln!(f)?;
526531
self.display_compute(f, &self.join_compute, "Join")?;
527532
writeln!(f)?;
533+
self.display_compute(f, &self.window_compute, "Window")?;
534+
writeln!(f)?;
535+
self.display_compute(f, &self.distinct_compute, "Distinct")?;
536+
writeln!(f)?;
537+
self.display_compute(f, &self.limit_compute, "Limit")?;
538+
writeln!(f)?;
539+
self.display_compute(f, &self.union_compute, "Union")?;
540+
writeln!(f)?;
528541
self.display_compute(f, &self.other_compute, "Other")?;
529542
writeln!(f)
530543
}
@@ -732,6 +745,10 @@ impl From<PlanComputeVisitor> for ExecutionComputeStats {
732745
projection_compute: Some(value.projection_computes),
733746
join_compute: Some(value.join_computes),
734747
aggregate_compute: Some(value.aggregate_computes),
748+
window_compute: None, // TODO: Collect from visitor
749+
distinct_compute: None, // TODO: Collect from visitor
750+
limit_compute: None, // TODO: Collect from visitor
751+
union_compute: None, // TODO: Collect from visitor
735752
other_compute: Some(value.other_computes),
736753
}
737754
}
@@ -754,6 +771,7 @@ fn is_io_plan(plan: &dyn ExecutionPlan) -> bool {
754771
}
755772

756773
/// Classify an operator into a category based on its name
774+
#[allow(dead_code)] // TODO: Use in compute visitor for better categorization
757775
fn classify_operator_category(operator_name: &str) -> &'static str {
758776
// Check for specific operator types
759777
if operator_name.contains("Filter") {
@@ -790,8 +808,15 @@ fn classify_operator_category(operator_name: &str) -> &'static str {
790808
"other"
791809
}
792810

811+
#[allow(dead_code)] // Used by classify_operator_category
793812
fn is_io_plan_by_name(name: &str) -> bool {
794-
let io_plans = ["CsvExec", "ParquetExec", "ArrowExec", "JsonExec", "AvroExec"];
813+
let io_plans = [
814+
"CsvExec",
815+
"ParquetExec",
816+
"ArrowExec",
817+
"JsonExec",
818+
"AvroExec",
819+
];
795820
io_plans.iter().any(|p| name.contains(p))
796821
}
797822

@@ -812,11 +837,8 @@ fn collect_operator_hierarchy(
812837

813838
// Recursively collect from children
814839
for (child_index, child) in plan.children().iter().enumerate() {
815-
let child_hierarchy = collect_operator_hierarchy(
816-
child,
817-
Some(operator_name.clone()),
818-
child_index as i32,
819-
);
840+
let child_hierarchy =
841+
collect_operator_hierarchy(child, Some(operator_name.clone()), child_index as i32);
820842
hierarchy.extend(child_hierarchy);
821843
}
822844

@@ -900,7 +922,8 @@ impl MetricsTableBuilder {
900922
self.partition_ids.push(partition_id);
901923
self.operator_categories
902924
.push(operator_category.map(String::from));
903-
self.operator_parents.push(operator_parent.map(String::from));
925+
self.operator_parents
926+
.push(operator_parent.map(String::from));
904927
self.operator_indices.push(operator_index);
905928
}
906929

@@ -912,8 +935,7 @@ impl MetricsTableBuilder {
912935
let partition_ids_array: ArrayRef = Arc::new(Int32Array::from(self.partition_ids));
913936
let operator_categories_array: ArrayRef =
914937
Arc::new(StringArray::from(self.operator_categories));
915-
let operator_parents_array: ArrayRef =
916-
Arc::new(StringArray::from(self.operator_parents));
938+
let operator_parents_array: ArrayRef = Arc::new(StringArray::from(self.operator_parents));
917939
let operator_indices_array: ArrayRef = Arc::new(Int32Array::from(self.operator_indices));
918940

919941
Ok(RecordBatch::try_new(
@@ -939,9 +961,36 @@ impl ExecutionStats {
939961
let mut rows = MetricsTableBuilder::new();
940962

941963
// Add basic metrics with namespacing
942-
rows.add("query.rows", self.rows as u64, "count", None, None, None, None, None);
943-
rows.add("query.batches", self.batches as u64, "count", None, None, None, None, None);
944-
rows.add("query.bytes", self.bytes as u64, "bytes", None, None, None, None, None);
964+
rows.add(
965+
"query.rows",
966+
self.rows as u64,
967+
"count",
968+
None,
969+
None,
970+
None,
971+
None,
972+
None,
973+
);
974+
rows.add(
975+
"query.batches",
976+
self.batches as u64,
977+
"count",
978+
None,
979+
None,
980+
None,
981+
None,
982+
None,
983+
);
984+
rows.add(
985+
"query.bytes",
986+
self.bytes as u64,
987+
"bytes",
988+
None,
989+
None,
990+
None,
991+
None,
992+
None,
993+
);
945994

946995
// Add duration metrics with namespacing
947996
rows.add(
@@ -1418,7 +1467,10 @@ impl ExecutionIOStats {
14181467

14191468
// Helper to get metric value, trying both namespaced and legacy names
14201469
let get_metric = |namespaced: &str, legacy: &str| -> Option<u64> {
1421-
metrics.get(namespaced).or_else(|| metrics.get(legacy)).copied()
1470+
metrics
1471+
.get(namespaced)
1472+
.or_else(|| metrics.get(legacy))
1473+
.copied()
14221474
};
14231475

14241476
Ok(Self {
@@ -1514,6 +1566,22 @@ impl ExecutionComputeStats {
15141566
.get("aggregate")
15151567
.map(|m| to_partition_stats(m))
15161568
.filter(|v| !v.is_empty()),
1569+
window_compute: metrics
1570+
.get("window")
1571+
.map(|m| to_partition_stats(m))
1572+
.filter(|v| !v.is_empty()),
1573+
distinct_compute: metrics
1574+
.get("distinct")
1575+
.map(|m| to_partition_stats(m))
1576+
.filter(|v| !v.is_empty()),
1577+
limit_compute: metrics
1578+
.get("limit")
1579+
.map(|m| to_partition_stats(m))
1580+
.filter(|v| !v.is_empty()),
1581+
union_compute: metrics
1582+
.get("union")
1583+
.map(|m| to_partition_stats(m))
1584+
.filter(|v| !v.is_empty()),
15171585
other_compute: metrics
15181586
.get("other")
15191587
.map(|m| to_partition_stats(m))

0 commit comments

Comments
 (0)