Skip to content

Commit b058e86

Browse files
committed
chore: pr feedback and test fixes
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
1 parent 435cc81 commit b058e86

4 files changed

Lines changed: 41 additions & 23 deletions

File tree

crates/aws/src/logstore/default_logstore.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub fn default_s3_logstore(
3131
/// Default [`LogStore`] implementation
3232
#[derive(Debug, Clone)]
3333
pub struct S3LogStore {
34-
storage: ObjectStoreRef,
34+
prefixed_store: ObjectStoreRef,
3535
root_store: ObjectStoreRef,
3636
config: LogStoreConfig,
3737
}
@@ -41,15 +41,18 @@ impl S3LogStore {
4141
///
4242
/// # Arguments
4343
///
44-
/// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located).
44+
/// * `prefixed_store` - A shared reference to an [`object_store::ObjectStore`]
45+
/// with "/" pointing at delta table root (i.e. where `_delta_log` is located).
46+
/// * `root_store` - A shared reference to an [`object_store::ObjectStore`] with "/"
47+
/// pointing at root of the storage system.
4548
/// * `location` - A url corresponding to the storage location of `storage`.
4649
pub fn new(
47-
storage: ObjectStoreRef,
50+
prefixed_store: ObjectStoreRef,
4851
root_store: ObjectStoreRef,
4952
config: LogStoreConfig,
5053
) -> Self {
5154
Self {
52-
storage,
55+
prefixed_store,
5356
root_store,
5457
config,
5558
}
@@ -120,7 +123,7 @@ impl LogStore for S3LogStore {
120123
}
121124

122125
fn object_store(&self, _operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
123-
self.storage.clone()
126+
self.prefixed_store.clone()
124127
}
125128

126129
fn root_object_store(&self, _operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {

crates/aws/tests/integration_s3_dynamodb.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ async fn test_repair_commit_entry() -> TestResult<()> {
165165
ensure_table_uri(table.table_uri())?,
166166
&options,
167167
&S3_OPTIONS,
168-
std::sync::Arc::new(table.object_store()),
168+
table.log_store().object_store(None),
169+
table.log_store().root_object_store(None),
169170
)?;
170171

171172
// create an incomplete log entry, commit file not yet moved from its temporary location
@@ -240,7 +241,8 @@ async fn test_abort_commit_entry() -> TestResult<()> {
240241
ensure_table_uri(table.table_uri())?,
241242
&options,
242243
&S3_OPTIONS,
243-
std::sync::Arc::new(table.object_store()),
244+
table.log_store().object_store(None),
245+
table.log_store().root_object_store(None),
244246
)?;
245247

246248
let entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?;
@@ -287,7 +289,8 @@ async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> {
287289
ensure_table_uri(table.table_uri())?,
288290
&options,
289291
&S3_OPTIONS,
290-
std::sync::Arc::new(table.object_store()),
292+
table.log_store().object_store(None),
293+
table.log_store().root_object_store(None),
291294
)?;
292295

293296
let entry = create_incomplete_commit_entry(&table, 1, "finished_commit").await?;

crates/lakefs/src/execute.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use async_trait::async_trait;
22
use deltalake_core::{
3-
logstore::LogStoreRef, operations::CustomExecuteHandler, DeltaResult, DeltaTableError,
3+
logstore::{LogStore as _, LogStoreRef},
4+
operations::CustomExecuteHandler,
5+
DeltaResult, DeltaTableError,
46
};
57
use tracing::debug;
68
use uuid::Uuid;
@@ -28,7 +30,7 @@ impl CustomExecuteHandler for LakeFSCustomExecuteHandler {
2830
if let Some(lakefs_store) = log_store.clone().as_any().downcast_ref::<LakeFSLogStore>() {
2931
let (repo, _, _) = lakefs_store
3032
.client
31-
.decompose_url(lakefs_store.config.location.to_string());
33+
.decompose_url(lakefs_store.config().location.to_string());
3234
let result = lakefs_store
3335
.client
3436
.delete_branch(repo, lakefs_store.client.get_transaction(operation_id)?)
@@ -144,7 +146,7 @@ mod tests {
144146
.downcast_ref::<LakeFSLogStore>()
145147
{
146148
assert!(lakefs_store
147-
.storage
149+
.prefixed_registry
148150
.get_store(
149151
&Url::parse(format!("lakefs://repo/delta-tx-{operation_id}/table").as_str())
150152
.unwrap()
@@ -185,7 +187,7 @@ mod tests {
185187
.downcast_ref::<LakeFSLogStore>()
186188
{
187189
assert!(lakefs_store
188-
.storage
190+
.prefixed_registry
189191
.get_store(
190192
&Url::parse(format!("lakefs://repo/delta-tx-{operation_id}/table").as_str())
191193
.unwrap()

crates/lakefs/src/logstore.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ pub fn lakefs_logstore(
5656
/// Default [`LogStore`] implementation
5757
#[derive(Debug, Clone)]
5858
pub(crate) struct LakeFSLogStore {
59-
pub(crate) storage: DefaultObjectStoreRegistry,
59+
pub(crate) prefixed_registry: DefaultObjectStoreRegistry,
6060
root_registry: DefaultObjectStoreRegistry,
61-
pub(crate) config: LogStoreConfig,
61+
config: LogStoreConfig,
6262
pub(crate) client: LakeFSClient,
6363
}
6464

@@ -67,20 +67,23 @@ impl LakeFSLogStore {
6767
///
6868
/// # Arguments
6969
///
70-
/// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located).
70+
/// * `prefixed_store` - A shared reference to an [`object_store::ObjectStore`]
71+
/// with "/" pointing at delta table root (i.e. where `_delta_log` is located).
72+
/// * `root_store` - A shared reference to an [`object_store::ObjectStore`] with "/"
73+
/// pointing at root of the storage system.
7174
/// * `location` - A url corresponding to the storage location of `storage`.
7275
pub fn new(
73-
storage: ObjectStoreRef,
76+
prefixed_store: ObjectStoreRef,
7477
root_store: ObjectStoreRef,
7578
config: LogStoreConfig,
7679
client: LakeFSClient,
7780
) -> Self {
78-
let registry = DefaultObjectStoreRegistry::new();
79-
registry.register_store(&config.location, storage);
81+
let prefixed_registry = DefaultObjectStoreRegistry::new();
82+
prefixed_registry.register_store(&config.location, prefixed_store);
8083
let root_registry = DefaultObjectStoreRegistry::new();
8184
root_registry.register_store(&config.location, root_store);
8285
Self {
83-
storage: registry,
86+
prefixed_registry,
8487
root_registry,
8588
config,
8689
client,
@@ -112,7 +115,7 @@ impl LakeFSLogStore {
112115
}
113116

114117
fn register_object_store(&self, url: &Url, store: ObjectStoreRef) {
115-
self.storage.register_store(url, store);
118+
self.prefixed_registry.register_store(url, store);
116119
}
117120

118121
fn register_root_object_store(&self, url: &Url, store: ObjectStoreRef) {
@@ -131,7 +134,7 @@ impl LakeFSLogStore {
131134
let transaction_url = Url::parse(&string_url).unwrap();
132135
Ok((
133136
string_url,
134-
self.storage.get_store(&transaction_url)?,
137+
self.prefixed_registry.get_store(&transaction_url)?,
135138
self.root_registry.get_store(&transaction_url)?,
136139
))
137140
}
@@ -236,7 +239,11 @@ impl LogStore for LakeFSLogStore {
236239
}
237240

238241
async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
239-
read_commit_entry(&self.storage.get_store(&self.config.location)?, version).await
242+
read_commit_entry(
243+
&self.prefixed_registry.get_store(&self.config.location)?,
244+
version,
245+
)
246+
.await
240247
}
241248

242249
/// Tries to commit a prepared commit file. Returns [`TransactionError`]
@@ -355,7 +362,10 @@ impl LogStore for LakeFSLogStore {
355362
let (_, store, _) = self.get_transaction_objectstore(id).unwrap_or_else(|_| panic!("The object_store registry inside LakeFSLogstore didn't have a store for operation_id {id} Something went wrong."));
356363
store
357364
}
358-
_ => self.storage.get_store(&self.config.location).unwrap(),
365+
_ => self
366+
.prefixed_registry
367+
.get_store(&self.config.location)
368+
.unwrap(),
359369
}
360370
}
361371

0 commit comments

Comments
 (0)