Skip to content

Commit a8eedd3

Browse files
FlightSQL pagination improvement & pagination logic unification (#364)
1 parent 76b7369 commit a8eedd3

13 files changed

Lines changed: 945 additions & 218 deletions

File tree

src/tui/execution.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,35 @@ impl TuiExecution {
403403
}
404404
}
405405

406+
/// Pull the next batch from the active FlightSQL result stream, if one is
/// open, and forward it to the app as a `FlightSQLExecutionResultsNextBatch`
/// event. No-op when no stream is active or the stream is exhausted.
#[cfg(feature = "flightsql")]
pub async fn next_flightsql_batch(&self, sql: String, sender: UnboundedSender<AppEvent>) {
    let mut guard = self.flightsql_result_stream.lock().await;
    let Some(stream) = guard.as_mut() else {
        return;
    };
    let fetch_started = std::time::Instant::now();
    let Some((ticket, batch_result)) = stream.next().await else {
        return;
    };
    match batch_result {
        Ok(batch) => {
            info!(
                "Fetched next FlightSQL batch from {}: {} rows",
                ticket,
                batch.num_rows()
            );
            let results = ExecutionResultsBatch {
                query: sql,
                batch,
                duration: fetch_started.elapsed(),
            };
            // The receiver may already be gone during shutdown; dropping the
            // event is acceptable then, so the send error is ignored.
            let _ = sender.send(AppEvent::FlightSQLExecutionResultsNextBatch(results));
        }
        Err(e) => {
            error!("Error getting next FlightSQL batch: {:?}", e);
        }
    }
}
434+
406435
// TODO: Maybe just expose `inner` and use that rather than re-implementing the same
407436
// functions here.
408437
#[cfg(feature = "flightsql")]

src/tui/handlers/flightsql.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,30 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
7878
}
7979
(KeyCode::Right, KeyModifiers::NONE) => {
8080
let _event_tx = app.event_tx();
81+
82+
if let Some(current_page) = app.state.flightsql_tab.current_page() {
83+
let next_page = current_page + 1;
84+
85+
// Check if we need more batches for the next page
86+
if app
87+
.state
88+
.flightsql_tab
89+
.needs_more_batches_for_page(next_page)
90+
{
91+
info!("Fetching more batches for page {}", next_page);
92+
93+
if let Some(last_query) = app.state.history_tab.history().last() {
94+
let execution = Arc::clone(&app.execution);
95+
let sql = last_query.sql().clone();
96+
tokio::spawn(async move {
97+
execution.next_flightsql_batch(sql, _event_tx).await;
98+
});
99+
}
100+
return; // Wait for batch to load before advancing page
101+
}
102+
}
103+
104+
// Sufficient data available, advance page
81105
if let Err(e) = _event_tx.send(AppEvent::FlightSQLExecutionResultsNextPage) {
82106
error!("Error going to next FlightSQL results page: {e}");
83107
}

src/tui/handlers/mod.rs

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ use tui_logger::TuiWidgetEvent;
2828
use crate::tui::state::tabs::flightsql::FlightSQLConnectionStatus;
2929
use crate::tui::state::tabs::history::Context;
3030
use crate::tui::ExecutionResultsBatch;
31-
32-
#[cfg(feature = "flightsql")]
3331
use std::sync::Arc;
3432

3533
use super::App;
@@ -222,13 +220,50 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
222220
duration,
223221
batch,
224222
} = r;
223+
224+
let is_first_batch = app.state.sql_tab.current_page().is_none();
225+
225226
app.state.sql_tab.add_batch(batch);
227+
228+
if is_first_batch {
229+
app.state.sql_tab.next_page();
230+
app.state.sql_tab.refresh_query_results_state();
231+
232+
let history_query =
233+
HistoryQuery::new(Context::Local, query.to_string(), duration, None, None);
234+
app.state.history_tab.add_to_history(history_query);
235+
app.state.history_tab.refresh_history_table_state();
236+
} else {
237+
app.state.sql_tab.refresh_query_results_state();
238+
239+
// Check if we have enough data for the next page now
240+
// If not, automatically fetch another batch
241+
if let Some(current_page) = app.state.sql_tab.current_page() {
242+
let next_page = current_page + 1;
243+
if app.state.sql_tab.needs_more_batches_for_page(next_page) {
244+
info!(
245+
"Still need more batches for page {}, fetching next batch",
246+
next_page
247+
);
248+
let execution = Arc::clone(&app.execution);
249+
let sql = query.clone();
250+
let _event_tx = app.event_tx();
251+
tokio::spawn(async move {
252+
execution.next_batch(sql, _event_tx).await;
253+
});
254+
} else {
255+
// We now have enough data, advance to the page
256+
info!("Sufficient data loaded, advancing to page {}", next_page);
257+
if let Err(e) = app.event_tx().send(AppEvent::ExecutionResultsNextPage) {
258+
error!("Error advancing to next page: {e}");
259+
}
260+
}
261+
}
262+
}
263+
}
264+
AppEvent::ExecutionResultsNextPage => {
226265
app.state.sql_tab.next_page();
227266
app.state.sql_tab.refresh_query_results_state();
228-
let history_query =
229-
HistoryQuery::new(Context::Local, query.to_string(), duration, None, None);
230-
app.state.history_tab.add_to_history(history_query);
231-
app.state.history_tab.refresh_history_table_state();
232267
}
233268
#[cfg(feature = "flightsql")]
234269
AppEvent::FlightSQLExecutionResultsNextBatch(r) => {
@@ -237,15 +272,54 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
237272
duration,
238273
batch,
239274
} = r;
240-
info!("Adding batch to flightsql tab");
275+
276+
let is_first_batch = app.state.flightsql_tab.current_page().is_none();
277+
241278
app.state.flightsql_tab.set_in_progress(false);
242279
app.state.flightsql_tab.add_batch(batch);
243-
app.state.flightsql_tab.next_page();
244-
app.state.flightsql_tab.refresh_query_results_state();
245-
let history_query =
246-
HistoryQuery::new(Context::FlightSQL, query.to_string(), duration, None, None);
247-
app.state.history_tab.add_to_history(history_query);
248-
app.state.history_tab.refresh_history_table_state();
280+
281+
if is_first_batch {
282+
app.state.flightsql_tab.next_page();
283+
app.state.flightsql_tab.refresh_query_results_state();
284+
285+
let history_query =
286+
HistoryQuery::new(Context::FlightSQL, query.to_string(), duration, None, None);
287+
app.state.history_tab.add_to_history(history_query);
288+
app.state.history_tab.refresh_history_table_state();
289+
} else {
290+
app.state.flightsql_tab.refresh_query_results_state();
291+
292+
// Check if we have enough data for the next page now
293+
// If not, automatically fetch another batch
294+
if let Some(current_page) = app.state.flightsql_tab.current_page() {
295+
let next_page = current_page + 1;
296+
if app
297+
.state
298+
.flightsql_tab
299+
.needs_more_batches_for_page(next_page)
300+
{
301+
info!(
302+
"Still need more batches for page {}, fetching next batch",
303+
next_page
304+
);
305+
let execution = Arc::clone(&app.execution);
306+
let sql = query.clone();
307+
let _event_tx = app.event_tx();
308+
tokio::spawn(async move {
309+
execution.next_flightsql_batch(sql, _event_tx).await;
310+
});
311+
} else {
312+
// We now have enough data, advance to the page
313+
info!("Sufficient data loaded, advancing to page {}", next_page);
314+
if let Err(e) = app
315+
.event_tx()
316+
.send(AppEvent::FlightSQLExecutionResultsNextPage)
317+
{
318+
error!("Error advancing to next page: {e}");
319+
}
320+
}
321+
}
322+
}
249323
}
250324
#[cfg(feature = "flightsql")]
251325
AppEvent::FlightSQLEstablishConnection => {

src/tui/handlers/sql.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -92,25 +92,29 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
9292
}
9393
},
9494
(KeyCode::Right, KeyModifiers::NONE) => {
95-
let _event_tx = app.event_tx().clone();
96-
if let (Some(p), c) = (
97-
app.state().sql_tab.current_page(),
98-
app.state().sql_tab.batches_count(),
99-
) {
100-
// We don't need to fetch the next batch if moving forward a page and we're not
101-
// on the last page since we would have already fetched it.
102-
if p < c - 1 {
103-
app.state.sql_tab.next_page();
104-
app.state.sql_tab.refresh_query_results_state();
105-
return;
95+
let _event_tx = app.event_tx();
96+
97+
if let Some(current_page) = app.state.sql_tab.current_page() {
98+
let next_page = current_page + 1;
99+
100+
// Check if we need more batches for the next page
101+
if app.state.sql_tab.needs_more_batches_for_page(next_page) {
102+
info!("Fetching more batches for page {}", next_page);
103+
104+
if let Some(last_query) = app.state.history_tab.history().last() {
105+
let execution = Arc::clone(&app.execution);
106+
let sql = last_query.sql().clone();
107+
tokio::spawn(async move {
108+
execution.next_batch(sql, _event_tx).await;
109+
});
110+
}
111+
return; // Wait for batch to load before advancing page
106112
}
107113
}
108-
if let Some(p) = app.state.history_tab.history().last() {
109-
let execution = Arc::clone(&app.execution);
110-
let sql = p.sql().clone();
111-
tokio::spawn(async move {
112-
execution.next_batch(sql, _event_tx).await;
113-
});
114+
115+
// Sufficient data available, advance page
116+
if let Err(e) = app.event_tx().send(AppEvent::ExecutionResultsNextPage) {
117+
error!("Error going to next SQL results page: {e}");
114118
}
115119
}
116120
(KeyCode::Left, KeyModifiers::NONE) => {

src/tui/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717

1818
pub mod execution;
1919
pub mod handlers;
20+
mod pagination;
2021
pub mod state;
2122
pub mod ui;
2223

24+
pub use pagination::{extract_page, has_sufficient_rows, page_row_range, PAGE_SIZE};
25+
2326
use color_eyre::eyre::eyre;
2427
use color_eyre::Result;
2528
use datafusion_app::config::merge_configs;
@@ -69,6 +72,7 @@ pub enum AppEvent {
6972
// Query Execution
7073
NewExecution,
7174
ExecutionResultsNextBatch(ExecutionResultsBatch),
75+
ExecutionResultsNextPage,
7276
ExecutionResultsPreviousPage,
7377
ExecutionResultsError(ExecutionError),
7478
// FlightSQL

src/tui/pagination.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use datafusion::arrow::array::{Array, RecordBatch, UInt32Array};
2+
use datafusion::arrow::compute::{concat_batches, take_record_batch};
3+
use datafusion::arrow::error::ArrowError;
4+
use std::sync::Arc;
5+
6+
/// Number of result rows rendered per page.
pub const PAGE_SIZE: usize = 100;

/// Return the half-open global row interval `[start, end)` covered by `page`.
pub fn page_row_range(page: usize, page_size: usize) -> (usize, usize) {
    let first_row = page * page_size;
    (first_row, first_row + page_size)
}
14+
15+
/// True when the loaded row prefix already covers every row of `page`.
pub fn has_sufficient_rows(loaded_rows: usize, page: usize, page_size: usize) -> bool {
    // The page spans rows [page * page_size, (page + 1) * page_size); it is
    // fully available once that exclusive end fits inside the loaded prefix.
    (page + 1) * page_size <= loaded_rows
}
20+
21+
/// Extract a page of rows from loaded batches
22+
/// This handles pagination across batch boundaries by concatenating only what's needed
23+
pub fn extract_page(
24+
batches: &[RecordBatch],
25+
page: usize,
26+
page_size: usize,
27+
) -> Result<RecordBatch, ArrowError> {
28+
if batches.is_empty() {
29+
return Ok(RecordBatch::new_empty(Arc::new(
30+
datafusion::arrow::datatypes::Schema::empty(),
31+
)));
32+
}
33+
34+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
35+
let (start, end) = page_row_range(page, page_size);
36+
37+
// Clamp end to available rows
38+
let end = end.min(total_rows);
39+
40+
if start >= total_rows {
41+
// Page is beyond available data
42+
return Ok(RecordBatch::new_empty(batches[0].schema()));
43+
}
44+
45+
// Create indices for the rows we want
46+
let indices = UInt32Array::from_iter_values((start as u32)..(end as u32));
47+
48+
// Extract rows from batches
49+
extract_rows_from_batches(batches, &indices)
50+
}
51+
52+
/// Extract specific rows (by global indices) from batches
53+
/// Handles batch boundaries by concatenating only necessary batches
54+
fn extract_rows_from_batches(
55+
batches: &[RecordBatch],
56+
indices: &dyn Array,
57+
) -> Result<RecordBatch, ArrowError> {
58+
match batches.len() {
59+
0 => Ok(RecordBatch::new_empty(Arc::new(
60+
datafusion::arrow::datatypes::Schema::empty(),
61+
))),
62+
1 => take_record_batch(&batches[0], indices),
63+
_ => {
64+
// Multiple batches: concat then extract rows
65+
// Only concat the batches we've loaded (lazy loading ensures minimal concat)
66+
let schema = batches[0].schema();
67+
let concatenated = concat_batches(&schema, batches)?;
68+
take_record_batch(&concatenated, indices)
69+
}
70+
}
71+
}
72+
73+
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_page_row_range() {
        assert_eq!(page_row_range(0, 100), (0, 100));
        assert_eq!(page_row_range(1, 100), (100, 200));
        assert_eq!(page_row_range(2, 50), (100, 150));
        // Degenerate page size yields an empty range.
        assert_eq!(page_row_range(5, 0), (0, 0));
    }

    #[test]
    fn test_has_sufficient_rows() {
        assert!(has_sufficient_rows(100, 0, 100)); // Exactly enough
        assert!(has_sufficient_rows(150, 0, 100)); // More than enough
        assert!(!has_sufficient_rows(50, 0, 100)); // Not enough
        assert!(!has_sufficient_rows(150, 1, 100)); // Need 200, only have 150
        assert!(has_sufficient_rows(200, 1, 100)); // Exact boundary on page 1
        assert!(!has_sufficient_rows(0, 0, 100)); // Nothing loaded yet
    }
}

0 commit comments

Comments
 (0)