FileStream: Open next file in parallel while decoding #5161
Conversation
alamb
left a comment
Looks good to me @thinkharderdev -- thank you. It would be great to figure out some way to test this PR (mostly to ensure we don't break this behavior in the future), but I don't have any clever ideas on how to do so.
I went through the logic in detail.
I left some suggestions for comments to clarify the intent, which I think would be valuable but are not necessary.
cc @tustvold
```rust
        partition_values,
    }
}
None => return Poll::Ready(None),
```

Suggested change:

```rust
// No more input files
None => return Poll::Ready(None),
```
```rust
Ok(reader) => {
    let partition_values = mem::take(partition_values);

    let next = self.next_file().transpose();
```

Suggested change:

```rust
// begin opening next file
let next = self.next_file().transpose();
```
```rust
/// The reader instance
reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
/// A [`FileOpenFuture`] for the next file to be processed
next: Option<(FileOpenFuture, Vec<ScalarValue>)>,
```
I wonder if we could make this more future-proof by prefetching n files instead of 1? In cases where file opening is slower than scanning/processing (e.g. many small files), this could make a difference.
Perhaps a follow-on PR could turn this into a stream and use `StreamExt::buffered` or something similar.
Yeah, that seems like a good idea.
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Force-pushed from f8a339d to fa60853
Let's file a ticket for the "buffer N items at a time" idea and work on it as a follow-on PR.

Thanks again @thinkharderdev

Benchmark runs are scheduled for baseline = 48732b4 and contender = 816a0f8. 816a0f8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

Added #5209
Which issue does this PR close?
Closes #5129
Rationale for this change
File opening is mostly IO (and may involve a fair amount of sequential IO), so it can likely be overlapped well with decoding. We should therefore open the next file in parallel while decoding the current file in `FileStream`.
What changes are included in this PR?
Are these changes tested?
I think this should be covered by existing tests
Are there any user-facing changes?
`FileStreamMetrics.time_opening` is a slightly different metric now: it no longer captures all time spent opening, but rather time spent opening while not concurrently decoding.