Skip to content

Commit 260c872

Browse files
authored
chore: remove deprecated pyo3 methods (delta-io#3975)
# Description The description of the main changes of your pull request # Related Issue(s) <!--- For example: - closes #106 ---> # Documentation <!--- Share links to useful documentation ---> --------- Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
1 parent e1bc81d commit 260c872

5 files changed

Lines changed: 36 additions & 35 deletions

File tree

crates/core/src/operations/filesystem_check.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub struct FileSystemCheckBuilder {
5757
pub struct FileSystemCheckMetrics {
5858
/// Was this a dry run
5959
pub dry_run: bool,
60-
/// Files that wrere removed successfully
60+
/// Files that were removed successfully
6161
#[serde(
6262
serialize_with = "serialize_vec_string",
6363
deserialize_with = "deserialize_vec_string"

crates/core/src/writer/stats.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ fn stats_from_metadata(
188188
.flat_map(|g| {
189189
g.column(idx).statistics().into_iter().filter_map(|s| {
190190
let is_binary = matches!(&column_descr.physical_type(), Type::BYTE_ARRAY)
191-
&& matches!(column_descr.logical_type(), Some(LogicalType::String)).not();
191+
&& matches!(column_descr.logical_type_ref(), Some(LogicalType::String))
192+
.not();
192193
if is_binary {
193194
warn!(
194195
"Skipping column {} because it's a binary field.",

python/src/filesystem.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,14 @@ impl DeltaFileSystemHandler {
144144
let mut infos = Vec::with_capacity(paths.len());
145145
for file_path in paths {
146146
let path = Self::parse_path(&file_path);
147-
let listed = py.allow_threads(|| {
147+
let listed = py.detach(|| {
148148
rt().block_on(self.inner.list_with_delimiter(Some(&path)))
149149
.map_err(PythonError::from)
150150
})?;
151151

152152
// TODO is there a better way to figure out if we are in a directory?
153153
if listed.objects.is_empty() && listed.common_prefixes.is_empty() {
154-
let maybe_meta = py.allow_threads(|| rt().block_on(self.inner.head(&path)));
154+
let maybe_meta = py.detach(|| rt().block_on(self.inner.head(&path)));
155155
match maybe_meta {
156156
Ok(meta) => {
157157
let kwargs = HashMap::from([
@@ -480,7 +480,7 @@ impl ObjectInputFile {
480480
let nbytes = (range.end - range.start) as i64;
481481
self.pos += nbytes;
482482
let data = if nbytes > 0 {
483-
py.allow_threads(|| {
483+
py.detach(|| {
484484
rt().block_on(self.store.get_range(&self.path, range))
485485
.map_err(PythonError::from)
486486
})?
@@ -576,7 +576,7 @@ impl ObjectOutputStream {
576576
#[pymethods]
577577
impl ObjectOutputStream {
578578
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
579-
py.allow_threads(|| {
579+
py.detach(|| {
580580
self.closed = true;
581581
if !self.buffer.is_empty() {
582582
self.upload_buffer()?;
@@ -632,7 +632,7 @@ impl ObjectOutputStream {
632632
self.check_closed()?;
633633
let py = data.py();
634634
let bytes = data.as_bytes();
635-
py.allow_threads(|| {
635+
py.detach(|| {
636636
let len = bytes.len();
637637
for chunk in bytes.chunks(self.max_buffer_size) {
638638
// this will never overflow
@@ -658,7 +658,7 @@ impl ObjectOutputStream {
658658
}
659659

660660
fn flush(&mut self, py: Python<'_>) -> PyResult<()> {
661-
py.allow_threads(|| self.upload_buffer())
661+
py.detach(|| self.upload_buffer())
662662
}
663663

664664
fn fileno(&self) -> PyResult<()> {

python/src/lib.rs

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ impl RawDeltaTable {
171171
without_files: bool,
172172
log_buffer_size: Option<usize>,
173173
) -> PyResult<Self> {
174-
py.allow_threads(|| {
174+
py.detach(|| {
175175
let table_url = deltalake::table::builder::parse_table_uri(table_uri)
176176
.map_err(error::PythonError::from)?;
177177
let mut builder = deltalake::DeltaTableBuilder::from_uri(table_url)
@@ -301,7 +301,7 @@ impl RawDeltaTable {
301301
///
302302
/// This will acquire the internal lock since it is a mutating operation!
303303
pub fn load_version(&self, py: Python, version: i64) -> PyResult<()> {
304-
py.allow_threads(|| {
304+
py.detach(|| {
305305
#[allow(clippy::await_holding_lock)]
306306
rt().block_on(async {
307307
let mut table = self
@@ -319,7 +319,7 @@ impl RawDeltaTable {
319319

320320
/// Retrieve the latest version from the internally loaded table state
321321
pub fn get_latest_version(&self, py: Python) -> PyResult<i64> {
322-
py.allow_threads(|| {
322+
py.detach(|| {
323323
#[allow(clippy::await_holding_lock)]
324324
rt().block_on(async {
325325
match self._table.lock() {
@@ -360,7 +360,7 @@ impl RawDeltaTable {
360360
}
361361

362362
pub fn load_with_datetime(&self, py: Python, ds: &str) -> PyResult<()> {
363-
py.allow_threads(|| {
363+
py.detach(|| {
364364
let datetime =
365365
DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
366366
|err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")),
@@ -389,7 +389,7 @@ impl RawDeltaTable {
389389
if !self.has_files()? {
390390
return Err(DeltaError::new_err("Table is instantiated without files."));
391391
}
392-
py.allow_threads(|| {
392+
py.detach(|| {
393393
if let Some(filters) = partition_filters {
394394
let filters = convert_partition_filters(filters).map_err(PythonError::from)?;
395395
Ok(self
@@ -482,7 +482,7 @@ impl RawDeltaTable {
482482
full: bool,
483483
keep_versions: Option<Vec<i64>>,
484484
) -> PyResult<Vec<String>> {
485-
let (table, metrics) = py.allow_threads(|| {
485+
let (table, metrics) = py.detach(|| {
486486
let table = self._table.lock().map_err(to_rt_err)?.clone();
487487
let mut cmd = DeltaOps(table)
488488
.vacuum()
@@ -539,7 +539,7 @@ impl RawDeltaTable {
539539
commit_properties: Option<PyCommitProperties>,
540540
post_commithook_properties: Option<PyPostCommitHookProperties>,
541541
) -> PyResult<String> {
542-
let (table, metrics) = py.allow_threads(|| {
542+
let (table, metrics) = py.detach(|| {
543543
let table = self._table.lock().map_err(to_rt_err)?.clone();
544544
let mut cmd = DeltaOps(table).update().with_safe_cast(safe_cast);
545545

@@ -598,7 +598,7 @@ impl RawDeltaTable {
598598
commit_properties: Option<PyCommitProperties>,
599599
post_commithook_properties: Option<PyPostCommitHookProperties>,
600600
) -> PyResult<String> {
601-
let (table, metrics) = py.allow_threads(|| {
601+
let (table, metrics) = py.detach(|| {
602602
let table = self._table.lock().map_err(to_rt_err)?.clone();
603603
let mut cmd = DeltaOps(table)
604604
.optimize()
@@ -665,7 +665,7 @@ impl RawDeltaTable {
665665
commit_properties: Option<PyCommitProperties>,
666666
post_commithook_properties: Option<PyPostCommitHookProperties>,
667667
) -> PyResult<String> {
668-
let (table, metrics) = py.allow_threads(|| {
668+
let (table, metrics) = py.detach(|| {
669669
let table = self._table.lock().map_err(to_rt_err)?.clone();
670670
let mut cmd = DeltaOps(table.clone())
671671
.optimize()
@@ -717,7 +717,7 @@ impl RawDeltaTable {
717717
commit_properties: Option<PyCommitProperties>,
718718
post_commithook_properties: Option<PyPostCommitHookProperties>,
719719
) -> PyResult<()> {
720-
let table = py.allow_threads(|| {
720+
let table = py.detach(|| {
721721
let table = self._table.lock().map_err(to_rt_err)?.clone();
722722
let mut cmd = DeltaOps(table).add_columns();
723723

@@ -757,7 +757,7 @@ impl RawDeltaTable {
757757
commit_properties: Option<PyCommitProperties>,
758758
post_commithook_properties: Option<PyPostCommitHookProperties>,
759759
) -> PyResult<()> {
760-
let table = py.allow_threads(|| {
760+
let table = py.detach(|| {
761761
let table = self._table.lock().map_err(to_rt_err)?.clone();
762762
let mut cmd = DeltaOps(table)
763763
.add_feature()
@@ -790,7 +790,7 @@ impl RawDeltaTable {
790790
commit_properties: Option<PyCommitProperties>,
791791
post_commithook_properties: Option<PyPostCommitHookProperties>,
792792
) -> PyResult<()> {
793-
let table = py.allow_threads(|| {
793+
let table = py.detach(|| {
794794
let table = self._table.lock().map_err(to_rt_err)?.clone();
795795
let mut cmd = DeltaOps(table).add_constraint();
796796

@@ -823,7 +823,7 @@ impl RawDeltaTable {
823823
commit_properties: Option<PyCommitProperties>,
824824
post_commithook_properties: Option<PyPostCommitHookProperties>,
825825
) -> PyResult<()> {
826-
let table = py.allow_threads(|| {
826+
let table = py.detach(|| {
827827
let table = self._table.lock().map_err(to_rt_err)?.clone();
828828
let mut cmd = DeltaOps(table)
829829
.drop_constraints()
@@ -927,7 +927,7 @@ impl RawDeltaTable {
927927
.block_on(async { ctx.sql(&sql).await?.execute_stream().await })
928928
.map_err(PythonError::from)?;
929929

930-
py.allow_threads(|| {
930+
py.detach(|| {
931931
let stream = convert_stream_to_reader(stream);
932932
Ok(stream.into())
933933
})
@@ -962,7 +962,7 @@ impl RawDeltaTable {
962962
post_commithook_properties: Option<PyPostCommitHookProperties>,
963963
commit_properties: Option<PyCommitProperties>,
964964
) -> PyResult<PyMergeBuilder> {
965-
py.allow_threads(|| {
965+
py.detach(|| {
966966
let handler: Option<Arc<dyn CustomExecuteHandler>> =
967967
if self.log_store()?.name() == "LakeFSLogStore" {
968968
Some(Arc::new(LakeFSCustomExecuteHandler {}))
@@ -998,7 +998,7 @@ impl RawDeltaTable {
998998
py: Python,
999999
merge_builder: &mut PyMergeBuilder,
10001000
) -> PyResult<String> {
1001-
py.allow_threads(|| {
1001+
py.detach(|| {
10021002
let (table, metrics) = merge_builder.execute().map_err(PythonError::from)?;
10031003
self.set_state(table.state)?;
10041004
Ok(metrics)
@@ -1263,7 +1263,7 @@ impl RawDeltaTable {
12631263
post_commithook_properties: Option<PyPostCommitHookProperties>,
12641264
) -> PyResult<()> {
12651265
let schema = schema.as_ref().inner_type.clone();
1266-
py.allow_threads(|| {
1266+
py.detach(|| {
12671267
let mode = mode.parse().map_err(PythonError::from)?;
12681268

12691269
let existing_schema = self.with_table(|t| {
@@ -1358,7 +1358,7 @@ impl RawDeltaTable {
13581358
}
13591359

13601360
pub fn create_checkpoint(&self, py: Python) -> PyResult<()> {
1361-
py.allow_threads(|| {
1361+
py.detach(|| {
13621362
let operation_id = Uuid::new_v4();
13631363
let handle = Arc::new(LakeFSCustomExecuteHandler {});
13641364
let store = &self.log_store()?;
@@ -1401,7 +1401,7 @@ impl RawDeltaTable {
14011401
}
14021402

14031403
pub fn cleanup_metadata(&self, py: Python) -> PyResult<()> {
1404-
let (_result, new_state) = py.allow_threads(|| {
1404+
let (_result, new_state) = py.detach(|| {
14051405
let operation_id = Uuid::new_v4();
14061406
let handle = Arc::new(LakeFSCustomExecuteHandler {});
14071407
let store = &self.log_store()?;
@@ -1508,7 +1508,7 @@ impl RawDeltaTable {
15081508
commit_properties: Option<PyCommitProperties>,
15091509
post_commithook_properties: Option<PyPostCommitHookProperties>,
15101510
) -> PyResult<String> {
1511-
let (table, metrics) = py.allow_threads(|| {
1511+
let (table, metrics) = py.detach(|| {
15121512
let table = self._table.lock().map_err(to_rt_err)?.clone();
15131513
let mut cmd = DeltaOps(table).delete();
15141514
if let Some(predicate) = predicate {
@@ -1671,7 +1671,7 @@ impl RawDeltaTable {
16711671
commit_properties: Option<PyCommitProperties>,
16721672
post_commithook_properties: Option<PyPostCommitHookProperties>,
16731673
) -> PyResult<()> {
1674-
let table = py.allow_threads(|| {
1674+
let table = py.detach(|| {
16751675
let table = self._table.lock().map_err(to_rt_err)?.clone();
16761676
let mut cmd = DeltaOps(table)
16771677
.update_field_metadata()
@@ -1734,7 +1734,7 @@ impl RawDeltaTable {
17341734
commit_properties: Option<PyCommitProperties>,
17351735
post_commithook_properties: Option<PyPostCommitHookProperties>,
17361736
) -> PyResult<()> {
1737-
let table = py.allow_threads(|| {
1737+
let table = py.detach(|| {
17381738
let save_mode = mode.parse().map_err(PythonError::from)?;
17391739

17401740
let mut builder = WriteBuilder::new(
@@ -2381,7 +2381,7 @@ fn write_to_deltalake(
23812381
commit_properties: Option<PyCommitProperties>,
23822382
post_commithook_properties: Option<PyPostCommitHookProperties>,
23832383
) -> PyResult<()> {
2384-
let raw_table: DeltaResult<RawDeltaTable> = py.allow_threads(|| {
2384+
let raw_table: DeltaResult<RawDeltaTable> = py.detach(|| {
23852385
let options = storage_options.clone().unwrap_or_default();
23862386
let table_url = deltalake::table::builder::ensure_table_uri(&table_uri)?;
23872387
let table = rt()
@@ -2449,7 +2449,7 @@ fn create_deltalake(
24492449
post_commithook_properties: Option<PyPostCommitHookProperties>,
24502450
) -> PyResult<()> {
24512451
let schema = schema.as_ref().inner_type.clone();
2452-
py.allow_threads(|| {
2452+
py.detach(|| {
24532453
let table_url =
24542454
deltalake::table::builder::ensure_table_uri(&table_uri).map_err(PythonError::from)?;
24552455
let table = DeltaTableBuilder::from_uri(table_url)
@@ -2529,7 +2529,7 @@ fn create_table_with_add_actions(
25292529
) -> PyResult<()> {
25302530
let schema = schema.as_ref().inner_type.clone();
25312531

2532-
py.allow_threads(|| {
2532+
py.detach(|| {
25332533
let table_url =
25342534
deltalake::table::builder::ensure_table_uri(&table_uri).map_err(PythonError::from)?;
25352535
let table = DeltaTableBuilder::from_uri(table_url)
@@ -2603,7 +2603,7 @@ fn convert_to_deltalake(
26032603
) -> PyResult<()> {
26042604
let partition_schema = partition_schema.map(|s| s.as_ref().inner_type.clone());
26052605
let table_url = deltalake::table::builder::ensure_table_uri(&uri).map_err(PythonError::from)?;
2606-
py.allow_threads(|| {
2606+
py.detach(|| {
26072607
let mut builder = ConvertToDeltaBuilder::new().with_location(table_url);
26082608

26092609
if let Some(part_schema) = partition_schema {

python/src/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl PyQueryBuilder {
6363
/// instances, it may result unexpected memory consumption for queries which return large data
6464
/// sets.
6565
pub fn execute(&self, py: Python, sql: &str) -> PyResult<PyRecordBatchReader> {
66-
let stream = py.allow_threads(|| {
66+
let stream = py.detach(|| {
6767
rt().block_on(async {
6868
let df = self.ctx.sql(sql).await?;
6969
df.execute_stream().await

0 commit comments

Comments
 (0)