Skip to content

Commit 7064361

Browse files
fvaleyertyler
authored andcommitted
feat(typed-builder): migrate builder to use TypedBuilder in aws crate
Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
1 parent 5ac0629 commit 7064361

6 files changed

Lines changed: 139 additions & 96 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ uuid = { version = "1" }
6969
async-trait = { version = "0.1" }
7070
futures = { version = "0.3" }
7171
tokio = { version = "1" }
72+
typed-builder = { version = "0.23.0" }
7273

7374
# opentelemetry
7475
tracing-opentelemetry = { version = "0.32" }

crates/aws/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ object_store = { workspace = true, features = ["aws"] }
2525
regex = { workspace = true }
2626
thiserror = { workspace = true }
2727
tokio = { workspace = true }
28+
typed-builder = { workspace = true }
2829
uuid = { workspace = true, features = ["serde", "v4"] }
2930
url = { workspace = true }
3031

crates/aws/src/lib.rs

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use storage::S3StorageOptionsConversion;
4444
use storage::{S3ObjectStoreFactory, S3StorageOptions};
4545
use tracing::debug;
4646
use tracing::warn;
47+
use typed_builder::TypedBuilder;
4748
use url::Url;
4849

4950
#[derive(Clone, Debug, Default)]
@@ -118,36 +119,29 @@ pub fn register_handlers(_additional_prefixes: Option<Url>) {
118119
/// - temp_path: String - name of temporary file containing commit info
119120
/// - complete: bool - operation completed, i.e. atomic rename from `tempPath` to `fileName` succeeded
120121
/// - expire_time: `Option<SystemTime>` - epoch seconds at which this external commit entry is safe to be deleted
121-
#[derive(Debug, PartialEq)]
122+
#[derive(Debug, PartialEq, TypedBuilder)]
123+
#[builder(doc)]
122124
pub struct CommitEntry {
123-
/// Commit version, stored as file name (e.g., 00000N.json) in dynamodb (relative to `_delta_log/`
125+
/// Commit version, stored as file name (e.g., 00000N.json) in dynamodb (relative to `_delta_log/`)
124126
pub version: i64,
125-
/// Path to temp file for this commit, relative to the `_delta_log
127+
/// Path to temp file for this commit, relative to the `_delta_log` directory
128+
#[builder(setter(into))]
126129
pub temp_path: Path,
127130
/// true if delta json file is successfully copied to its destination location, else false
131+
#[builder(default = false)]
128132
pub complete: bool,
129133
/// If complete = true, epoch seconds at which this external commit entry is safe to be deleted
134+
#[builder(default, setter(strip_option))]
130135
pub expire_time: Option<SystemTime>,
131136
}
132137

133-
impl CommitEntry {
134-
/// Create a new log entry for the given version.
135-
/// Initial log entry state is incomplete.
136-
pub fn new(version: i64, temp_path: Path) -> CommitEntry {
137-
Self {
138-
version,
139-
temp_path,
140-
complete: false,
141-
expire_time: None,
142-
}
143-
}
144-
}
145-
146138
/// Lock client backed by DynamoDb.
139+
#[derive(TypedBuilder)]
140+
#[builder(doc)]
147141
pub struct DynamoDbLockClient {
148142
/// DynamoDb client
149143
dynamodb_client: Client,
150-
/// configuration of the
144+
/// Configuration of the lock client
151145
config: DynamoDbConfig,
152146
}
153147

@@ -204,16 +198,16 @@ impl DynamoDbLockClient {
204198
)
205199
.map_err(|err| DynamoDbConfigError::ParseMaxElapsedRequestTime { source: err })?;
206200

207-
let config = DynamoDbConfig {
208-
billing_mode,
209-
lock_table_name,
210-
max_elapsed_request_time,
211-
sdk_config: sdk_config.clone(),
212-
};
213-
Ok(Self {
214-
dynamodb_client,
215-
config,
216-
})
201+
let config = DynamoDbConfig::builder()
202+
.billing_mode(billing_mode)
203+
.lock_table_name(lock_table_name)
204+
.max_elapsed_request_time(max_elapsed_request_time)
205+
.sdk_config(sdk_config.clone())
206+
.build();
207+
Ok(Self::builder()
208+
.dynamodb_client(dynamodb_client)
209+
.config(config)
210+
.build())
217211
}
218212
fn create_dynamodb_sdk_config(
219213
sdk_config: &SdkConfig,
@@ -612,10 +606,13 @@ impl TryFrom<&HashMap<String, AttributeValue>> for CommitEntry {
612606
})
613607
.transpose()?
614608
.map(epoch_to_system_time);
609+
let complete = extract_required_string_field(item, constants::ATTR_COMPLETE)? == "true";
610+
611+
// Construct directly to avoid TypedBuilder's type state issues with conditional fields
615612
Ok(Self {
616613
version,
617614
temp_path,
618-
complete: extract_required_string_field(item, constants::ATTR_COMPLETE)? == "true",
615+
complete,
619616
expire_time,
620617
})
621618
}
@@ -663,11 +660,18 @@ fn create_value_map(
663660
value_map
664661
}
665662

666-
#[derive(Debug)]
663+
/// Configuration for DynamoDb lock client
664+
#[derive(Debug, TypedBuilder)]
665+
#[builder(doc)]
667666
pub struct DynamoDbConfig {
667+
/// Billing mode for the DynamoDb table
668668
pub billing_mode: BillingMode,
669+
/// Name of the lock table
670+
#[builder(setter(into))]
669671
pub lock_table_name: String,
672+
/// Maximum time to wait for DynamoDB requests
670673
pub max_elapsed_request_time: Duration,
674+
/// AWS SDK configuration
671675
pub sdk_config: SdkConfig,
672676
}
673677

@@ -773,18 +777,20 @@ mod tests {
773777
.unwrap()
774778
.as_secs(),
775779
);
776-
commit_entry_roundtrip(&CommitEntry {
777-
version: 0,
778-
temp_path: Path::from("_delta_log/tmp/0_abc.json"),
779-
complete: true,
780-
expire_time: Some(system_time),
781-
})?;
782-
commit_entry_roundtrip(&CommitEntry {
783-
version: 139,
784-
temp_path: Path::from("_delta_log/tmp/0_abc.json"),
785-
complete: false,
786-
expire_time: None,
787-
})?;
780+
commit_entry_roundtrip(
781+
&CommitEntry::builder()
782+
.version(0)
783+
.temp_path(Path::from("_delta_log/tmp/0_abc.json"))
784+
.complete(true)
785+
.expire_time(system_time)
786+
.build(),
787+
)?;
788+
commit_entry_roundtrip(
789+
&CommitEntry::builder()
790+
.version(139)
791+
.temp_path(Path::from("_delta_log/tmp/0_abc.json"))
792+
.build(),
793+
)?;
788794
Ok(())
789795
}
790796

crates/aws/src/logstore/dynamodb_logstore.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::{constants, CommitEntry, DynamoDbLockClient, UpdateLogEntryResult};
1010
use bytes::Bytes;
1111
use deltalake_core::{ObjectStoreError, Path};
1212
use tracing::{debug, error, warn};
13+
use typed_builder::TypedBuilder;
1314
use url::Url;
1415

1516
use deltalake_core::logstore::*;
@@ -22,11 +23,19 @@ const STORE_NAME: &str = "DeltaS3ObjectStore";
2223
const MAX_REPAIR_RETRIES: i64 = 3;
2324

2425
/// [`LogStore`] implementation backed by DynamoDb
26+
#[derive(TypedBuilder)]
27+
#[builder(doc)]
2528
pub struct S3DynamoDbLogStore {
29+
/// Object store for delta log operations
2630
prefixed_store: ObjectStoreRef,
31+
/// Root object store
2732
root_store: ObjectStoreRef,
33+
/// DynamoDB lock client for transaction coordination
2834
lock_client: DynamoDbLockClient,
35+
/// Log store configuration
2936
config: LogStoreConfig,
37+
/// Table path URI
38+
#[builder(setter(into))]
3039
table_path: String,
3140
}
3241

@@ -72,16 +81,16 @@ impl S3DynamoDbLogStore {
7281
},
7382
})?;
7483
let table_path = to_uri(&location, &Path::from(""));
75-
Ok(Self {
76-
prefixed_store,
77-
root_store,
78-
lock_client,
79-
config: LogStoreConfig {
84+
Ok(Self::builder()
85+
.prefixed_store(prefixed_store)
86+
.root_store(root_store)
87+
.lock_client(lock_client)
88+
.config(LogStoreConfig {
8089
location,
8190
options: options.clone(),
82-
},
83-
table_path,
84-
})
91+
})
92+
.table_path(table_path)
93+
.build())
8594
}
8695

8796
/// Attempt to repair an incomplete log entry by moving the temporary commit file
@@ -218,7 +227,10 @@ impl LogStore for S3DynamoDbLogStore {
218227
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
219228
_ => unreachable!(), // S3DynamoDBLogstore should never get Bytes
220229
};
221-
let entry = CommitEntry::new(version, tmp_commit.clone());
230+
let entry = CommitEntry::builder()
231+
.version(version)
232+
.temp_path(tmp_commit.clone())
233+
.build();
222234
debug!("Writing commit entry for {self:?}: {entry:?}");
223235
// create log entry in dynamo db: complete = false, no expireTime
224236
self.lock_client

crates/aws/src/storage.rs

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use futures::Future;
2222
use object_store::aws::AmazonS3;
2323
use object_store::client::SpawnedReqwestConnector;
2424
use tracing::log::*;
25+
use typed_builder::TypedBuilder;
2526
use url::Url;
2627

2728
use crate::constants;
@@ -30,6 +31,11 @@ use crate::errors::DynamoDbConfigError;
3031

3132
const STORE_NAME: &str = "DeltaS3ObjectStore";
3233

34+
// Default values for S3StorageOptions
35+
const DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS: u64 = 15;
36+
const DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS: u64 = 10;
37+
const DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: usize = 10;
38+
3339
#[derive(Clone, Default, Debug)]
3440
pub struct S3ObjectStoreFactory {}
3541

@@ -122,21 +128,47 @@ fn is_aws(options: &HashMap<String, String>) -> bool {
122128
/// Options used to configure the [S3StorageBackend].
123129
///
124130
/// Available options are described in [constants].
125-
#[derive(Clone, Debug)]
126-
#[allow(missing_docs)]
131+
#[derive(Clone, Debug, TypedBuilder)]
132+
#[builder(doc)]
127133
pub struct S3StorageOptions {
134+
/// Whether to use virtual hosted-style requests
135+
#[builder(default = false)]
128136
pub virtual_hosted_style_request: bool,
137+
/// Locking provider to use (e.g., "dynamodb")
138+
#[builder(default, setter(strip_option, into))]
129139
pub locking_provider: Option<String>,
140+
/// Override endpoint for DynamoDB
141+
#[builder(default, setter(strip_option, into))]
130142
pub dynamodb_endpoint: Option<String>,
143+
/// Override region for DynamoDB
144+
#[builder(default, setter(strip_option, into))]
131145
pub dynamodb_region: Option<String>,
146+
/// Override access key ID for DynamoDB
147+
#[builder(default, setter(strip_option, into))]
132148
pub dynamodb_access_key_id: Option<String>,
149+
/// Override secret access key for DynamoDB
150+
#[builder(default, setter(strip_option, into))]
133151
pub dynamodb_secret_access_key: Option<String>,
152+
/// Override session token for DynamoDB
153+
#[builder(default, setter(strip_option, into))]
134154
pub dynamodb_session_token: Option<String>,
155+
/// Idle timeout for S3 connection pool
156+
#[builder(default = Duration::from_secs(DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS))]
135157
pub s3_pool_idle_timeout: Duration,
158+
/// Idle timeout for STS connection pool
159+
#[builder(default = Duration::from_secs(DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS))]
136160
pub sts_pool_idle_timeout: Duration,
161+
/// Number of retries for S3 internal server errors
162+
#[builder(default = DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES)]
137163
pub s3_get_internal_server_error_retries: usize,
164+
/// Allow unsafe rename operations
165+
#[builder(default = false)]
138166
pub allow_unsafe_rename: bool,
167+
/// Extra storage options not handled by other fields
168+
#[builder(default)]
139169
pub extra_opts: HashMap<String, String>,
170+
/// AWS SDK configuration
171+
#[builder(default, setter(strip_option))]
140172
pub sdk_config: Option<SdkConfig>,
141173
}
142174

@@ -177,15 +209,21 @@ impl S3StorageOptions {
177209
Self::ensure_env_var(options, constants::AWS_WEB_IDENTITY_TOKEN_FILE);
178210
Self::ensure_env_var(options, constants::AWS_ROLE_ARN);
179211
Self::ensure_env_var(options, constants::AWS_ROLE_SESSION_NAME);
180-
let s3_pool_idle_timeout =
181-
Self::u64_or_default(options, constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, 15);
182-
let sts_pool_idle_timeout =
183-
Self::u64_or_default(options, constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, 10);
212+
let s3_pool_idle_timeout = Self::u64_or_default(
213+
options,
214+
constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS,
215+
DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS,
216+
);
217+
let sts_pool_idle_timeout = Self::u64_or_default(
218+
options,
219+
constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS,
220+
DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS,
221+
);
184222

185223
let s3_get_internal_server_error_retries = Self::u64_or_default(
186224
options,
187225
constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES,
188-
10,
226+
DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES as u64,
189227
) as usize;
190228

191229
let virtual_hosted_style_request: bool =
@@ -568,26 +606,15 @@ mod tests {
568606

569607
let options = S3StorageOptions::try_default().unwrap();
570608
assert_eq!(
571-
S3StorageOptions {
572-
sdk_config: Some(
609+
S3StorageOptions::builder()
610+
.sdk_config(
573611
SdkConfig::builder()
574612
.endpoint_url("http://localhost".to_string())
575613
.region(Region::from_static("us-west-1"))
576614
.build()
577-
),
578-
virtual_hosted_style_request: false,
579-
locking_provider: Some("dynamodb".to_string()),
580-
dynamodb_endpoint: None,
581-
dynamodb_region: None,
582-
dynamodb_access_key_id: None,
583-
dynamodb_secret_access_key: None,
584-
dynamodb_session_token: None,
585-
s3_pool_idle_timeout: Duration::from_secs(15),
586-
sts_pool_idle_timeout: Duration::from_secs(10),
587-
s3_get_internal_server_error_retries: 10,
588-
extra_opts: HashMap::new(),
589-
allow_unsafe_rename: false,
590-
},
615+
)
616+
.locking_provider("dynamodb")
617+
.build(),
591618
options
592619
);
593620
});
@@ -803,26 +830,19 @@ mod tests {
803830
.unwrap();
804831

805832
assert_eq!(
806-
S3StorageOptions {
807-
sdk_config: Some(
833+
S3StorageOptions::builder()
834+
.sdk_config(
808835
SdkConfig::builder()
809836
.endpoint_url("http://localhost".to_string())
810837
.region(Region::from_static("us-west-2"))
811838
.build()
812-
),
813-
virtual_hosted_style_request: false,
814-
locking_provider: Some("dynamodb".to_string()),
815-
dynamodb_endpoint: Some("http://localhost:dynamodb".to_string()),
816-
dynamodb_region: None,
817-
dynamodb_access_key_id: None,
818-
dynamodb_secret_access_key: None,
819-
dynamodb_session_token: None,
820-
s3_pool_idle_timeout: Duration::from_secs(1),
821-
sts_pool_idle_timeout: Duration::from_secs(2),
822-
s3_get_internal_server_error_retries: 3,
823-
extra_opts: HashMap::new(),
824-
allow_unsafe_rename: false,
825-
},
839+
)
840+
.locking_provider("dynamodb")
841+
.dynamodb_endpoint("http://localhost:dynamodb")
842+
.s3_pool_idle_timeout(Duration::from_secs(1))
843+
.sts_pool_idle_timeout(Duration::from_secs(2))
844+
.s3_get_internal_server_error_retries(3)
845+
.build(),
826846
options
827847
);
828848
});

0 commit comments

Comments
 (0)