feat: expose DV metadata and payloads as Arrow streams #4168
ion-elgreco merged 3 commits into delta-io:main
Conversation
@ethan-tyler thanks for picking this up! I think we can simplify the PR quite a bit. For Python we only want to have a RecordBatchReader with two columns: filepath, selection_vector (list[bool]). We only need a log replay as is done in here, and then convert the dv_hashmap into a RecordBatchReader:
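For illustration, a minimal pyarrow sketch of that two-column shape, assuming boolean selection vectors; `dv_map_to_reader` and the file names are hypothetical, not delta-rs API:

```python
import pyarrow as pa

def dv_map_to_reader(dv_map: dict[str, list[bool]]) -> pa.RecordBatchReader:
    """Hypothetical helper: flatten a filepath -> selection-vector map
    into the two-column stream suggested above, sorted for determinism."""
    paths = sorted(dv_map)
    schema = pa.schema([
        ("filepath", pa.string()),
        ("selection_vector", pa.list_(pa.bool_())),
    ])
    batch = pa.record_batch(
        [
            pa.array(paths, type=pa.string()),
            pa.array([dv_map[p] for p in paths], type=pa.list_(pa.bool_())),
        ],
        schema=schema,
    )
    return pa.RecordBatchReader.from_batches(schema, [batch])

# Two hypothetical files, with one boolean per row.
reader = dv_map_to_reader({
    "part-00001.parquet": [True, False, True],
    "part-00002.parquet": [False, True],
})
```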
@ion-elgreco - Agreed on simplifying the surface, and it was actually my first pass. After implementing and validating, I saw real limitations with that approach: if this goes through DataFusion scan internals, that's an architectural mismatch for Polars and any other non-DF consumers.
IMO, for cross-engine consumers a protocol/kernel-level DV boundary (or binding-owned transform) is a better fit. Since we're already in the internals, I'd rather preserve this core and simplify the UX on top:
The work is reusable either way. Happy to split into smaller PRs if scope is the concern. Lmk your thoughts, and happy to discuss further.
I don't really see how this is an issue. Consumers on the Python side, including Polars, receive a RecordBatchReader. The DV hashmap is already materialized, so we could actually even return an Arrow table. How it's executed in Rust should have no influence on how Python consumers take in the deletion vectors.
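For example, a Polars consumer can materialize the stream without touching any engine internals (a sketch, reusing the hypothetical `reader` from the snippet above):

```python
import polars as pl

# Drain the RecordBatchReader into an Arrow table, then hand it to Polars.
table = reader.read_all()
df = pl.from_arrow(table)
print(df.schema)  # roughly: filepath -> String, selection_vector -> List(Boolean)
```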
The Python binding API surface should be minimal; this custom reader should never exist solely for the Python binding. That's why I'm suggesting to go through the replay_files path.
OK, that's fair. I still think there's value in this approach in a different context, but I'm aligned. I'll shelve this and push the simplified version. Thanks for the discussion, appreciate you engaging on this.
Force-pushed from e864e0d to d425cdb
Expose DeltaTable.deletion_vectors() in Python, backed by a new core replay API.

- Add DeltaScan::deletion_vectors() with deterministic filepath ordering and named DeletionVectorSelection output.
- Reuse shared scan metadata stream setup and document replay/drain semantics.
- Build Arrow RecordBatchReader output in Python with filepath URI + selection_vector list[bool], chunked batching, non-null list items, and a without_files guard.
- Tighten concurrency by cloning table+state under one lock and using one SessionContext state for registration + scan.
- Strengthen core/Python tests for exact DV values, determinism, eager snapshot path, URI contract, empty results, and error paths.
- Replace an internal DV invariant expect() with typed error propagation instead of panic.

Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
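A hedged usage sketch of the binding described above (the table path is hypothetical; column names follow the commit message):

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")  # hypothetical local table with deletion vectors
reader = dt.deletion_vectors()  # Arrow RecordBatchReader per this PR

for batch in reader:
    paths = batch.column("filepath").to_pylist()
    masks = batch.column("selection_vector").to_pylist()
    for path, mask in zip(paths, masks):
        print(path, len(mask))  # one boolean per row of the data file
```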
Force-pushed from d425cdb to df987a4
```rust
ctx = match ctx.error_or() {
    Ok(ctx) => ctx,
    Err(err) => return Poll::Ready(Some(Err(err.into()))),
};
```
Why is the same thing done here?
lol right, missed this in my refactoring. Thanks for the call out, will fix it
```rust
use std::sync::Arc;

fn deletion_vector_schema() -> Arc<arrow::datatypes::Schema> {
    use arrow::datatypes::{DataType, Field, Schema};
    // Two columns: the data file URI and its boolean selection vector.
    // Field details are inferred from the PR description (filepath URI +
    // selection_vector list[bool] with non-null list items).
    Arc::new(Schema::new(vec![
        Field::new("filepath", DataType::Utf8, false),
        Field::new(
            "selection_vector",
            DataType::List(Arc::new(Field::new("item", DataType::Boolean, false))),
            false,
        ),
    ]))
}
```

Reuses the existing DataFusion replay path via replay_deletion_vectors(...). Results are deterministic and sorted by filepath.

Core changes:
- DeletionVectorSelection struct, DeltaScan::deletion_vectors(), and a shared scan_metadata_stream() helper to avoid drift between scan paths
- expect(...) replaced with typed error propagation

Python binding:
- cloned_table_and_state() to avoid TOCTOU on table + snapshot
- without_files guard behavior (see the sketch below)
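A hedged sketch of that guard from the consumer side (the path is hypothetical, and the exact exception type is whatever the binding raises; only the guard itself is described in this PR):

```python
from deltalake import DeltaTable

# Tables loaded without file tracking cannot replay per-file deletion vectors.
dt = DeltaTable("./my_table", without_files=True)  # hypothetical path
try:
    dt.deletion_vectors()
except Exception as err:  # exact error type not specified here
    print(f"without_files guard tripped: {err}")
```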
Related Issue(s)

Documentation
cc @ion-elgreco