Skip to content

Commit d93f799

Browse files
ntjohnson1 and de-bgunter
authored and committed
Allow dropping qualified columns (apache#19549)
## Which issue does this PR close?

- Closes apache#19548

## Rationale for this change

Explanation in the issue. Motivation coming more concretely from datafusion-python apache/datafusion-python#1305 (comment).

## What changes are included in this PR?

* Adds the test from the issue to highlight expected behavior
* Expands drop_columns to coerce things into a fully qualified column to support the range of column varieties
* This further adds a helper to extract the table name associated with the dataframe to simplify use of qualified drop-columns support
* This is potentially the most controversial part. I could see a nicer API being `df.col(<name>)` to match the expr version, but then we probably do repeated checks for the underlying table name unless there is some caching somewhere. Maybe that performance impact isn't significant.

## Are these changes tested?

Yes, some additional tests are provided.

## Are there any user-facing changes?

I had to update the `drop_columns(&[])` test since the type can no longer be inferred. I'm not sure if that is representative of any actual use cases though, since I expect the more common case is a vector that might be empty, in which case the type would be specified. It now requires specifying columns with dots in them similar to other places (`"\"f.col1\""`) to disambiguate from `"f.col1"`.
1 parent a4f3b2d commit d93f799

3 files changed

Lines changed: 169 additions & 9 deletions

File tree

datafusion-cli/src/functions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ impl TableFunctionImpl for ParquetMetadataFunc {
426426
compression_arr.push(format!("{:?}", column.compression()));
427427
// need to collect into Vec to format
428428
let encodings: Vec<_> = column.encodings().collect();
429-
encodings_arr.push(format!("{:?}", encodings));
429+
encodings_arr.push(format!("{encodings:?}"));
430430
index_page_offset_arr.push(column.index_page_offset());
431431
dictionary_page_offset_arr.push(column.dictionary_page_offset());
432432
data_page_offset_arr.push(column.data_page_offset());

datafusion/core/src/dataframe/mod.rs

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -447,15 +447,31 @@ impl DataFrame {
447447
/// # Ok(())
448448
/// # }
449449
/// ```
450-
pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
450+
pub fn drop_columns<T>(self, columns: &[T]) -> Result<DataFrame>
451+
where
452+
T: Into<Column> + Clone,
453+
{
451454
let fields_to_drop = columns
452455
.iter()
453-
.flat_map(|name| {
454-
self.plan
455-
.schema()
456-
.qualified_fields_with_unqualified_name(name)
456+
.flat_map(|col| {
457+
let column: Column = col.clone().into();
458+
match column.relation.as_ref() {
459+
Some(_) => {
460+
// qualified_field_from_column returns Result<(Option<&TableReference>, &FieldRef)>
461+
vec![self.plan.schema().qualified_field_from_column(&column)]
462+
}
463+
None => {
464+
// qualified_fields_with_unqualified_name returns Vec<(Option<&TableReference>, &FieldRef)>
465+
self.plan
466+
.schema()
467+
.qualified_fields_with_unqualified_name(&column.name)
468+
.into_iter()
469+
.map(Ok)
470+
.collect::<Vec<_>>()
471+
}
472+
}
457473
})
458-
.collect::<Vec<_>>();
474+
.collect::<Result<Vec<_>, _>>()?;
459475
let expr: Vec<Expr> = self
460476
.plan
461477
.schema()
@@ -2465,6 +2481,48 @@ impl DataFrame {
24652481
.collect()
24662482
}
24672483

2484+
/// Find qualified columns for this dataframe from names
2485+
///
2486+
/// # Arguments
2487+
/// * `names` - Unqualified names to find.
2488+
///
2489+
/// # Example
2490+
/// ```
2491+
/// # use datafusion::prelude::*;
2492+
/// # use datafusion::error::Result;
2493+
/// # use datafusion_common::ScalarValue;
2494+
/// # #[tokio::main]
2495+
/// # async fn main() -> Result<()> {
2496+
/// let ctx = SessionContext::new();
2497+
/// ctx.register_csv("first_table", "tests/data/example.csv", CsvReadOptions::new())
2498+
/// .await?;
2499+
/// let df = ctx.table("first_table").await?;
2500+
/// ctx.register_csv("second_table", "tests/data/example.csv", CsvReadOptions::new())
2501+
/// .await?;
2502+
/// let df2 = ctx.table("second_table").await?;
2503+
/// let join_expr = df.find_qualified_columns(&["a"])?.iter()
2504+
/// .zip(df2.find_qualified_columns(&["a"])?.iter())
2505+
/// .map(|(col1, col2)| col(*col1).eq(col(*col2)))
2506+
/// .collect::<Vec<Expr>>();
2507+
/// let df3 = df.join_on(df2, JoinType::Inner, join_expr)?;
2508+
/// # Ok(())
2509+
/// # }
2510+
/// ```
2511+
pub fn find_qualified_columns(
2512+
&self,
2513+
names: &[&str],
2514+
) -> Result<Vec<(Option<&TableReference>, &FieldRef)>> {
2515+
let schema = self.logical_plan().schema();
2516+
names
2517+
.iter()
2518+
.map(|name| {
2519+
schema
2520+
.qualified_field_from_column(&Column::from_name(*name))
2521+
.map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
2522+
})
2523+
.collect()
2524+
}
2525+
24682526
/// Helper for creating DataFrame.
24692527
/// # Example
24702528
/// ```

datafusion/core/tests/dataframe/mod.rs

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,8 @@ async fn drop_columns_with_nonexistent_columns() -> Result<()> {
534534
async fn drop_columns_with_empty_array() -> Result<()> {
535535
// build plan using Table API
536536
let t = test_table().await?;
537-
let t2 = t.drop_columns(&[])?;
537+
let drop_columns = vec![] as Vec<&str>;
538+
let t2 = t.drop_columns(&drop_columns)?;
538539
let plan = t2.logical_plan().clone();
539540

540541
// build query using SQL
@@ -549,6 +550,107 @@ async fn drop_columns_with_empty_array() -> Result<()> {
549550
Ok(())
550551
}
551552

553+
#[tokio::test]
554+
async fn drop_columns_qualified() -> Result<()> {
555+
// build plan using Table API
556+
let mut t = test_table().await?;
557+
t = t.select_columns(&["c1", "c2", "c11"])?;
558+
let mut t2 = test_table_with_name("another_table").await?;
559+
t2 = t2.select_columns(&["c1", "c2", "c11"])?;
560+
let mut t3 = t.join_on(
561+
t2,
562+
JoinType::Inner,
563+
[col("aggregate_test_100.c1").eq(col("another_table.c1"))],
564+
)?;
565+
t3 = t3.drop_columns(&["another_table.c2", "another_table.c11"])?;
566+
567+
let plan = t3.logical_plan().clone();
568+
569+
let sql = "SELECT aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c11, another_table.c1 FROM (SELECT c1, c2, c11 FROM aggregate_test_100) INNER JOIN (SELECT c1, c2, c11 FROM another_table) ON aggregate_test_100.c1 = another_table.c1";
570+
let ctx = SessionContext::new();
571+
register_aggregate_csv(&ctx, "aggregate_test_100").await?;
572+
register_aggregate_csv(&ctx, "another_table").await?;
573+
let sql_plan = ctx.sql(sql).await?.into_unoptimized_plan();
574+
575+
// the two plans should be identical
576+
assert_same_plan(&plan, &sql_plan);
577+
578+
Ok(())
579+
}
580+
581+
#[tokio::test]
582+
async fn drop_columns_qualified_find_qualified() -> Result<()> {
583+
// build plan using Table API
584+
let mut t = test_table().await?;
585+
t = t.select_columns(&["c1", "c2", "c11"])?;
586+
let mut t2 = test_table_with_name("another_table").await?;
587+
t2 = t2.select_columns(&["c1", "c2", "c11"])?;
588+
let mut t3 = t.join_on(
589+
t2.clone(),
590+
JoinType::Inner,
591+
[col("aggregate_test_100.c1").eq(col("another_table.c1"))],
592+
)?;
593+
t3 = t3.drop_columns(&t2.find_qualified_columns(&["c2", "c11"])?)?;
594+
595+
let plan = t3.logical_plan().clone();
596+
597+
let sql = "SELECT aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c11, another_table.c1 FROM (SELECT c1, c2, c11 FROM aggregate_test_100) INNER JOIN (SELECT c1, c2, c11 FROM another_table) ON aggregate_test_100.c1 = another_table.c1";
598+
let ctx = SessionContext::new();
599+
register_aggregate_csv(&ctx, "aggregate_test_100").await?;
600+
register_aggregate_csv(&ctx, "another_table").await?;
601+
let sql_plan = ctx.sql(sql).await?.into_unoptimized_plan();
602+
603+
// the two plans should be identical
604+
assert_same_plan(&plan, &sql_plan);
605+
606+
Ok(())
607+
}
608+
609+
#[tokio::test]
610+
async fn test_find_qualified_names() -> Result<()> {
611+
let t = test_table().await?;
612+
let column_names = ["c1", "c2", "c3"];
613+
let columns = t.find_qualified_columns(&column_names)?;
614+
615+
// Expected results for each column
616+
let binding = TableReference::bare("aggregate_test_100");
617+
let expected = [
618+
(Some(&binding), "c1"),
619+
(Some(&binding), "c2"),
620+
(Some(&binding), "c3"),
621+
];
622+
623+
// Verify we got the expected number of results
624+
assert_eq!(
625+
columns.len(),
626+
expected.len(),
627+
"Expected {} columns, got {}",
628+
expected.len(),
629+
columns.len()
630+
);
631+
632+
// Iterate over the results and check each one individually
633+
for (i, (actual, expected)) in columns.iter().zip(expected.iter()).enumerate() {
634+
let (actual_table_ref, actual_field_ref) = actual;
635+
let (expected_table_ref, expected_field_name) = expected;
636+
637+
// Check table reference
638+
assert_eq!(
639+
actual_table_ref, expected_table_ref,
640+
"Column {i}: expected table reference {expected_table_ref:?}, got {actual_table_ref:?}"
641+
);
642+
643+
// Check field name
644+
assert_eq!(
645+
actual_field_ref.name(),
646+
*expected_field_name,
647+
"Column {i}: expected field name '{expected_field_name}', got '{actual_field_ref}'"
648+
);
649+
}
650+
651+
Ok(())
652+
}
653+
552654
#[tokio::test]
553655
async fn drop_with_quotes() -> Result<()> {
554656
// define data with a column name that has a "." in it:
@@ -594,7 +696,7 @@ async fn drop_with_periods() -> Result<()> {
594696
let ctx = SessionContext::new();
595697
ctx.register_batch("t", batch)?;
596698

597-
let df = ctx.table("t").await?.drop_columns(&["f.c1"])?;
699+
let df = ctx.table("t").await?.drop_columns(&["\"f.c1\""])?;
598700

599701
let df_results = df.collect().await?;
600702

0 commit comments

Comments (0)