Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/tui/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,35 @@ impl TuiExecution {
}
}

#[cfg(feature = "flightsql")]
/// Fetch the next batch from the active FlightSQL result stream, if one
/// exists, and forward it to the app event loop.
///
/// * `sql` - query text echoed back alongside the batch so the receiver can
///   associate the results with the originating query.
/// * `sender` - channel for delivering a
///   `FlightSQLExecutionResultsNextBatch` event; a failed send is
///   deliberately ignored (the receiver may already be gone).
///
/// If the stream is absent or exhausted this is a no-op. Stream errors are
/// logged and swallowed; no event is emitted in that case.
pub async fn next_flightsql_batch(&self, sql: String, sender: UnboundedSender<AppEvent>) {
    let mut streams = self.flightsql_result_stream.lock().await;
    if let Some(s) = streams.as_mut() {
        // Time only the fetch of this single batch; `duration` below is the
        // elapsed time reported with the results.
        let start = std::time::Instant::now();
        if let Some((ticket, batch_result)) = s.next().await {
            match batch_result {
                Ok(batch) => {
                    info!(
                        "Fetched next FlightSQL batch from {}: {} rows",
                        ticket,
                        batch.num_rows()
                    );
                    let duration = start.elapsed();
                    let results = ExecutionResultsBatch {
                        query: sql,
                        batch,
                        duration,
                    };
                    // Best-effort send: the UI may have shut down already.
                    let _ = sender.send(AppEvent::FlightSQLExecutionResultsNextBatch(results));
                }
                Err(e) => {
                    error!("Error getting next FlightSQL batch: {:?}", e);
                }
            }
        }
    }
}

// TODO: Maybe just expose `inner` and use that rather than re-implementing the same
// functions here.
#[cfg(feature = "flightsql")]
Expand Down
24 changes: 24 additions & 0 deletions src/tui/handlers/flightsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,30 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
}
(KeyCode::Right, KeyModifiers::NONE) => {
let _event_tx = app.event_tx();

if let Some(current_page) = app.state.flightsql_tab.current_page() {
let next_page = current_page + 1;

// Check if we need more batches for the next page
if app
.state
.flightsql_tab
.needs_more_batches_for_page(next_page)
{
info!("Fetching more batches for page {}", next_page);

if let Some(last_query) = app.state.history_tab.history().last() {
let execution = Arc::clone(&app.execution);
let sql = last_query.sql().clone();
tokio::spawn(async move {
execution.next_flightsql_batch(sql, _event_tx).await;
});
}
return; // Wait for batch to load before advancing page
}
}

// Sufficient data available, advance page
if let Err(e) = _event_tx.send(AppEvent::FlightSQLExecutionResultsNextPage) {
error!("Error going to next FlightSQL results page: {e}");
}
Expand Down
100 changes: 87 additions & 13 deletions src/tui/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use tui_logger::TuiWidgetEvent;
use crate::tui::state::tabs::flightsql::FlightSQLConnectionStatus;
use crate::tui::state::tabs::history::Context;
use crate::tui::ExecutionResultsBatch;

#[cfg(feature = "flightsql")]
use std::sync::Arc;

use super::App;
Expand Down Expand Up @@ -222,13 +220,50 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
duration,
batch,
} = r;

let is_first_batch = app.state.sql_tab.current_page().is_none();

app.state.sql_tab.add_batch(batch);

if is_first_batch {
app.state.sql_tab.next_page();
app.state.sql_tab.refresh_query_results_state();

let history_query =
HistoryQuery::new(Context::Local, query.to_string(), duration, None, None);
app.state.history_tab.add_to_history(history_query);
app.state.history_tab.refresh_history_table_state();
} else {
app.state.sql_tab.refresh_query_results_state();

// Check if we have enough data for the next page now
// If not, automatically fetch another batch
if let Some(current_page) = app.state.sql_tab.current_page() {
let next_page = current_page + 1;
if app.state.sql_tab.needs_more_batches_for_page(next_page) {
info!(
"Still need more batches for page {}, fetching next batch",
next_page
);
let execution = Arc::clone(&app.execution);
let sql = query.clone();
let _event_tx = app.event_tx();
tokio::spawn(async move {
execution.next_batch(sql, _event_tx).await;
});
} else {
// We now have enough data, advance to the page
info!("Sufficient data loaded, advancing to page {}", next_page);
if let Err(e) = app.event_tx().send(AppEvent::ExecutionResultsNextPage) {
error!("Error advancing to next page: {e}");
}
}
}
}
}
AppEvent::ExecutionResultsNextPage => {
app.state.sql_tab.next_page();
app.state.sql_tab.refresh_query_results_state();
let history_query =
HistoryQuery::new(Context::Local, query.to_string(), duration, None, None);
app.state.history_tab.add_to_history(history_query);
app.state.history_tab.refresh_history_table_state();
}
#[cfg(feature = "flightsql")]
AppEvent::FlightSQLExecutionResultsNextBatch(r) => {
Expand All @@ -237,15 +272,54 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
duration,
batch,
} = r;
info!("Adding batch to flightsql tab");

let is_first_batch = app.state.flightsql_tab.current_page().is_none();

app.state.flightsql_tab.set_in_progress(false);
app.state.flightsql_tab.add_batch(batch);
app.state.flightsql_tab.next_page();
app.state.flightsql_tab.refresh_query_results_state();
let history_query =
HistoryQuery::new(Context::FlightSQL, query.to_string(), duration, None, None);
app.state.history_tab.add_to_history(history_query);
app.state.history_tab.refresh_history_table_state();

if is_first_batch {
app.state.flightsql_tab.next_page();
app.state.flightsql_tab.refresh_query_results_state();

let history_query =
HistoryQuery::new(Context::FlightSQL, query.to_string(), duration, None, None);
app.state.history_tab.add_to_history(history_query);
app.state.history_tab.refresh_history_table_state();
} else {
app.state.flightsql_tab.refresh_query_results_state();

// Check if we have enough data for the next page now
// If not, automatically fetch another batch
if let Some(current_page) = app.state.flightsql_tab.current_page() {
let next_page = current_page + 1;
if app
.state
.flightsql_tab
.needs_more_batches_for_page(next_page)
{
info!(
"Still need more batches for page {}, fetching next batch",
next_page
);
let execution = Arc::clone(&app.execution);
let sql = query.clone();
let _event_tx = app.event_tx();
tokio::spawn(async move {
execution.next_flightsql_batch(sql, _event_tx).await;
});
} else {
// We now have enough data, advance to the page
info!("Sufficient data loaded, advancing to page {}", next_page);
if let Err(e) = app
.event_tx()
.send(AppEvent::FlightSQLExecutionResultsNextPage)
{
error!("Error advancing to next page: {e}");
}
}
}
}
}
#[cfg(feature = "flightsql")]
AppEvent::FlightSQLEstablishConnection => {
Expand Down
38 changes: 21 additions & 17 deletions src/tui/handlers/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,29 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
}
},
(KeyCode::Right, KeyModifiers::NONE) => {
let _event_tx = app.event_tx().clone();
if let (Some(p), c) = (
app.state().sql_tab.current_page(),
app.state().sql_tab.batches_count(),
) {
// We don't need to fetch the next batch if moving forward a page and we're not
// on the last page since we would have already fetched it.
if p < c - 1 {
app.state.sql_tab.next_page();
app.state.sql_tab.refresh_query_results_state();
return;
let _event_tx = app.event_tx();

if let Some(current_page) = app.state.sql_tab.current_page() {
let next_page = current_page + 1;

// Check if we need more batches for the next page
if app.state.sql_tab.needs_more_batches_for_page(next_page) {
info!("Fetching more batches for page {}", next_page);

if let Some(last_query) = app.state.history_tab.history().last() {
let execution = Arc::clone(&app.execution);
let sql = last_query.sql().clone();
tokio::spawn(async move {
execution.next_batch(sql, _event_tx).await;
});
}
return; // Wait for batch to load before advancing page
}
}
if let Some(p) = app.state.history_tab.history().last() {
let execution = Arc::clone(&app.execution);
let sql = p.sql().clone();
tokio::spawn(async move {
execution.next_batch(sql, _event_tx).await;
});

// Sufficient data available, advance page
if let Err(e) = app.event_tx().send(AppEvent::ExecutionResultsNextPage) {
error!("Error going to next SQL results page: {e}");
}
}
(KeyCode::Left, KeyModifiers::NONE) => {
Expand Down
4 changes: 4 additions & 0 deletions src/tui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

pub mod execution;
pub mod handlers;
mod pagination;
pub mod state;
pub mod ui;

pub use pagination::{extract_page, has_sufficient_rows, page_row_range, PAGE_SIZE};

use color_eyre::eyre::eyre;
use color_eyre::Result;
use datafusion_app::config::merge_configs;
Expand Down Expand Up @@ -69,6 +72,7 @@ pub enum AppEvent {
// Query Execution
NewExecution,
ExecutionResultsNextBatch(ExecutionResultsBatch),
ExecutionResultsNextPage,
ExecutionResultsPreviousPage,
ExecutionResultsError(ExecutionError),
// FlightSQL
Expand Down
91 changes: 91 additions & 0 deletions src/tui/pagination.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use datafusion::arrow::array::{Array, RecordBatch, UInt32Array};
use datafusion::arrow::compute::{concat_batches, take_record_batch};
use datafusion::arrow::error::ArrowError;
use std::sync::Arc;

/// Number of result rows displayed per page in the TUI results view.
pub const PAGE_SIZE: usize = 100;

/// Map a zero-based `page` index to the half-open `[start, end)` row range it
/// covers, given `page_size` rows per page.
///
/// Returns `(start, end)` where `start` is inclusive and `end` is exclusive.
pub fn page_row_range(page: usize, page_size: usize) -> (usize, usize) {
    (page * page_size, (page + 1) * page_size)
}

/// Report whether `loaded_rows` rows are enough to fully render `page`
/// (zero-based) at `page_size` rows per page.
///
/// A page is fully available once every row up to its exclusive end index —
/// `(page + 1) * page_size` — has been loaded.
pub fn has_sufficient_rows(loaded_rows: usize, page: usize, page_size: usize) -> bool {
    (page + 1) * page_size <= loaded_rows
}

/// Extract one page of rows from the already-loaded `batches`.
///
/// Handles pages that straddle batch boundaries. Returns an empty batch
/// (with the first batch's schema) when the page lies entirely beyond the
/// loaded data, and an empty schemaless batch when no batches are loaded.
pub fn extract_page(
    batches: &[RecordBatch],
    page: usize,
    page_size: usize,
) -> Result<RecordBatch, ArrowError> {
    let Some(first) = batches.first() else {
        // With no batches there is no schema to preserve; fall back to empty.
        return Ok(RecordBatch::new_empty(Arc::new(
            datafusion::arrow::datatypes::Schema::empty(),
        )));
    };

    let loaded: usize = batches.iter().map(RecordBatch::num_rows).sum();
    let (start, end) = page_row_range(page, page_size);

    if start >= loaded {
        // Requested page is entirely past the loaded data.
        return Ok(RecordBatch::new_empty(first.schema()));
    }

    // Clamp the exclusive end to what has actually been loaded.
    let end = end.min(loaded);

    // Global row indices for this page.
    // NOTE(review): the `as u32` casts assume fewer than u32::MAX loaded rows
    // — confirm that is acceptable for TUI paging.
    let indices = UInt32Array::from_iter_values((start as u32)..(end as u32));

    extract_rows_from_batches(batches, &indices)
}

/// Take the rows at the given global `indices` out of `batches`.
///
/// For a single batch the rows are taken directly; for multiple batches they
/// are concatenated first so the global indices line up. Since callers only
/// load batches lazily, the concatenation covers just what is in memory.
fn extract_rows_from_batches(
    batches: &[RecordBatch],
    indices: &dyn Array,
) -> Result<RecordBatch, ArrowError> {
    if batches.is_empty() {
        return Ok(RecordBatch::new_empty(Arc::new(
            datafusion::arrow::datatypes::Schema::empty(),
        )));
    }
    if let [single] = batches {
        // Fast path: no concatenation needed for a lone batch.
        return take_record_batch(single, indices);
    }
    let schema = batches[0].schema();
    let combined = concat_batches(&schema, batches)?;
    take_record_batch(&combined, indices)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_page_row_range() {
        // ((page, page_size), expected (start, end))
        let cases = [
            ((0, 100), (0, 100)),
            ((1, 100), (100, 200)),
            ((2, 50), (100, 150)),
        ];
        for ((page, size), expected) in cases {
            assert_eq!(page_row_range(page, size), expected);
        }
    }

    #[test]
    fn test_has_sufficient_rows() {
        assert!(has_sufficient_rows(100, 0, 100), "exactly enough rows");
        assert!(has_sufficient_rows(150, 0, 100), "more than enough rows");
        assert!(!has_sufficient_rows(50, 0, 100), "not enough rows");
        assert!(!has_sufficient_rows(150, 1, 100), "page 1 needs 200 rows");
    }
}
Loading