@@ -235,6 +235,7 @@ impl std::fmt::Display for ExecutionDurationStats {
235235
236236#[ derive( Clone , Debug ) ]
237237pub struct ExecutionIOStats {
238+ format_type : Option < IOFormatType > ,
238239 bytes_scanned : Option < MetricValue > ,
239240 time_opening : Option < MetricValue > ,
240241 time_scanning : Option < MetricValue > ,
@@ -363,9 +364,42 @@ impl std::fmt::Display for ExecutionIOStats {
363364
364365/// Visitor to collect IO metrics from an execution plan
365366///
367+ /// Represents the file format type for I/O operations
368+ #[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
369+ enum IOFormatType {
370+ Csv ,
371+ Parquet ,
372+ Arrow ,
373+ Json ,
374+ Unknown ,
375+ }
376+
377+ impl IOFormatType {
378+ fn namespace_prefix ( & self ) -> & ' static str {
379+ match self {
380+ IOFormatType :: Csv => "io.csv" ,
381+ IOFormatType :: Parquet => "io.parquet" ,
382+ IOFormatType :: Arrow => "io.arrow" ,
383+ IOFormatType :: Json => "io.json" ,
384+ IOFormatType :: Unknown => "io.unknown" ,
385+ }
386+ }
387+
388+ fn operator_name ( & self ) -> & ' static str {
389+ match self {
390+ IOFormatType :: Csv => "CsvExec" ,
391+ IOFormatType :: Parquet => "ParquetExec" ,
392+ IOFormatType :: Arrow => "ArrowExec" ,
393+ IOFormatType :: Json => "JsonExec" ,
394+ IOFormatType :: Unknown => "UnknownExec" ,
395+ }
396+ }
397+ }
398+
366399/// IO metrics are collected from nodes that perform IO operations, such as
367- /// `CsvExec`, `ParquetExec`, and `ArrowExec `.
400+ /// `CsvExec`, `ParquetExec`, `ArrowExec`, and `JsonExec `.
368401struct PlanIOVisitor {
402+ format_type : Option < IOFormatType > ,
369403 bytes_scanned : Option < MetricValue > ,
370404 time_opening : Option < MetricValue > ,
371405 time_scanning : Option < MetricValue > ,
@@ -381,6 +415,7 @@ struct PlanIOVisitor {
381415impl PlanIOVisitor {
382416 fn new ( ) -> Self {
383417 Self {
418+ format_type : None ,
384419 bytes_scanned : None ,
385420 time_opening : None ,
386421 time_scanning : None ,
@@ -395,6 +430,25 @@ impl PlanIOVisitor {
395430 }
396431
397432 fn collect_io_metrics ( & mut self , plan : & dyn ExecutionPlan ) {
433+ // Determine format type from plan name
434+ let plan_name = plan. name ( ) ;
435+ let format = if plan_name. contains ( "CsvExec" ) {
436+ IOFormatType :: Csv
437+ } else if plan_name. contains ( "ParquetExec" ) {
438+ IOFormatType :: Parquet
439+ } else if plan_name. contains ( "ArrowExec" ) {
440+ IOFormatType :: Arrow
441+ } else if plan_name. contains ( "JsonExec" ) {
442+ IOFormatType :: Json
443+ } else {
444+ IOFormatType :: Unknown
445+ } ;
446+
447+ // Only set format_type if not already set (take first I/O operator encountered)
448+ if self . format_type . is_none ( ) {
449+ self . format_type = Some ( format) ;
450+ }
451+
398452 let io_metrics = plan. metrics ( ) ;
399453 if let Some ( metrics) = io_metrics {
400454 self . bytes_scanned = metrics. sum_by_name ( "bytes_scanned" ) ;
@@ -422,6 +476,7 @@ impl PlanIOVisitor {
422476impl From < PlanIOVisitor > for ExecutionIOStats {
423477 fn from ( value : PlanIOVisitor ) -> Self {
424478 Self {
479+ format_type : value. format_type ,
425480 bytes_scanned : value. bytes_scanned ,
426481 time_opening : value. time_opening ,
427482 time_scanning : value. time_scanning ,
@@ -799,7 +854,7 @@ impl ExecutionPlanVisitor for PlanComputeVisitor {
799854}
800855
801856fn is_io_plan ( plan : & dyn ExecutionPlan ) -> bool {
802- let io_plans = [ "CsvExec" , "ParquetExec" , "ArrowExec" ] ;
857+ let io_plans = [ "CsvExec" , "ParquetExec" , "ArrowExec" , "JsonExec" ] ;
803858 io_plans. contains ( & plan. name ( ) )
804859}
805860
@@ -1082,12 +1137,17 @@ impl ExecutionStats {
10821137 // Add IO metrics if present with namespacing
10831138 // TODO: Populate operator_parent and operator_index from execution plan hierarchy
10841139 if let Some ( io) = & self . io {
1140+ // Determine the appropriate namespace and operator name based on format type
1141+ let format = io. format_type . unwrap_or ( IOFormatType :: Unknown ) ;
1142+ let namespace = format. namespace_prefix ( ) ;
1143+ let operator_name = format. operator_name ( ) ;
1144+
10851145 if let Some ( bytes) = & io. bytes_scanned {
10861146 rows. add (
1087- "io.parquet. bytes_scanned",
1147+ & format ! ( "{}. bytes_scanned", namespace ) ,
10881148 bytes. as_usize ( ) as u64 ,
10891149 "bytes" ,
1090- Some ( "ParquetExec" ) ,
1150+ Some ( operator_name ) ,
10911151 None ,
10921152 Some ( "io" ) ,
10931153 None , // operator_parent - will be populated with hierarchy collection
@@ -1096,10 +1156,10 @@ impl ExecutionStats {
10961156 }
10971157 if let Some ( time) = & io. time_opening {
10981158 rows. add (
1099- "io.parquet. time_opening",
1159+ & format ! ( "{}. time_opening", namespace ) ,
11001160 time. as_usize ( ) as u64 ,
11011161 "duration_ns" ,
1102- Some ( "ParquetExec" ) ,
1162+ Some ( operator_name ) ,
11031163 None ,
11041164 Some ( "io" ) ,
11051165 None ,
@@ -1108,101 +1168,105 @@ impl ExecutionStats {
11081168 }
11091169 if let Some ( time) = & io. time_scanning {
11101170 rows. add (
1111- "io.parquet. time_scanning",
1171+ & format ! ( "{}. time_scanning", namespace ) ,
11121172 time. as_usize ( ) as u64 ,
11131173 "duration_ns" ,
1114- Some ( "ParquetExec" ) ,
1115- None ,
1116- Some ( "io" ) ,
1117- None ,
1118- None ,
1119- ) ;
1120- }
1121- if let Some ( output_rows) = io. parquet_output_rows {
1122- rows. add (
1123- "io.parquet.output_rows" ,
1124- output_rows as u64 ,
1125- "count" ,
1126- Some ( "ParquetExec" ) ,
1127- None ,
1128- Some ( "io" ) ,
1129- None ,
1130- None ,
1131- ) ;
1132- }
1133- if let Some ( pruned) = & io. parquet_rg_pruned_stats {
1134- rows. add (
1135- "io.parquet.rg_pruned" ,
1136- pruned. as_usize ( ) as u64 ,
1137- "count" ,
1138- Some ( "ParquetExec" ) ,
1139- None ,
1140- Some ( "io" ) ,
1141- None ,
1142- None ,
1143- ) ;
1144- }
1145- if let Some ( matched) = & io. parquet_rg_matched_stats {
1146- rows. add (
1147- "io.parquet.rg_matched" ,
1148- matched. as_usize ( ) as u64 ,
1149- "count" ,
1150- Some ( "ParquetExec" ) ,
1151- None ,
1152- Some ( "io" ) ,
1153- None ,
1154- None ,
1155- ) ;
1156- }
1157- if let Some ( pruned) = & io. parquet_rg_pruned_bloom_filter {
1158- rows. add (
1159- "io.parquet.bloom_pruned" ,
1160- pruned. as_usize ( ) as u64 ,
1161- "count" ,
1162- Some ( "ParquetExec" ) ,
1163- None ,
1164- Some ( "io" ) ,
1165- None ,
1166- None ,
1167- ) ;
1168- }
1169- if let Some ( matched) = & io. parquet_rg_matched_bloom_filter {
1170- rows. add (
1171- "io.parquet.bloom_matched" ,
1172- matched. as_usize ( ) as u64 ,
1173- "count" ,
1174- Some ( "ParquetExec" ) ,
1175- None ,
1176- Some ( "io" ) ,
1177- None ,
1178- None ,
1179- ) ;
1180- }
1181- if let Some ( pruned) = & io. parquet_pruned_page_index {
1182- rows. add (
1183- "io.parquet.page_index_pruned" ,
1184- pruned. as_usize ( ) as u64 ,
1185- "count" ,
1186- Some ( "ParquetExec" ) ,
1187- None ,
1188- Some ( "io" ) ,
1189- None ,
1190- None ,
1191- ) ;
1192- }
1193- if let Some ( matched) = & io. parquet_matched_page_index {
1194- rows. add (
1195- "io.parquet.page_index_matched" ,
1196- matched. as_usize ( ) as u64 ,
1197- "count" ,
1198- Some ( "ParquetExec" ) ,
1174+ Some ( operator_name) ,
11991175 None ,
12001176 Some ( "io" ) ,
12011177 None ,
12021178 None ,
12031179 ) ;
12041180 }
1205- }
1181+
1182+ // Parquet-specific metrics (only add if format is Parquet)
1183+ if format == IOFormatType :: Parquet {
1184+ if let Some ( output_rows) = io. parquet_output_rows {
1185+ rows. add (
1186+ & format ! ( "{}.output_rows" , namespace) ,
1187+ output_rows as u64 ,
1188+ "count" ,
1189+ Some ( operator_name) ,
1190+ None ,
1191+ Some ( "io" ) ,
1192+ None ,
1193+ None ,
1194+ ) ;
1195+ }
1196+ if let Some ( pruned) = & io. parquet_rg_pruned_stats {
1197+ rows. add (
1198+ & format ! ( "{}.rg_pruned" , namespace) ,
1199+ pruned. as_usize ( ) as u64 ,
1200+ "count" ,
1201+ Some ( operator_name) ,
1202+ None ,
1203+ Some ( "io" ) ,
1204+ None ,
1205+ None ,
1206+ ) ;
1207+ }
1208+ if let Some ( matched) = & io. parquet_rg_matched_stats {
1209+ rows. add (
1210+ & format ! ( "{}.rg_matched" , namespace) ,
1211+ matched. as_usize ( ) as u64 ,
1212+ "count" ,
1213+ Some ( operator_name) ,
1214+ None ,
1215+ Some ( "io" ) ,
1216+ None ,
1217+ None ,
1218+ ) ;
1219+ }
1220+ if let Some ( pruned) = & io. parquet_rg_pruned_bloom_filter {
1221+ rows. add (
1222+ & format ! ( "{}.bloom_pruned" , namespace) ,
1223+ pruned. as_usize ( ) as u64 ,
1224+ "count" ,
1225+ Some ( operator_name) ,
1226+ None ,
1227+ Some ( "io" ) ,
1228+ None ,
1229+ None ,
1230+ ) ;
1231+ }
1232+ if let Some ( matched) = & io. parquet_rg_matched_bloom_filter {
1233+ rows. add (
1234+ & format ! ( "{}.bloom_matched" , namespace) ,
1235+ matched. as_usize ( ) as u64 ,
1236+ "count" ,
1237+ Some ( operator_name) ,
1238+ None ,
1239+ Some ( "io" ) ,
1240+ None ,
1241+ None ,
1242+ ) ;
1243+ }
1244+ if let Some ( pruned) = & io. parquet_pruned_page_index {
1245+ rows. add (
1246+ & format ! ( "{}.page_index_pruned" , namespace) ,
1247+ pruned. as_usize ( ) as u64 ,
1248+ "count" ,
1249+ Some ( operator_name) ,
1250+ None ,
1251+ Some ( "io" ) ,
1252+ None ,
1253+ None ,
1254+ ) ;
1255+ }
1256+ if let Some ( matched) = & io. parquet_matched_page_index {
1257+ rows. add (
1258+ & format ! ( "{}.page_index_matched" , namespace) ,
1259+ matched. as_usize ( ) as u64 ,
1260+ "count" ,
1261+ Some ( operator_name) ,
1262+ None ,
1263+ Some ( "io" ) ,
1264+ None ,
1265+ None ,
1266+ ) ;
1267+ }
1268+ } // End of Parquet-specific metrics block
1269+ } // End of IO metrics block
12061270
12071271 // Add compute metrics if present with namespacing
12081272 // TODO: Populate operator_parent and operator_index from execution plan hierarchy
@@ -1500,15 +1564,34 @@ impl ExecutionIOStats {
15001564 }
15011565 } ;
15021566
1503- // Helper to get metric value, trying both namespaced and legacy names
1567+ // Determine format type from metric keys
1568+ let format_type = if metrics. keys ( ) . any ( |k| k. starts_with ( "io.csv." ) ) {
1569+ Some ( IOFormatType :: Csv )
1570+ } else if metrics. keys ( ) . any ( |k| k. starts_with ( "io.parquet." ) ) {
1571+ Some ( IOFormatType :: Parquet )
1572+ } else if metrics. keys ( ) . any ( |k| k. starts_with ( "io.arrow." ) ) {
1573+ Some ( IOFormatType :: Arrow )
1574+ } else if metrics. keys ( ) . any ( |k| k. starts_with ( "io.json." ) ) {
1575+ Some ( IOFormatType :: Json )
1576+ } else {
1577+ None
1578+ } ;
1579+
1580+ // Helper to get metric value, trying all format-specific namespaces and legacy names
15041581 let get_metric = |namespaced : & str , legacy : & str | -> Option < u64 > {
1582+ // Try format-specific namespace
15051583 metrics
1506- . get ( namespaced)
1584+ . get ( & format ! ( "io.csv.{}" , namespaced. strip_prefix( "io.parquet." ) . unwrap_or( namespaced) ) )
1585+ . or_else ( || metrics. get ( & format ! ( "io.parquet.{}" , namespaced. strip_prefix( "io.parquet." ) . unwrap_or( namespaced) ) ) )
1586+ . or_else ( || metrics. get ( & format ! ( "io.arrow.{}" , namespaced. strip_prefix( "io.parquet." ) . unwrap_or( namespaced) ) ) )
1587+ . or_else ( || metrics. get ( & format ! ( "io.json.{}" , namespaced. strip_prefix( "io.parquet." ) . unwrap_or( namespaced) ) ) )
1588+ . or_else ( || metrics. get ( namespaced) )
15071589 . or_else ( || metrics. get ( legacy) )
15081590 . copied ( )
15091591 } ;
15101592
15111593 Ok ( Self {
1594+ format_type,
15121595 bytes_scanned : get_metric ( "io.parquet.bytes_scanned" , "bytes_scanned" )
15131596 . map ( |v| create_count ( v) ) ,
15141597 time_opening : get_metric ( "io.parquet.time_opening" , "time_opening" )
0 commit comments