Skip to content

Commit 185d136

Browse files
Analyze flightsql
1 parent ac4ec8f commit 185d136

11 files changed

Lines changed: 1372 additions & 8 deletions

File tree

.claude/settings.local.json

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,9 @@
3030
"Bash(cargo fmt:*)",
3131
"Bash(./target/release/dft:*)",
3232
"Bash(/Users/matth/OpenSource/datafusion-tui/target/debug/dft:*)",
33-
"Bash(./target/debug/dft:*)"
33+
"Bash(./target/debug/dft:*)",
34+
"Bash(xargs rg:*)",
35+
"Bash(rg:*)"
3436
],
3537
"deny": [],
3638
"ask": []

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/datafusion-app/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ opendal = { features = [
3535
"services-huggingface",
3636
], optional = true, version = "0.54.1" }
3737
parking_lot = "0.12.3"
38+
prost = { optional = true, version = "0.14" }
3839
serde = { features = ["derive"], version = "1.0.197" }
3940
tokio = { features = ["macros", "rt-multi-thread"], version = "1.36.0" }
4041
tokio-metrics = { features = [
@@ -51,7 +52,7 @@ criterion = { features = ["async_tokio"], version = "0.5.1" }
5152
[features]
5253
default = ["functions-parquet"]
5354
deltalake = ["dep:deltalake"]
54-
flightsql = ["dep:arrow-flight", "dep:base64", "dep:tonic"]
55+
flightsql = ["dep:arrow-flight", "dep:base64", "dep:prost", "dep:tonic"]
5556
functions-json = ["dep:datafusion-functions-json"]
5657
functions-parquet = ["dep:datafusion-functions-parquet"]
5758
huggingface = ["object_store_opendal", "opendal", "url"]

crates/datafusion-app/src/flightsql.rs

Lines changed: 92 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -472,4 +472,96 @@ impl FlightSQLContext {
472472
))
473473
}
474474
}
475+
476+
/// Get raw metrics batches without reconstruction (for --analyze-raw)
477+
pub async fn analyze_query_raw(
478+
&self,
479+
query: &str,
480+
) -> Result<(datafusion::arrow::array::RecordBatch, datafusion::arrow::array::RecordBatch)> {
481+
self.fetch_analyze_batches(query).await
482+
}
483+
484+
/// Reconstruct ExecutionStats from metrics (for --analyze)
485+
pub async fn analyze_query(&self, query: &str) -> Result<crate::stats::ExecutionStats> {
486+
use datafusion::arrow::array::StringArray;
487+
488+
let (queries_batch, metrics_batch) = self.fetch_analyze_batches(query).await?;
489+
490+
// Extract query string from queries batch
491+
let query_array = queries_batch
492+
.column(0)
493+
.as_any()
494+
.downcast_ref::<StringArray>()
495+
.ok_or_else(|| eyre::eyre!("Invalid queries batch schema"))?;
496+
let query_str = query_array.value(0).to_string();
497+
498+
// Reconstruct ExecutionStats from metrics table
499+
let stats = crate::stats::ExecutionStats::from_metrics_table(metrics_batch, query_str)?;
500+
501+
Ok(stats)
502+
}
503+
504+
/// Shared logic to fetch analyze batches from server
505+
async fn fetch_analyze_batches(
506+
&self,
507+
query: &str,
508+
) -> Result<(datafusion::arrow::array::RecordBatch, datafusion::arrow::array::RecordBatch)> {
509+
use arrow_flight::utils::flight_data_to_batches;
510+
use arrow_flight::{Action, FlightData};
511+
512+
// 1. Create Action with type "analyze_query" and SQL in body
513+
let action = Action {
514+
r#type: "analyze_query".to_string(),
515+
body: query.as_bytes().to_vec().into(),
516+
};
517+
518+
// 2. Call do_action on the FlightSQL service
519+
let mut client = self.client.lock().await;
520+
let client = client
521+
.as_mut()
522+
.ok_or_else(|| eyre::eyre!("No FlightSQL client configured"))?;
523+
524+
let mut stream = client
525+
.do_action(action.into_request())
526+
.await
527+
.map_err(|e| eyre::eyre!("do_action failed: {}", e))?;
528+
529+
// 3. Collect all Result messages from stream
530+
let mut result_messages = Vec::new();
531+
while let Some(result) = stream.next().await {
532+
let result = result.map_err(|e| eyre::eyre!("Stream error: {}", e))?;
533+
result_messages.push(result);
534+
}
535+
536+
// 4. Decode each Result message to FlightData
537+
let mut all_flight_data = Vec::new();
538+
539+
for result in result_messages {
540+
// Deserialize the FlightData from the Result.body bytes using prost
541+
// Note: FlightData implements prost::Message
542+
let flight_data = <FlightData as prost::Message>::decode(result.body.as_ref())
543+
.map_err(|e| eyre::eyre!("Failed to decode FlightData: {}", e))?;
544+
545+
all_flight_data.push(flight_data);
546+
}
547+
548+
// 5. Convert all FlightData to RecordBatches using flight_data_to_batches
549+
let batches = flight_data_to_batches(&all_flight_data)
550+
.map_err(|e| eyre::eyre!("Failed to decode batches: {}", e))?;
551+
552+
// 6. Split batches - first set is queries (1 batch with schema), second set is metrics
553+
// The batches_to_flight_data function generates: [schema, data batch] for each table
554+
// So we expect: [queries_schema, queries_batch, metrics_schema, metrics_batch]
555+
// But flight_data_to_batches returns just the data batches, not schemas
556+
// So we expect: [queries_batch, metrics_batch]
557+
558+
if batches.len() < 2 {
559+
return Err(eyre::eyre!(
560+
"Invalid analyze response: expected at least 2 batches, got {}",
561+
batches.len()
562+
));
563+
}
564+
565+
Ok((batches[0].clone(), batches[1].clone()))
566+
}
475567
}

0 commit comments

Comments
 (0)