Skip to content

Commit 8d07ef3

Browse files
More compute stats
1 parent d93b83d commit 8d07ef3

1 file changed

Lines changed: 119 additions & 4 deletions

File tree

crates/datafusion-app/src/stats.rs

Lines changed: 119 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@ use datafusion::{
2929
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec,
3030
SymmetricHashJoinExec,
3131
},
32+
limit::{GlobalLimitExec, LocalLimitExec},
3233
metrics::MetricValue,
3334
projection::ProjectionExec,
3435
sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
36+
union::UnionExec,
3537
visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
3638
},
3739
};
@@ -638,6 +640,10 @@ pub struct PlanComputeVisitor {
638640
projection_computes: Vec<PartitionsComputeStats>,
639641
join_computes: Vec<PartitionsComputeStats>,
640642
aggregate_computes: Vec<PartitionsComputeStats>,
643+
window_computes: Vec<PartitionsComputeStats>,
644+
distinct_computes: Vec<PartitionsComputeStats>,
645+
limit_computes: Vec<PartitionsComputeStats>,
646+
union_computes: Vec<PartitionsComputeStats>,
641647
other_computes: Vec<PartitionsComputeStats>,
642648
}
643649

@@ -662,6 +668,10 @@ impl PlanComputeVisitor {
662668
self.collect_projection_metrics(plan);
663669
self.collect_join_metrics(plan);
664670
self.collect_aggregate_metrics(plan);
671+
self.collect_window_metrics(plan);
672+
self.collect_distinct_metrics(plan);
673+
self.collect_limit_metrics(plan);
674+
self.collect_union_metrics(plan);
665675
self.collect_other_metrics(plan);
666676
}
667677

@@ -766,12 +776,96 @@ impl PlanComputeVisitor {
766776
}
767777
}
768778

779+
fn collect_window_metrics(&mut self, plan: &dyn ExecutionPlan) {
780+
if is_window_plan(plan) {
781+
if let Some(metrics) = plan.metrics() {
782+
let sorted_computes: Vec<usize> = metrics
783+
.iter()
784+
.filter_map(|m| match m.value() {
785+
MetricValue::ElapsedCompute(t) => Some(t.value()),
786+
_ => None,
787+
})
788+
.sorted()
789+
.collect();
790+
let p = PartitionsComputeStats {
791+
name: plan.name().to_string(),
792+
elapsed_computes: sorted_computes,
793+
};
794+
self.window_computes.push(p)
795+
}
796+
}
797+
}
798+
799+
fn collect_distinct_metrics(&mut self, plan: &dyn ExecutionPlan) {
800+
if is_distinct_plan(plan) {
801+
if let Some(metrics) = plan.metrics() {
802+
let sorted_computes: Vec<usize> = metrics
803+
.iter()
804+
.filter_map(|m| match m.value() {
805+
MetricValue::ElapsedCompute(t) => Some(t.value()),
806+
_ => None,
807+
})
808+
.sorted()
809+
.collect();
810+
let p = PartitionsComputeStats {
811+
name: plan.name().to_string(),
812+
elapsed_computes: sorted_computes,
813+
};
814+
self.distinct_computes.push(p)
815+
}
816+
}
817+
}
818+
819+
fn collect_limit_metrics(&mut self, plan: &dyn ExecutionPlan) {
820+
if is_limit_plan(plan) {
821+
if let Some(metrics) = plan.metrics() {
822+
let sorted_computes: Vec<usize> = metrics
823+
.iter()
824+
.filter_map(|m| match m.value() {
825+
MetricValue::ElapsedCompute(t) => Some(t.value()),
826+
_ => None,
827+
})
828+
.sorted()
829+
.collect();
830+
let p = PartitionsComputeStats {
831+
name: plan.name().to_string(),
832+
elapsed_computes: sorted_computes,
833+
};
834+
self.limit_computes.push(p)
835+
}
836+
}
837+
}
838+
839+
fn collect_union_metrics(&mut self, plan: &dyn ExecutionPlan) {
840+
if is_union_plan(plan) {
841+
if let Some(metrics) = plan.metrics() {
842+
let sorted_computes: Vec<usize> = metrics
843+
.iter()
844+
.filter_map(|m| match m.value() {
845+
MetricValue::ElapsedCompute(t) => Some(t.value()),
846+
_ => None,
847+
})
848+
.sorted()
849+
.collect();
850+
let p = PartitionsComputeStats {
851+
name: plan.name().to_string(),
852+
elapsed_computes: sorted_computes,
853+
};
854+
self.union_computes.push(p)
855+
}
856+
}
857+
}
858+
769859
fn collect_other_metrics(&mut self, plan: &dyn ExecutionPlan) {
770860
if !is_filter_plan(plan)
771861
&& !is_sort_plan(plan)
772862
&& !is_projection_plan(plan)
773863
&& !is_aggregate_plan(plan)
774864
&& !is_join_plan(plan)
865+
&& !is_window_plan(plan)
866+
&& !is_distinct_plan(plan)
867+
&& !is_limit_plan(plan)
868+
&& !is_union_plan(plan)
775869
{
776870
if let Some(metrics) = plan.metrics() {
777871
let sorted_computes: Vec<usize> = metrics
@@ -823,6 +917,27 @@ fn is_aggregate_plan(plan: &dyn ExecutionPlan) -> bool {
823917
plan.as_any().downcast_ref::<AggregateExec>().is_some()
824918
}
825919

920+
fn is_window_plan(plan: &dyn ExecutionPlan) -> bool {
921+
// Check by name since there are multiple window exec types
922+
let name = plan.name();
923+
name.contains("Window")
924+
}
925+
926+
fn is_distinct_plan(plan: &dyn ExecutionPlan) -> bool {
927+
// Check by name since distinct may be handled various ways
928+
let name = plan.name();
929+
name.contains("Distinct") || name.contains("Deduplicate")
930+
}
931+
932+
fn is_limit_plan(plan: &dyn ExecutionPlan) -> bool {
933+
plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
934+
|| plan.as_any().downcast_ref::<LocalLimitExec>().is_some()
935+
}
936+
937+
fn is_union_plan(plan: &dyn ExecutionPlan) -> bool {
938+
plan.as_any().downcast_ref::<UnionExec>().is_some()
939+
}
940+
826941
impl From<PlanComputeVisitor> for ExecutionComputeStats {
827942
fn from(value: PlanComputeVisitor) -> Self {
828943
Self {
@@ -832,10 +947,10 @@ impl From<PlanComputeVisitor> for ExecutionComputeStats {
832947
projection_compute: Some(value.projection_computes),
833948
join_compute: Some(value.join_computes),
834949
aggregate_compute: Some(value.aggregate_computes),
835-
window_compute: None, // TODO: Collect from visitor
836-
distinct_compute: None, // TODO: Collect from visitor
837-
limit_compute: None, // TODO: Collect from visitor
838-
union_compute: None, // TODO: Collect from visitor
950+
window_compute: Some(value.window_computes),
951+
distinct_compute: Some(value.distinct_computes),
952+
limit_compute: Some(value.limit_computes),
953+
union_compute: Some(value.union_computes),
839954
other_compute: Some(value.other_computes),
840955
}
841956
}

0 commit comments

Comments
 (0)