Skip to content

Commit 80fe933

Browse files
committed
feat(cli): support external tables on multiple locations
1 parent 5bbdb7e commit 80fe933

11 files changed

Lines changed: 285 additions & 166 deletions

File tree

datafusion-cli/src/exec.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
3939
use datafusion::sql::parser::{DFParser, Statement};
4040
use datafusion::sql::sqlparser;
4141
use datafusion::sql::sqlparser::dialect::dialect_from_str;
42+
use futures::future::join_all;
4243
use futures::StreamExt;
4344
use log::warn;
4445
use object_store::Error::Generic;
@@ -415,14 +416,18 @@ async fn create_plan(
415416
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
416417
// To support custom formats, treat error as None
417418
let format = config_file_type_from_str(&cmd.file_type);
418-
register_object_store_and_config_extensions(
419-
ctx,
420-
&cmd.location,
421-
&cmd.options,
422-
format,
423-
resolve_region,
424-
)
425-
.await?;
419+
let register_futures = cmd.locations.iter().map(|location| {
420+
register_object_store_and_config_extensions(
421+
ctx,
422+
location,
423+
&cmd.options,
424+
format.clone(),
425+
resolve_region,
426+
)
427+
});
428+
for result in join_all(register_futures).await {
429+
result?;
430+
}
426431
}
427432

428433
if let LogicalPlan::Copy(copy_to) = &mut plan {
@@ -524,14 +529,16 @@ mod tests {
524529

525530
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
526531
let format = config_file_type_from_str(&cmd.file_type);
527-
register_object_store_and_config_extensions(
528-
&ctx,
529-
&cmd.location,
530-
&cmd.options,
531-
format,
532-
false,
533-
)
534-
.await?;
532+
for location in &cmd.locations {
533+
register_object_store_and_config_extensions(
534+
&ctx,
535+
&location,
536+
&cmd.options,
537+
format.clone(),
538+
false,
539+
)
540+
.await?;
541+
}
535542
} else {
536543
return plan_err!("LogicalPlan is not a CreateExternalTable");
537544
}

datafusion/catalog/src/listing_schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ impl ListingSchemaProvider {
132132
&CreateExternalTable {
133133
schema: Arc::new(DFSchema::empty()),
134134
name,
135-
location: table_url,
135+
locations: vec![table_url],
136136
file_type: self.format.clone(),
137137
table_partition_cols: vec![],
138138
if_not_exists: false,

datafusion/catalog/src/stream.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,12 @@ impl TableProviderFactory for StreamTableFactory {
5353
state: &dyn Session,
5454
cmd: &CreateExternalTable,
5555
) -> Result<Arc<dyn TableProvider>> {
56+
let location = match cmd.locations.len() {
57+
1 => &cmd.locations[0],
58+
_ => return config_err!("Stream table factory supports only a single table location"),
59+
};
60+
5661
let schema: SchemaRef = Arc::clone(cmd.schema.inner());
57-
let location = cmd.location.clone();
5862
let encoding = cmd.file_type.parse()?;
5963
let header = if let Ok(opt) = cmd
6064
.options

datafusion/core/src/datasource/listing/table.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1735,12 +1735,12 @@ mod tests {
17351735
(
17361736
vec![vec![col("string_col").sort(true, false)]],
17371737
Ok(vec![[PhysicalSortExpr {
1738-
expr: physical_col("string_col", &schema).unwrap(),
1739-
options: SortOptions {
1740-
descending: false,
1741-
nulls_first: false,
1742-
},
1743-
}].into(),
1738+
expr: physical_col("string_col", &schema).unwrap(),
1739+
options: SortOptions {
1740+
descending: false,
1741+
nulls_first: false,
1742+
},
1743+
}].into(),
17441744
])
17451745
),
17461746
// ok with two columns, different options
@@ -1751,11 +1751,11 @@ mod tests {
17511751
]],
17521752
Ok(vec![[
17531753
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
1754-
.asc()
1755-
.nulls_last(),
1754+
.asc()
1755+
.nulls_last(),
17561756
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
1757-
.desc()
1758-
.nulls_first()
1757+
.desc()
1758+
.nulls_first()
17591759
].into(),
17601760
])
17611761
),

0 commit comments

Comments
 (0)