Skip to content

Commit 26e2b42

Browse files
authored
fix: retry advancement of PreparedCommit into PostCommit in case version 0 already exists (created by another writer) (delta-io#3513)
# Description
This PR fixes `PreparedCommit` so that it falls back to the retry mechanism when version 0 has already been created by another writer.

# Related Issue(s)
Resolves delta-io#3505

Signed-off-by: danielgafni <danielgafni16@gmail.com>
1 parent 81e4b31 commit 26e2b42

2 files changed

Lines changed: 96 additions & 17 deletions

File tree

crates/core/src/kernel/transaction/mod.rs

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -610,25 +610,48 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
610610
Box::pin(async move {
611611
let commit_or_bytes = this.commit_or_bytes;
612612

613-
if this.table_data.is_none() {
613+
let mut attempt_number: usize = 1;
614+
615+
// Handle the case where table doesn't exist yet (initial table creation)
616+
let read_snapshot: EagerSnapshot = if this.table_data.is_none() {
614617
debug!("committing initial table version 0");
615-
this.log_store
618+
match this
619+
.log_store
616620
.write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
617-
.await?;
618-
return Ok(PostCommit {
619-
version: 0,
620-
data: this.data,
621-
create_checkpoint: false,
622-
cleanup_expired_logs: None,
623-
log_store: this.log_store,
624-
table_data: None,
625-
custom_execute_handler: this.post_commit_hook_handler,
626-
metrics: CommitMetrics { num_retries: 0 },
627-
});
628-
}
621+
.await
622+
{
623+
Ok(_) => {
624+
return Ok(PostCommit {
625+
version: 0,
626+
data: this.data,
627+
create_checkpoint: false,
628+
cleanup_expired_logs: None,
629+
log_store: this.log_store,
630+
table_data: None,
631+
custom_execute_handler: this.post_commit_hook_handler,
632+
metrics: CommitMetrics { num_retries: 0 },
633+
})
634+
}
635+
Err(TransactionError::VersionAlreadyExists(0)) => {
636+
// Table was created by another writer since the `this.table_data.is_none()` check.
637+
// Load the current table state and continue with the retry loop.
638+
debug!("version 0 already exists, loading table state for retry");
639+
attempt_number = 2;
640+
let latest_version = this.log_store.get_latest_version(0).await?;
641+
EagerSnapshot::try_new(
642+
this.log_store.as_ref(),
643+
Default::default(),
644+
Some(latest_version),
645+
)
646+
.await?
647+
}
648+
Err(e) => return Err(e.into()),
649+
}
650+
} else {
651+
this.table_data.unwrap().eager_snapshot().clone()
652+
};
629653

630-
// unwrap() is safe here due to the above check
631-
let mut read_snapshot = this.table_data.unwrap().eager_snapshot().clone();
654+
let mut read_snapshot = read_snapshot;
632655

633656
let commit_span = info_span!(
634657
"commit_with_retries",
@@ -640,7 +663,6 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
640663
);
641664

642665
async move {
643-
let mut attempt_number = 1;
644666
let total_retries = this.max_retries + 1;
645667
while attempt_number <= total_retries {
646668
Span::current().record("attempt", attempt_number);

crates/test/src/concurrent.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,63 @@ pub async fn test_concurrent_writes(context: &IntegrationContext) -> TestResult
1717
Ok(())
1818
}
1919

20+
/// Checks that several writers can race to create the same table and all succeed.
///
/// Spawns `NUM_WRITERS` tokio tasks that each build a table handle for the same
/// URI and run a create operation with the same schema. Afterwards it requires
/// that every writer returned `Ok` and that exactly one of the resulting tables
/// reports version `Some(0)`.
///
/// # Errors
/// Returns early (via `?`) if the local directory cannot be created or the
/// schema cannot be built.
///
/// # Panics
/// Panics if the table URI cannot be parsed or the table cannot be built inside
/// a task, if a spawned task fails to join, if any writer returns an error, or
/// if the number of writers that ended up at version 0 is not exactly one.
pub async fn test_concurrent_table_creation(context: &IntegrationContext) -> TestResult {
21+
let table_uri = context.uri_for_table(TestTables::Custom("concurrent_create".into()));
22+
23+
// For `file://` URIs, create the target directory on disk before any writer is spawned.
if table_uri.starts_with("file://") {
24+
// Cannot fail: the `starts_with` check above guarantees the prefix is present.
let path = table_uri.strip_prefix("file://").unwrap();
25+
std::fs::create_dir_all(path)?;
26+
}
27+
28+
// Single-field schema: an Integer column named "Id".
// NOTE(review): the third argument (`true`) is presumably the nullable flag — confirm
// against `StructField::new`.
let schema = StructType::try_new(vec![StructField::new(
29+
"Id",
30+
DataType::Primitive(PrimitiveType::Integer),
31+
true,
32+
)])?;
33+
34+
const NUM_WRITERS: usize = 5;
35+
36+
// Spawn multiple tasks that all try to create the table simultaneously
37+
let mut futures = Vec::new();
38+
for i in 0..NUM_WRITERS {
39+
// Each task gets its own copies of the URI and schema because the async block is `move`.
let uri = table_uri.clone();
40+
let schema = schema.clone();
41+
futures.push(tokio::spawn(async move {
42+
let table_url = url::Url::parse(&uri).unwrap();
43+
let table = DeltaTableBuilder::from_uri(table_url)
44+
.unwrap()
45+
.with_allow_http(true)
46+
.build()
47+
.unwrap();
48+
49+
// Each writer tries to create the table
50+
let result = DeltaOps(table)
51+
.create()
52+
.with_columns(schema.fields().cloned())
53+
.await;
54+
55+
// Return the writer index alongside the result so a failure can be attributed below.
(i, result)
56+
}));
57+
}
58+
59+
// Collect results - all writers must succeed
60+
let mut versions = Vec::new();
61+
for f in futures {
62+
// The `unwrap` here covers the join itself; the writer's own result is checked next.
let (i, result) = f.await.unwrap();
63+
let table = result.unwrap_or_else(|e| panic!("Writer {i} failed: {e}"));
64+
versions.push(table.version());
65+
}
66+
67+
// Exactly one should have version 0
68+
let version_zero_count = versions.iter().filter(|v| **v == Some(0)).count();
69+
assert_eq!(
70+
version_zero_count, 1,
71+
"Exactly one writer should get version 0"
72+
);
73+
74+
Ok(())
75+
}
76+
2077
async fn prepare_table(
2178
context: &IntegrationContext,
2279
) -> Result<(DeltaTable, String), Box<dyn std::error::Error + 'static>> {

0 commit comments

Comments
 (0)