
Commit 028e351, authored by adriangb and claude
Add files_processed and files_scanned metrics to FileStreamMetrics (#20592)
## Summary

- Add a `files_processed` counter to `FileStreamMetrics`, incremented for every file assigned to the partition, whether it was opened, pruned (returned an empty stream), or skipped due to a LIMIT. When the stream completes, this equals the total number of files in the partition.
- Add a `files_opened` counter to `FileStreamMetrics`, incremented as soon as we consider a file for processing (whether actually opened, discarded because of a LIMIT or stats, etc.).

## Motivation

These metrics enable **tracking query progress** during long-running scans. Today there is no way to monitor how far along a file scan is. The existing `FileStreamMetrics` only provide:

- **Timing metrics** (`time_elapsed_opening`, `time_elapsed_scanning_total`, etc.): these measure duration but do not indicate progress. You cannot tell whether a scan is 10% or 90% done from elapsed time alone.
- **Error counters** (`file_open_errors`, `file_scan_errors`): these only count failures, not successful progress.
- **`output_rows`** and **`bytes_scanned`** (from `BaselineMetrics`): these count rows emitted, but since we do not know upfront how many rows will be emitted in total, they make a poor progress metric; with filters and similar operators the ratio never converges to 100%.

In contrast, `files_processed` and `files_opened`, combined with the known number of files in `file_groups`, give a clear progress indicator: `files_processed / total_files`. This is the most natural and reliable way to track scan progress, since the file count is known at plan time. Depending on what they plan to do with the metric, users can pick `files_opened / total_files` (a leading indicator) or `files_processed / total_files` (a lagging indicator).

## Test plan

- [x] Existing `file_stream` tests pass (8/8)
- [x] `cargo check -p datafusion-datasource` compiles cleanly

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
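The progress calculation described in the motivation can be sketched as follows. This is a minimal illustration, not DataFusion API: `scan_progress` is a hypothetical helper, and its arguments stand in for the `files_processed` counter value and the total file count known from `file_groups` at plan time.

```rust
/// Hypothetical helper: turns the two known quantities into a
/// progress ratio in [0.0, 1.0]. In real usage `files_processed`
/// would be read from `FileStreamMetrics` and `total_files` from
/// the plan's `file_groups`.
fn scan_progress(files_processed: usize, total_files: usize) -> f64 {
    if total_files == 0 {
        return 1.0; // an empty scan is trivially complete
    }
    files_processed as f64 / total_files as f64
}

fn main() {
    // Mid-scan: 4 of the partition's 8 files fully processed.
    println!("scan is {:.0}% done", scan_progress(4, 8) * 100.0);
}
```

Substituting `files_opened` for `files_processed` in the same ratio yields the leading variant of the indicator.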
1 parent: 909608a

1 file changed: `datafusion/datasource/src/file_stream.rs` (54 additions, 16 deletions)
```diff
@@ -124,6 +124,7 @@ impl FileStream {
                 }
                 FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) {
                     Ok(reader) => {
+                        self.file_stream_metrics.files_opened.add(1);
                         // include time needed to start opening in `start_next_file`
                         self.file_stream_metrics.time_opening.stop();
                         let next = self.start_next_file().transpose();
@@ -150,6 +151,7 @@ impl FileStream {
                         self.file_stream_metrics.file_open_errors.add(1);
                         match self.on_error {
                             OnError::Skip => {
+                                self.file_stream_metrics.files_processed.add(1);
                                 self.file_stream_metrics.time_opening.stop();
                                 self.state = FileStreamState::Idle
                             }
@@ -179,6 +181,15 @@ impl FileStream {
                                 batch
                             } else {
                                 let batch = batch.slice(0, *remain);
+                                // Count this file, the prefetched next file
+                                // (if any), and all remaining files we will
+                                // never open.
+                                let done = 1
+                                    + self.file_iter.len()
+                                    + usize::from(next.is_some());
+                                self.file_stream_metrics
+                                    .files_processed
+                                    .add(done);
                                 self.state = FileStreamState::Limit;
                                 *remain = 0;
                                 batch
@@ -196,33 +207,37 @@ impl FileStream {
 
                         match self.on_error {
                             // If `OnError::Skip` we skip the file as soon as we hit the first error
-                            OnError::Skip => match mem::take(next) {
-                                Some(future) => {
-                                    self.file_stream_metrics.time_opening.start();
-
-                                    match future {
-                                        NextOpen::Pending(future) => {
-                                            self.state =
-                                                FileStreamState::Open { future }
-                                        }
-                                        NextOpen::Ready(reader) => {
-                                            self.state = FileStreamState::Open {
-                                                future: Box::pin(std::future::ready(
-                                                    reader,
-                                                )),
+                            OnError::Skip => {
+                                self.file_stream_metrics.files_processed.add(1);
+                                match mem::take(next) {
+                                    Some(future) => {
+                                        self.file_stream_metrics.time_opening.start();
+
+                                        match future {
+                                            NextOpen::Pending(future) => {
+                                                self.state =
+                                                    FileStreamState::Open { future }
+                                            }
+                                            NextOpen::Ready(reader) => {
+                                                self.state = FileStreamState::Open {
+                                                    future: Box::pin(
+                                                        std::future::ready(reader),
+                                                    ),
+                                                }
                                             }
                                         }
                                     }
+                                    None => return Poll::Ready(None),
                                 }
-                                None => return Poll::Ready(None),
-                            },
+                            }
                             OnError::Fail => {
                                 self.state = FileStreamState::Error;
                                 return Poll::Ready(Some(Err(err)));
                             }
                         }
                     }
                     None => {
+                        self.file_stream_metrics.files_processed.add(1);
                         self.file_stream_metrics.time_scanning_until_data.stop();
                         self.file_stream_metrics.time_scanning_total.stop();
 
@@ -399,6 +414,22 @@ pub struct FileStreamMetrics {
     /// If using `OnError::Skip` this will provide a count of the number of files
     /// which were skipped and will not be included in the scan results.
     pub file_scan_errors: Count,
+    /// Count of files successfully opened or evaluated for processing.
+    /// At t=end (completion of a query) this is equal to `files_processed`, and both values are equal
+    /// to the total number of files in the query, unless the query itself fails.
+    /// This value will always be greater than or equal to `files_processed`.
+    /// Note that this value does *not* mean the file was actually scanned.
+    /// We increment this value for any processing of a file, even if that processing is
+    /// discarding it because we hit a `LIMIT` (in this case `files_opened` and `files_processed` are both incremented at the same time).
+    pub files_opened: Count,
+    /// Count of files completely processed / closed (opened, pruned, or skipped due to limit).
+    /// At t=0 (the beginning of a query) this is 0.
+    /// At t=end (completion of a query) this is equal to `files_opened`, and both values are equal
+    /// to the total number of files in the query, unless the query itself fails.
+    /// This value will always be less than or equal to `files_opened`.
+    /// We increment this value for any processing of a file, even if that processing is
+    /// discarding it because we hit a `LIMIT` (in this case `files_opened` and `files_processed` are both incremented at the same time).
+    pub files_processed: Count,
 }
 
 impl FileStreamMetrics {
@@ -433,13 +464,20 @@ impl FileStreamMetrics {
         let file_scan_errors =
             MetricBuilder::new(metrics).counter("file_scan_errors", partition);
 
+        let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition);
+
+        let files_processed =
+            MetricBuilder::new(metrics).counter("files_processed", partition);
+
         Self {
             time_opening,
             time_scanning_until_data,
             time_scanning_total,
             time_processing,
             file_open_errors,
             file_scan_errors,
+            files_opened,
+            files_processed,
         }
     }
 }
```
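The accounting introduced by the diff can be illustrated with a small stand-in counter. This `Counter` is not DataFusion's actual `Count` type, just a minimal sketch of the same clone-and-share pattern (an `Arc`-wrapped atomic), and the file counts below are invented for the example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Minimal stand-in for a shared metric counter: cheap to clone,
/// incremented from inside the stream, readable from outside.
#[derive(Clone, Default)]
struct Counter(Arc<AtomicUsize>);

impl Counter {
    fn add(&self, n: usize) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    fn value(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let files_opened = Counter::default();
    let files_processed = Counter::default();

    // Simulate a 5-file partition: 3 files are opened and fully
    // scanned, then a LIMIT is hit and the remaining 2 files are
    // counted as processed without ever being opened.
    for _ in 0..3 {
        files_opened.add(1);    // open future resolved with Ok(reader)
        files_processed.add(1); // stream for the file was exhausted
    }
    files_processed.add(2); // files we will never open

    assert_eq!(files_opened.value(), 3);
    assert_eq!(files_processed.value(), 5); // == total files in partition
}
```

This mirrors why `files_processed / total_files` converges to 100% at stream completion even when a LIMIT cuts the scan short.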
