Skip to content

Commit c033c96

Browse files
authored
feat: expose newest table provider to python (delta-io#4057)
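# Description Exposes the newest DataFusion table provider to Python. `SnapshotWrapper` is made public and re-exported from `delta_datafusion`, and `DeltaScan::new` (re-exported as `DeltaScanNext`) is made public. The Python `RawDeltaTable` table-provider capsule now wraps a `DeltaScanNext` built from the table's eager snapshot and a new `DeltaScanConfig`, instead of wrapping a clone of the table itself. # Related Issue(s) None listed. # Documentation None listed. Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>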
1 parent c6398d2 commit c033c96

3 files changed

Lines changed: 22 additions & 5 deletions

File tree

crates/core/src/delta_datafusion/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -75,6 +75,7 @@ pub use self::session::{
7575
create_session,
7676
};
7777
pub use self::table_provider::next::DeltaScan as DeltaScanNext;
78+
pub use self::table_provider::next::SnapshotWrapper;
7879
pub(crate) use find_files::*;
7980

8081
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";

crates/core/src/delta_datafusion/table_provider/next/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,7 @@ mod scan;
5454
const FILE_ID_COLUMN_DEFAULT: &str = "__delta_rs_file_id__";
5555

5656
#[derive(Clone, Debug, Serialize, Deserialize)]
57-
pub(crate) enum SnapshotWrapper {
57+
pub enum SnapshotWrapper {
5858
Snapshot(Arc<Snapshot>),
5959
EagerSnapshot(Arc<EagerSnapshot>),
6060
}
@@ -85,7 +85,8 @@ pub struct DeltaScan {
8585
}
8686

8787
impl DeltaScan {
88-
pub(super) fn new(snapshot: SnapshotWrapper, config: DeltaScanConfig) -> Result<Self> {
88+
/// Creates a new [`DeltaScan`] from the given snapshot and scan configuration.
89+
pub fn new(snapshot: SnapshotWrapper, config: DeltaScanConfig) -> Result<Self> {
8990
let scan_schema = config.table_schema(snapshot.table_configuration())?;
9091
let full_schema = if config.retain_file_id() {
9192
let mut fields = scan_schema.fields().to_vec();

python/src/lib.rs

Lines changed: 18 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,10 @@ use deltalake::datafusion::catalog::TableProvider;
2323
use deltalake::datafusion::datasource::provider_as_source;
2424
use deltalake::datafusion::logical_expr::LogicalPlanBuilder;
2525
use deltalake::datafusion::prelude::SessionContext;
26-
use deltalake::delta_datafusion::DeltaCdfTableProvider;
26+
use deltalake::delta_datafusion::{
27+
DeltaCdfTableProvider, DeltaScanConfig, DeltaScanNext, SnapshotWrapper,
28+
};
29+
2730
use deltalake::errors::DeltaTableError;
2831
use deltalake::kernel::scalars::ScalarExt;
2932
use deltalake::kernel::transaction::{CommitBuilder, CommitProperties, TableReference};
@@ -1839,8 +1842,20 @@ impl RawDeltaTable {
18391842
let handle = None;
18401843
let name = CString::new("datafusion_table_provider").unwrap();
18411844

1842-
let table = self.with_table(|t| Ok(Arc::new(t.clone())))?;
1843-
let provider = FFI_TableProvider::new(table, false, handle);
1845+
let table = self.with_table(|t| Ok(t.clone()))?;
1846+
1847+
let config = DeltaScanConfig::new();
1848+
let snapshot_wrapped = SnapshotWrapper::EagerSnapshot(Arc::new(
1849+
table
1850+
.snapshot()
1851+
.map_err(PythonError::from)?
1852+
.snapshot()
1853+
.clone(),
1854+
));
1855+
let scan =
1856+
Arc::new(DeltaScanNext::new(snapshot_wrapped, config).map_err(PythonError::from)?)
1857+
as Arc<dyn TableProvider>;
1858+
let provider = FFI_TableProvider::new(scan, false, handle);
18441859

18451860
PyCapsule::new(py, provider, Some(name.clone()))
18461861
}

0 commit comments

Comments
 (0)