Skip to content

Commit 0943fe2

Browse files
byuly and ethan-tyler authored
feat: add post_commithook_properties to restore (delta-io#4336)
# Description
- Replaced the manual commit pipeline (`into_prepared_commit_future` + manual `write_commit_entry`) in `restore.rs` with the standard commit pipeline, so all 3 stages run automatically like other endpoints
- Exposed `post_commithook_properties` in the Python binding, public API, and type stub
- Added tests verifying that the parameter is accepted along with the post-commit hook

# Related Issue(s)
<!--- For example: - closes #106 --->
closes delta-io#4251

<!--- Share links to useful documentation --->

---------

Signed-off-by: Byeori Kim <bk.byeori.kim@gmail.com>
Co-authored-by: Ethan Urbanski <ethanurbanski@gmail.com>
1 parent c8f9184 commit 0943fe2

5 files changed

Lines changed: 62 additions & 33 deletions

File tree

crates/core/src/operations/restore.rs

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use serde::Serialize;
3535
use uuid::Uuid;
3636

3737
use super::{CustomExecuteHandler, Operation};
38-
use crate::kernel::transaction::{CommitBuilder, CommitProperties, TransactionError};
38+
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
3939
use crate::kernel::{
4040
Action, Add, EagerSnapshot, ProtocolExt as _, ProtocolInner, Remove, Version, resolve_snapshot,
4141
};
@@ -166,8 +166,9 @@ async fn execute(
166166
ignore_missing_files: bool,
167167
protocol_downgrade_allowed: bool,
168168
mut commit_properties: CommitProperties,
169+
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
169170
operation_id: Uuid,
170-
) -> DeltaResult<RestoreMetrics> {
171+
) -> DeltaResult<(RestoreMetrics, DeltaTableState)> {
171172
if !(version_to_restore
172173
.is_none()
173174
.bitxor(datetime_to_restore.is_none()))
@@ -294,30 +295,15 @@ async fn execute(
294295
datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
295296
};
296297

297-
let prepared_commit = CommitBuilder::from(commit_properties)
298+
let commit = CommitBuilder::from(commit_properties)
298299
.with_actions(actions)
300+
.with_max_retries(0)
301+
.with_operation_id(operation_id)
302+
.with_post_commit_hook_handler(custom_execute_handler)
299303
.build(Some(&snapshot), log_store.clone(), operation)
300-
.into_prepared_commit_future()
301304
.await?;
302305

303-
let commit_version = snapshot.version() + 1;
304-
let commit_bytes = prepared_commit.commit_or_bytes();
305-
match log_store
306-
.write_commit_entry(commit_version, commit_bytes.clone(), operation_id)
307-
.await
308-
{
309-
Ok(_) => {}
310-
Err(err @ TransactionError::VersionAlreadyExists(_)) => {
311-
return Err(err.into());
312-
}
313-
Err(err) => {
314-
log_store
315-
.abort_commit_entry(commit_version, commit_bytes.clone(), operation_id)
316-
.await?;
317-
return Err(err.into());
318-
}
319-
}
320-
Ok(metrics)
306+
Ok((metrics, commit.snapshot()))
321307
}
322308

323309
async fn check_files_available(
@@ -344,7 +330,7 @@ impl std::future::IntoFuture for RestoreBuilder {
344330
type IntoFuture = BoxFuture<'static, Self::Output>;
345331

346332
fn into_future(self) -> Self::IntoFuture {
347-
let this = self;
333+
let mut this = self;
348334

349335
Box::pin(async move {
350336
let snapshot =
@@ -353,24 +339,28 @@ impl std::future::IntoFuture for RestoreBuilder {
353339
let operation_id = this.get_operation_id();
354340
this.pre_execute(operation_id).await?;
355341

356-
let metrics = execute(
342+
let handle = this.custom_execute_handler.take();
343+
let (metrics, new_state) = execute(
357344
this.log_store.clone(),
358-
snapshot.clone(),
345+
snapshot,
359346
this.version_to_restore,
360347
this.datetime_to_restore,
361348
this.ignore_missing_files,
362349
this.protocol_downgrade_allowed,
363350
this.commit_properties.clone(),
351+
handle.clone(),
364352
operation_id,
365353
)
366354
.await?;
367355

368-
this.post_execute(operation_id).await?;
356+
if let Some(handler) = handle {
357+
handler.post_execute(&this.log_store, operation_id).await?;
358+
}
369359

370-
let mut table =
371-
DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot));
372-
table.update_state().await?;
373-
Ok((table, metrics))
360+
Ok((
361+
DeltaTable::new_with_state(this.log_store, new_state),
362+
metrics,
363+
))
374364
})
375365
}
376366
}

python/deltalake/_internal.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ class RawDeltaTable:
189189
ignore_missing_files: bool,
190190
protocol_downgrade_allowed: bool,
191191
commit_properties: CommitProperties | None,
192+
post_commithook_properties: PostCommitHookProperties | None,
192193
) -> str: ...
193194
def history(self, limit: int | None) -> list[str]: ...
194195
def update_incremental(self) -> None: ...

python/deltalake/table.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,7 @@ def restore(
892892
ignore_missing_files: bool = False,
893893
protocol_downgrade_allowed: bool = False,
894894
commit_properties: CommitProperties | None = None,
895+
post_commithook_properties: PostCommitHookProperties | None = None,
895896
) -> dict[str, Any]:
896897
"""
897898
Restores table to a given version or datetime.
@@ -901,6 +902,7 @@ def restore(
901902
ignore_missing_files: whether the operation carry on when some data files missing.
902903
protocol_downgrade_allowed: whether the operation when protocol version upgraded.
903904
commit_properties: properties of the transaction commit. If None, default values are used.
905+
post_commithook_properties: properties for the post commit hook. If None, default values are used.
904906
905907
Returns:
906908
the metrics from restore.
@@ -911,13 +913,15 @@ def restore(
911913
ignore_missing_files=ignore_missing_files,
912914
protocol_downgrade_allowed=protocol_downgrade_allowed,
913915
commit_properties=commit_properties,
916+
post_commithook_properties=post_commithook_properties,
914917
)
915918
else:
916919
metrics = self._table.restore(
917920
target,
918921
ignore_missing_files=ignore_missing_files,
919922
protocol_downgrade_allowed=protocol_downgrade_allowed,
920923
commit_properties=commit_properties,
924+
post_commithook_properties=post_commithook_properties,
921925
)
922926
return json.loads(metrics)
923927

python/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,14 +1241,15 @@ impl RawDeltaTable {
12411241

12421242
// Run the restore command on the Delta Table: restore table to a given version or datetime
12431243
#[pyo3(signature = (
1244-
target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None
1244+
target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None, post_commithook_properties=None
12451245
))]
12461246
pub fn restore(
12471247
&self,
12481248
target: Option<&Bound<'_, PyAny>>,
12491249
ignore_missing_files: bool,
12501250
protocol_downgrade_allowed: bool,
12511251
commit_properties: Option<PyCommitProperties>,
1252+
post_commithook_properties: Option<PyPostCommitHookProperties>,
12521253
) -> PyResult<String> {
12531254
let table = self._table.lock().map_err(to_rt_err)?.clone();
12541255
let mut cmd = table.restore();
@@ -1268,7 +1269,9 @@ impl RawDeltaTable {
12681269
cmd = cmd.with_ignore_missing_files(ignore_missing_files);
12691270
cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed);
12701271

1271-
if let Some(commit_properties) = maybe_create_commit_properties(commit_properties, None) {
1272+
if let Some(commit_properties) =
1273+
maybe_create_commit_properties(commit_properties, post_commithook_properties)
1274+
{
12721275
cmd = cmd.with_commit_properties(commit_properties);
12731276
}
12741277

python/tests/test_restore.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
import pytest
55
from arro3.core import Table
66

7-
from deltalake import CommitProperties, DeltaTable, write_deltalake
7+
from deltalake import (
8+
CommitProperties,
9+
DeltaTable,
10+
PostCommitHookProperties,
11+
write_deltalake,
12+
)
813

914

1015
@pytest.mark.parametrize("use_relative", [True, False])
@@ -88,3 +93,29 @@ def test_restore_with_datetime(
8893
last_action = dt.history(1)[0]
8994
assert last_action["operation"] == "RESTORE"
9095
assert dt.version() == old_version + 1
96+
97+
98+
def test_restore_with_post_commithook_properties(
99+
tmp_path: pathlib.Path,
100+
sample_table: Table,
101+
):
102+
write_deltalake(str(tmp_path), sample_table, mode="append")
103+
write_deltalake(str(tmp_path), sample_table, mode="append")
104+
write_deltalake(str(tmp_path), sample_table, mode="append")
105+
106+
dt = DeltaTable(str(tmp_path))
107+
old_version = dt.version()
108+
dt.restore(
109+
1,
110+
post_commithook_properties=PostCommitHookProperties(
111+
create_checkpoint=False,
112+
cleanup_expired_logs=False,
113+
),
114+
)
115+
last_action = dt.history(1)[0]
116+
assert last_action["operation"] == "RESTORE"
117+
assert dt.version() == old_version + 1
118+
119+
log_dir = tmp_path / "_delta_log"
120+
checkpoint_files = list(log_dir.glob("*.checkpoint.parquet"))
121+
assert len(checkpoint_files) == 0

0 commit comments

Comments
 (0)