Skip to content

Commit 2748585

Browse files
committed
chore: trailing slashes on the tablePath in S3DynamoDbLogStore are load-bearing
For better or worse, the trailing slashes on the `tablePath` DynamoDB items are semantically important and a trailing slash coming from deltalake-core causes lookup failures with multiple writers between Spark and Rust. This change ensures that the S3DynamoDbLogStore is always removing the trailing slash should it exist. Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
1 parent 314ecc6 commit 2748585

2 files changed

Lines changed: 60 additions & 27 deletions

File tree

crates/aws/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ chrono = { workspace = true }
5454
deltalake-core = { path = "../core" }
5555
deltalake-test = { path = "../test" }
5656
pretty_env_logger = "0.5.0"
57+
pretty_assertions = "*"
5758
rand = "0.8"
5859
serde_json = { workspace = true }
5960
serial_test = "3"

crates/aws/src/lib.rs

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,15 @@ pub struct DynamoDbLockClient {
151151
config: DynamoDbConfig,
152152
}
153153

154+
#[cfg(test)]
155+
impl Default for DynamoDbLockClient {
156+
fn default() -> Self {
157+
let sdk_config = aws_config::SdkConfig::builder().build();
158+
Self::try_new(&sdk_config, None, None, None, None, None, None, None, None)
159+
.expect("Failed to create a default DynamoDbLockClient for testing purpose")
160+
}
161+
}
162+
154163
impl std::fmt::Debug for DynamoDbLockClient {
155164
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
156165
write!(f, "DynamoDbLockClient(config: {:?})", self.config)
@@ -323,19 +332,6 @@ impl DynamoDbLockClient {
323332
&self.config
324333
}
325334

326-
fn get_primary_key(&self, version: i64, table_path: &str) -> HashMap<String, AttributeValue> {
327-
HashMap::from([
328-
(
329-
constants::ATTR_TABLE_PATH.to_owned(),
330-
string_attr(table_path),
331-
),
332-
(
333-
constants::ATTR_FILE_NAME.to_owned(),
334-
string_attr(format!("{version:020}.json")),
335-
),
336-
])
337-
}
338-
339335
/// Read a log entry from DynamoDb.
340336
pub async fn get_commit_entry(
341337
&self,
@@ -349,7 +345,7 @@ impl DynamoDbLockClient {
349345
.get_item()
350346
.consistent_read(true)
351347
.table_name(&self.config.lock_table_name)
352-
.set_key(Some(self.get_primary_key(version, table_path)))
348+
.set_key(Some(get_primary_key(version, table_path)))
353349
.send()
354350
.await
355351
},
@@ -442,7 +438,9 @@ impl DynamoDbLockClient {
442438
.key_condition_expression(format!("{} = :tn", constants::ATTR_TABLE_PATH))
443439
.set_expression_attribute_values(Some(HashMap::from([(
444440
":tn".into(),
445-
string_attr(table_path),
441+
// NOTE: the lack of trailing slashes is a load-bearing implementation
442+
// detail between the Delta/Spark and delta-rs S3DynamoDbLogStore
443+
string_attr(table_path.trim_end_matches('/')),
446444
)])))
447445
.send()
448446
.await
@@ -488,7 +486,7 @@ impl DynamoDbLockClient {
488486
.dynamodb_client
489487
.update_item()
490488
.table_name(self.get_lock_table_name())
491-
.set_key(Some(self.get_primary_key(version, table_path)))
489+
.set_key(Some(get_primary_key(version, table_path)))
492490
.update_expression("SET complete = :c, expireTime = :e".to_owned())
493491
.set_expression_attribute_values(Some(HashMap::from([
494492
(":c".to_owned(), string_attr("true")),
@@ -535,7 +533,7 @@ impl DynamoDbLockClient {
535533
.dynamodb_client
536534
.delete_item()
537535
.table_name(self.get_lock_table_name())
538-
.set_key(Some(self.get_primary_key(version, table_path)))
536+
.set_key(Some(get_primary_key(version, table_path)))
539537
.set_expression_attribute_values(Some(HashMap::from([(
540538
":f".into(),
541539
string_attr("false"),
@@ -631,21 +629,32 @@ fn epoch_to_system_time(s: u64) -> SystemTime {
631629
SystemTime::UNIX_EPOCH + Duration::from_secs(s)
632630
}
633631

634-
fn create_value_map(
635-
commit_entry: &CommitEntry,
636-
table_path: &str,
637-
) -> HashMap<String, AttributeValue> {
638-
// cut off `_delta_log` part: temp_path in DynamoDb is relative to `_delta_log` not table root.
639-
let temp_path = Path::from_iter(commit_entry.temp_path.parts().skip(1));
640-
let mut value_map = HashMap::from([
632+
/// Return the primary key as a [HashMap] for looking up log entries in the DynamoDb table
633+
///
634+
/// The `table_path` needs to be sent into DynamoDB without a trailing slash for the [Url] since
635+
/// that is a load-bearing part of the contract with Delta/Spark's implementation.
636+
fn get_primary_key(version: i64, table_path: &str) -> HashMap<String, AttributeValue> {
637+
HashMap::from([
641638
(
642639
constants::ATTR_TABLE_PATH.to_owned(),
643-
string_attr(table_path),
640+
string_attr(table_path.trim_end_matches('/')),
644641
),
645642
(
646643
constants::ATTR_FILE_NAME.to_owned(),
647-
string_attr(format!("{:020}.json", commit_entry.version)),
644+
string_attr(format!("{version:020}.json")),
648645
),
646+
])
647+
}
648+
649+
fn create_value_map(
650+
commit_entry: &CommitEntry,
651+
table_path: &str,
652+
) -> HashMap<String, AttributeValue> {
653+
// cut off `_delta_log` part: temp_path in DynamoDb is relative to `_delta_log` not table root.
654+
let temp_path = Path::from_iter(commit_entry.temp_path.parts().skip(1));
655+
let mut value_map = get_primary_key(commit_entry.version, table_path);
656+
657+
value_map.extend(HashMap::from([
649658
(constants::ATTR_TEMP_PATH.to_owned(), string_attr(temp_path)),
650659
(
651660
constants::ATTR_COMPLETE.to_owned(),
@@ -655,7 +664,7 @@ fn create_value_map(
655664
"false"
656665
}),
657666
),
658-
]);
667+
]));
659668
commit_entry.expire_time.as_ref().map(|t| {
660669
value_map.insert(
661670
constants::ATTR_EXPIRE_TIME.to_owned(),
@@ -763,6 +772,8 @@ mod tests {
763772
use super::*;
764773
use aws_sdk_sts::config::ProvideCredentials;
765774

775+
use pretty_assertions::assert_eq;
776+
766777
use object_store::memory::InMemory;
767778
use serial_test::serial;
768779

@@ -773,6 +784,27 @@ mod tests {
773784
Ok(())
774785
}
775786

787+
#[test]
788+
fn test_get_primary_key() -> DeltaResult<()> {
789+
let version = 0;
790+
let expected = HashMap::from([
791+
(
792+
constants::ATTR_TABLE_PATH.to_owned(),
793+
// NOTE: the lack of a trailing slash is important for compatibility with the
794+
// Delta/Spark S3DynamoDbLogStore
795+
string_attr("s3://bucket/table"),
796+
),
797+
(
798+
constants::ATTR_FILE_NAME.to_owned(),
799+
string_attr(format!("{version:020}.json")),
800+
),
801+
]);
802+
803+
assert_eq!(expected, get_primary_key(version, "s3://bucket/table"));
804+
assert_eq!(expected, get_primary_key(version, "s3://bucket/table/"));
805+
Ok(())
806+
}
807+
776808
#[test]
777809
fn commit_entry_roundtrip_test() -> Result<(), LockClientError> {
778810
let system_time = SystemTime::UNIX_EPOCH

0 commit comments

Comments
 (0)