Skip to content

Commit fe1b574

Browse files
authored
Extract Common Listing and Retrieval Functionality (#4220)
* Factor out common cloud storage client functionality
* Remove format_prefix
* Review feedback
1 parent f566903 commit fe1b574

10 files changed

Lines changed: 497 additions & 440 deletions

File tree

object_store/src/aws/client.rs

Lines changed: 103 additions & 121 deletions
Original file line number | Diff line number | Diff line change
@@ -18,17 +18,17 @@
1818
use crate::aws::checksum::Checksum;
1919
use crate::aws::credential::{AwsCredential, CredentialExt};
2020
use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
21-
use crate::client::list::ListResponse;
22-
use crate::client::pagination::stream_paginated;
21+
use crate::client::get::GetClient;
22+
use crate::client::list::ListClient;
23+
use crate::client::list_response::ListResponse;
2324
use crate::client::retry::RetryExt;
2425
use crate::client::GetOptionsExt;
2526
use crate::multipart::UploadPart;
2627
use crate::path::DELIMITER;
27-
use crate::util::format_prefix;
2828
use crate::{
29-
BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, Path, Result,
30-
RetryConfig, StreamExt,
29+
ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig,
3130
};
31+
use async_trait::async_trait;
3232
use base64::prelude::BASE64_STANDARD;
3333
use base64::Engine;
3434
use bytes::{Buf, Bytes};
@@ -169,40 +169,6 @@ impl S3Client {
169169
self.config.credentials.get_credential().await
170170
}
171171

172-
/// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
173-
pub async fn get_request(
174-
&self,
175-
path: &Path,
176-
options: GetOptions,
177-
head: bool,
178-
) -> Result<Response> {
179-
let credential = self.get_credential().await?;
180-
let url = self.config.path_url(path);
181-
let method = match head {
182-
true => Method::HEAD,
183-
false => Method::GET,
184-
};
185-
186-
let builder = self.client.request(method, url);
187-
188-
let response = builder
189-
.with_get_options(options)
190-
.with_aws_sigv4(
191-
credential.as_ref(),
192-
&self.config.region,
193-
"s3",
194-
self.config.sign_payload,
195-
None,
196-
)
197-
.send_retry(&self.config.retry_config)
198-
.await
199-
.context(GetRequestSnafu {
200-
path: path.as_ref(),
201-
})?;
202-
203-
Ok(response)
204-
}
205-
206172
/// Make an S3 PUT request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
207173
pub async fn put_request<T: Serialize + ?Sized + Sync>(
208174
&self,
@@ -302,88 +268,6 @@ impl S3Client {
302268
Ok(())
303269
}
304270

305-
/// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
306-
async fn list_request(
307-
&self,
308-
prefix: Option<&str>,
309-
delimiter: bool,
310-
token: Option<&str>,
311-
offset: Option<&str>,
312-
) -> Result<(ListResult, Option<String>)> {
313-
let credential = self.get_credential().await?;
314-
let url = self.config.bucket_endpoint.clone();
315-
316-
let mut query = Vec::with_capacity(4);
317-
318-
if let Some(token) = token {
319-
query.push(("continuation-token", token))
320-
}
321-
322-
if delimiter {
323-
query.push(("delimiter", DELIMITER))
324-
}
325-
326-
query.push(("list-type", "2"));
327-
328-
if let Some(prefix) = prefix {
329-
query.push(("prefix", prefix))
330-
}
331-
332-
if let Some(offset) = offset {
333-
query.push(("start-after", offset))
334-
}
335-
336-
let response = self
337-
.client
338-
.request(Method::GET, &url)
339-
.query(&query)
340-
.with_aws_sigv4(
341-
credential.as_ref(),
342-
&self.config.region,
343-
"s3",
344-
self.config.sign_payload,
345-
None,
346-
)
347-
.send_retry(&self.config.retry_config)
348-
.await
349-
.context(ListRequestSnafu)?
350-
.bytes()
351-
.await
352-
.context(ListResponseBodySnafu)?;
353-
354-
let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
355-
.context(InvalidListResponseSnafu)?;
356-
let token = response.next_continuation_token.take();
357-
358-
Ok((response.try_into()?, token))
359-
}
360-
361-
/// Perform a list operation automatically handling pagination
362-
pub fn list_paginated(
363-
&self,
364-
prefix: Option<&Path>,
365-
delimiter: bool,
366-
offset: Option<&Path>,
367-
) -> BoxStream<'_, Result<ListResult>> {
368-
let offset = offset.map(|x| x.to_string());
369-
let prefix = format_prefix(prefix);
370-
stream_paginated(
371-
(prefix, offset),
372-
move |(prefix, offset), token| async move {
373-
let (r, next_token) = self
374-
.list_request(
375-
prefix.as_deref(),
376-
delimiter,
377-
token.as_deref(),
378-
offset.as_deref(),
379-
)
380-
.await?;
381-
Ok((r, (prefix, offset), next_token))
382-
},
383-
)
384-
.boxed()
385-
}
386-
387271
pub async fn create_multipart(&self, location: &Path) -> Result<MultipartId> {
388272
let credential = self.get_credential().await?;
389273
let url = format!("{}?uploads=", self.config.path_url(location),);
@@ -451,6 +335,104 @@ impl S3Client {
451335
}
452336
}
453337

338+
#[async_trait]
339+
impl GetClient for S3Client {
340+
const STORE: &'static str = STORE;
341+
342+
/// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
343+
async fn get_request(
344+
&self,
345+
path: &Path,
346+
options: GetOptions,
347+
head: bool,
348+
) -> Result<Response> {
349+
let credential = self.get_credential().await?;
350+
let url = self.config.path_url(path);
351+
let method = match head {
352+
true => Method::HEAD,
353+
false => Method::GET,
354+
};
355+
356+
let builder = self.client.request(method, url);
357+
358+
let response = builder
359+
.with_get_options(options)
360+
.with_aws_sigv4(
361+
credential.as_ref(),
362+
&self.config.region,
363+
"s3",
364+
self.config.sign_payload,
365+
None,
366+
)
367+
.send_retry(&self.config.retry_config)
368+
.await
369+
.context(GetRequestSnafu {
370+
path: path.as_ref(),
371+
})?;
372+
373+
Ok(response)
374+
}
375+
}
376+
377+
#[async_trait]
378+
impl ListClient for S3Client {
379+
/// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
380+
async fn list_request(
381+
&self,
382+
prefix: Option<&str>,
383+
delimiter: bool,
384+
token: Option<&str>,
385+
offset: Option<&str>,
386+
) -> Result<(ListResult, Option<String>)> {
387+
let credential = self.get_credential().await?;
388+
let url = self.config.bucket_endpoint.clone();
389+
390+
let mut query = Vec::with_capacity(4);
391+
392+
if let Some(token) = token {
393+
query.push(("continuation-token", token))
394+
}
395+
396+
if delimiter {
397+
query.push(("delimiter", DELIMITER))
398+
}
399+
400+
query.push(("list-type", "2"));
401+
402+
if let Some(prefix) = prefix {
403+
query.push(("prefix", prefix))
404+
}
405+
406+
if let Some(offset) = offset {
407+
query.push(("start-after", offset))
408+
}
409+
410+
let response = self
411+
.client
412+
.request(Method::GET, &url)
413+
.query(&query)
414+
.with_aws_sigv4(
415+
credential.as_ref(),
416+
&self.config.region,
417+
"s3",
418+
self.config.sign_payload,
419+
None,
420+
)
421+
.send_retry(&self.config.retry_config)
422+
.await
423+
.context(ListRequestSnafu)?
424+
.bytes()
425+
.await
426+
.context(ListResponseBodySnafu)?;
427+
428+
let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
429+
.context(InvalidListResponseSnafu)?;
430+
let token = response.next_continuation_token.take();
431+
432+
Ok((response.try_into()?, token))
433+
}
434+
}
435+
454436
fn encode_path(path: &Path) -> PercentEncode<'_> {
455437
utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET)
456438
}

object_store/src/aws/mod.rs

Lines changed: 8 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -34,11 +34,9 @@
3434
use async_trait::async_trait;
3535
use bytes::Bytes;
3636
use futures::stream::BoxStream;
37-
use futures::TryStreamExt;
3837
use itertools::Itertools;
3938
use serde::{Deserialize, Serialize};
4039
use snafu::{ensure, OptionExt, ResultExt, Snafu};
41-
use std::collections::BTreeSet;
4240
use std::str::FromStr;
4341
use std::sync::Arc;
4442
use tokio::io::AsyncWrite;
@@ -48,7 +46,8 @@ use url::Url;
4846
pub use crate::aws::checksum::Checksum;
4947
use crate::aws::client::{S3Client, S3Config};
5048
use crate::aws::credential::{InstanceCredentialProvider, WebIdentityProvider};
51-
use crate::client::header::header_meta;
49+
use crate::client::get::GetClientExt;
50+
use crate::client::list::ListClientExt;
5251
use crate::client::{
5352
ClientConfigKey, CredentialProvider, StaticCredentialProvider,
5453
TokenCredentialProvider,
@@ -57,7 +56,7 @@ use crate::config::ConfigValue;
5756
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
5857
use crate::{
5958
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
60-
ObjectStore, Path, Result, RetryConfig, StreamExt,
59+
ObjectStore, Path, Result, RetryConfig,
6160
};
6261

6362
mod checksum;
@@ -138,11 +137,6 @@ enum Error {
138137

139138
#[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
140139
RegionParse { bucket: String },
141-
142-
#[snafu(display("Failed to parse headers: {}", source))]
143-
Header {
144-
source: crate::client::header::Error,
145-
},
146140
}
147141

148142
impl From<Error> for super::Error {
@@ -244,24 +238,11 @@ impl ObjectStore for AmazonS3 {
244238
}
245239

246240
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
247-
let response = self.client.get_request(location, options, false).await?;
248-
let stream = response
249-
.bytes_stream()
250-
.map_err(|source| crate::Error::Generic {
251-
store: STORE,
252-
source: Box::new(source),
253-
})
254-
.boxed();
255-
256-
Ok(GetResult::Stream(stream))
241+
self.client.get_opts(location, options).await
257242
}
258243

259244
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
260-
let options = GetOptions::default();
261-
// Extract meta from headers
262-
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
263-
let response = self.client.get_request(location, options, true).await?;
264-
Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
245+
self.client.head(location).await
265246
}
266247

267248
async fn delete(&self, location: &Path) -> Result<()> {
@@ -272,47 +253,19 @@ impl ObjectStore for AmazonS3 {
272253
&self,
273254
prefix: Option<&Path>,
274255
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
275-
let stream = self
276-
.client
277-
.list_paginated(prefix, false, None)
278-
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
279-
.try_flatten()
280-
.boxed();
281-
282-
Ok(stream)
256+
self.client.list(prefix).await
283257
}
284258

285259
async fn list_with_offset(
286260
&self,
287261
prefix: Option<&Path>,
288262
offset: &Path,
289263
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
290-
let stream = self
291-
.client
292-
.list_paginated(prefix, false, Some(offset))
293-
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
294-
.try_flatten()
295-
.boxed();
296-
297-
Ok(stream)
264+
self.client.list_with_offset(prefix, offset).await
298265
}
299266

300267
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
301-
let mut stream = self.client.list_paginated(prefix, true, None);
302-
303-
let mut common_prefixes = BTreeSet::new();
304-
let mut objects = Vec::new();
305-
306-
while let Some(result) = stream.next().await {
307-
let response = result?;
308-
common_prefixes.extend(response.common_prefixes.into_iter());
309-
objects.extend(response.objects.into_iter());
310-
}
311-
312-
Ok(ListResult {
313-
common_prefixes: common_prefixes.into_iter().collect(),
314-
objects,
315-
})
268+
self.client.list_with_delimiter(prefix).await
316269
}
317270

318271
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {

0 commit comments

Comments
 (0)