Skip to content

Commit f4c4e33

Browse files
Fmt
1 parent aa24505 commit f4c4e33

5 files changed

Lines changed: 44 additions & 33 deletions

File tree

crates/datafusion-app/src/flightsql.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,8 +551,8 @@ impl FlightSQLContext {
551551
}
552552

553553
// 5. Validate we got the SQL query in metadata
554-
let query_str = sql_query
555-
.ok_or_else(|| eyre::eyre!("SQL query not found in response metadata"))?;
554+
let query_str =
555+
sql_query.ok_or_else(|| eyre::eyre!("SQL query not found in response metadata"))?;
556556

557557
// 6. Decode metrics batch
558558
// batches_to_flight_data creates [schema, data] for the batch

crates/datafusion-app/src/stats.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,13 +1023,20 @@ impl ExecutionStats {
10231023
// Add compute metrics if present
10241024
if let Some(compute) = &self.compute {
10251025
if let Some(elapsed) = compute.elapsed_compute {
1026-
rows.add("elapsed_compute", elapsed as u64, "duration_ns", None, None, None);
1026+
rows.add(
1027+
"elapsed_compute",
1028+
elapsed as u64,
1029+
"duration_ns",
1030+
None,
1031+
None,
1032+
None,
1033+
);
10271034
}
10281035

10291036
// Helper to add compute metrics for a category
10301037
let add_compute_category = |rows: &mut MetricsTableBuilder,
1031-
compute_stats: &Option<Vec<PartitionsComputeStats>>,
1032-
category: &str| {
1038+
compute_stats: &Option<Vec<PartitionsComputeStats>>,
1039+
category: &str| {
10331040
if let Some(stats) = compute_stats {
10341041
for stat in stats {
10351042
for (partition_id, elapsed) in stat.elapsed_computes.iter().enumerate() {
@@ -1058,10 +1065,7 @@ impl ExecutionStats {
10581065
}
10591066

10601067
/// Deserialize ExecutionStats from metrics table
1061-
pub fn from_metrics_table(
1062-
batch: RecordBatch,
1063-
query: String,
1064-
) -> color_eyre::Result<Self> {
1068+
pub fn from_metrics_table(batch: RecordBatch, query: String) -> color_eyre::Result<Self> {
10651069
let mut stats_builder = ExecutionStatsBuilder::new(query);
10661070

10671071
// Extract column arrays
@@ -1191,11 +1195,7 @@ impl ExecutionStatsBuilder {
11911195
self.compute_metrics
11921196
.entry(cat.to_string())
11931197
.or_default()
1194-
.push((
1195-
operator.unwrap_or("Unknown").to_string(),
1196-
partition,
1197-
value,
1198-
));
1198+
.push((operator.unwrap_or("Unknown").to_string(), partition, value));
11991199
}
12001200
_ => {
12011201
debug!("Unknown metric: {} (category: {:?})", name, category);
@@ -1275,12 +1275,20 @@ impl ExecutionIOStats {
12751275
time_opening: metrics.get("time_opening").map(|v| create_time(*v)),
12761276
time_scanning: metrics.get("time_scanning").map(|v| create_time(*v)),
12771277
parquet_output_rows: metrics.get("output_rows").map(|v| *v as usize),
1278-
parquet_pruned_page_index: metrics.get("parquet_page_index_pruned").map(|v| create_count(*v)),
1279-
parquet_matched_page_index: metrics.get("parquet_page_index_matched").map(|v| create_count(*v)),
1278+
parquet_pruned_page_index: metrics
1279+
.get("parquet_page_index_pruned")
1280+
.map(|v| create_count(*v)),
1281+
parquet_matched_page_index: metrics
1282+
.get("parquet_page_index_matched")
1283+
.map(|v| create_count(*v)),
12801284
parquet_rg_pruned_stats: metrics.get("parquet_rg_pruned").map(|v| create_count(*v)),
12811285
parquet_rg_matched_stats: metrics.get("parquet_rg_matched").map(|v| create_count(*v)),
1282-
parquet_rg_pruned_bloom_filter: metrics.get("parquet_bloom_pruned").map(|v| create_count(*v)),
1283-
parquet_rg_matched_bloom_filter: metrics.get("parquet_bloom_matched").map(|v| create_count(*v)),
1286+
parquet_rg_pruned_bloom_filter: metrics
1287+
.get("parquet_bloom_pruned")
1288+
.map(|v| create_count(*v)),
1289+
parquet_rg_matched_bloom_filter: metrics
1290+
.get("parquet_bloom_matched")
1291+
.map(|v| create_count(*v)),
12841292
})
12851293
}
12861294
}

src/cli/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,9 @@ impl CliApp {
242242
(_, _, false, true, true) => Err(eyre!(
243243
"The `benchmark` and `analyze` flags are mutually exclusive"
244244
)),
245-
(false, false, _, _, true) => Err(eyre!(
246-
"Analyze requires exactly one command or file"
247-
)),
245+
(false, false, _, _, true) => {
246+
Err(eyre!("Analyze requires exactly one command or file"))
247+
}
248248

249249
// Execution cases
250250
(false, true, _, false, false) => self.execute_files(&self.args.files).await,
@@ -287,9 +287,9 @@ impl CliApp {
287287
(_, _, _, true, true) => Err(eyre!(
288288
"The `benchmark` and `analyze` flags are mutually exclusive"
289289
)),
290-
(false, false, _, _, true) => Err(eyre!(
291-
"Analyze requires exactly one command or file"
292-
)),
290+
(false, false, _, _, true) => {
291+
Err(eyre!("Analyze requires exactly one command or file"))
292+
}
293293

294294
// Execution cases
295295
(true, false, false, false, false) => self.execute_commands(&self.args.commands).await,

src/server/flightsql/service.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -866,13 +866,15 @@ impl FlightSqlService for FlightSqlServiceImpl {
866866
stats.collect_stats(); // Collect IO and compute metrics from plan
867867

868868
// 3. Convert ExecutionStats to metrics table format
869-
let metrics_batch = stats
870-
.to_metrics_table()
871-
.map_err(|e| Status::internal(format!("Metrics serialization failed: {}", e)))?;
869+
let metrics_batch = stats.to_metrics_table().map_err(|e| {
870+
Status::internal(format!("Metrics serialization failed: {}", e))
871+
})?;
872872

873873
// 4. Encode metrics batch as FlightData
874-
let mut flight_data = batches_to_flight_data(&metrics_batch.schema(), vec![metrics_batch])
875-
.map_err(|e| Status::internal(format!("Failed to encode metrics batch: {}", e)))?;
874+
let mut flight_data =
875+
batches_to_flight_data(&metrics_batch.schema(), vec![metrics_batch]).map_err(
876+
|e| Status::internal(format!("Failed to encode metrics batch: {}", e)),
877+
)?;
876878

877879
// 5. Add SQL query to schema message metadata
878880
// The first FlightData message contains the schema
@@ -886,9 +888,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
886888
.map(|fd| {
887889
// Serialize FlightData to bytes
888890
let bytes = fd.encode_to_vec();
889-
arrow_flight::Result {
890-
body: bytes.into(),
891-
}
891+
arrow_flight::Result { body: bytes.into() }
892892
})
893893
.collect();
894894

tests/extension_cases/flightsql.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1618,7 +1618,10 @@ pub async fn test_analyze_with_timing_metrics() {
16181618
output.contains("Physical Planning"),
16191619
"Should contain Physical Planning time"
16201620
);
1621-
assert!(output.contains("Execution"), "Should contain Execution time");
1621+
assert!(
1622+
output.contains("Execution"),
1623+
"Should contain Execution time"
1624+
);
16221625
assert!(output.contains("Total"), "Should contain Total time");
16231626

16241627
fixture.shutdown_and_wait().await;

0 commit comments

Comments
 (0)