Skip to content

Commit a2964a8

Browse files
Matthew AgostinelliMatthew Agostinelli
authored andcommitted
Fix FlightSQL client to display all batches from multi-partition queries
Previously, the FlightSQL client would only display partial results when queries returned multiple batches (common with GROUP BY without ORDER BY due to partitioned execution). This happened because: 1. `run_flightsqls()` only fetched one batch using `match streams.next().await` instead of iterating through all batches 2. `take_record_batches()` only used the first batch when multiple batches existed, ignoring the rest This fix: - Changes `run_flightsqls()` to use `while let` loop to collect all batches from the FlightSQL stream - Updates `take_record_batches()` to concatenate all batches before applying pagination indices, ensuring all rows are visible Known limitations for future consideration: - All batches are collected eagerly into memory before display, which could be problematic for very large result sets - `concat_batches()` creates a temporary copy when concatenating, briefly doubling memory usage during pagination - Unlike the SQL tab which uses lazy batch loading, FlightSQL now loads all data upfront (this matches expected behavior for aggregation queries but may warrant lazy loading for large streaming results in the future) Fixes #346
1 parent 2251b56 commit a2964a8

2 files changed

Lines changed: 42 additions & 33 deletions

File tree

src/tui/execution.rs

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -279,37 +279,42 @@ impl TuiExecution {
279279
if let Some(streams) =
280280
self.flightsql_result_stream.lock().await.as_mut()
281281
{
282-
match streams.next().await {
283-
Some((ticket, Ok(batch))) => {
284-
info!("Received batch for {ticket}");
285-
let duration = start.elapsed();
286-
let results = ExecutionResultsBatch {
287-
batch,
288-
duration,
289-
query: sql.to_string(),
290-
};
291-
sender.send(
292-
AppEvent::FlightSQLExecutionResultsNextBatch(
293-
results,
294-
),
295-
)?;
282+
// Collect all batches from the stream
283+
while let Some((ticket, result)) =
284+
streams.next().await
285+
{
286+
match result {
287+
Ok(batch) => {
288+
info!("Received batch for {ticket}");
289+
let duration = start.elapsed();
290+
let results = ExecutionResultsBatch {
291+
batch,
292+
duration,
293+
query: sql.to_string(),
294+
};
295+
sender.send(
296+
AppEvent::FlightSQLExecutionResultsNextBatch(
297+
results,
298+
),
299+
)?;
300+
}
301+
Err(e) => {
302+
error!(
303+
"Error executing stream for ticket {ticket}: {:?}",
304+
e
305+
);
306+
let elapsed = start.elapsed();
307+
let e = ExecutionError {
308+
query: sql.to_string(),
309+
error: e.to_string(),
310+
duration: elapsed,
311+
};
312+
sender.send(
313+
AppEvent::FlightSQLExecutionResultsError(e),
314+
)?;
315+
break;
316+
}
296317
}
297-
Some((ticket, Err(e))) => {
298-
error!(
299-
"Error executing stream for ticket {ticket}: {:?}",
300-
e
301-
);
302-
let elapsed = start.elapsed();
303-
let e = ExecutionError {
304-
query: sql.to_string(),
305-
error: e.to_string(),
306-
duration: elapsed,
307-
};
308-
sender.send(
309-
AppEvent::FlightSQLExecutionResultsError(e),
310-
)?;
311-
}
312-
None => {}
313318
}
314319
}
315320
}

src/tui/state/tabs/flightsql.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121
use color_eyre::Result;
2222
use datafusion::arrow::{
2323
array::{Array, RecordBatch, UInt32Array},
24-
compute::take_record_batch,
24+
compute::{concat_batches, take_record_batch},
2525
datatypes::Schema,
2626
error::ArrowError,
2727
};
@@ -316,7 +316,11 @@ fn take_record_batches(
316316
match batches.len() {
317317
0 => Ok(RecordBatch::new_empty(Arc::new(Schema::empty()))),
318318
1 => take_record_batch(&batches[0], indices),
319-
// For now we just get the first batch
320-
_ => take_record_batch(&batches[0], indices),
319+
_ => {
320+
// Concatenate all batches into a single batch before taking indices
321+
let schema = batches[0].schema();
322+
let concatenated = concat_batches(&schema, batches)?;
323+
take_record_batch(&concatenated, indices)
324+
}
321325
}
322326
}

0 commit comments

Comments
 (0)