Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 102 additions & 4 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ use datafusion_common::{
};
use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX};
use datafusion_expr::{
BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
Aggregate, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
UserDefinedLogicalNode, expr::Alias,
UserDefinedLogicalNode, Window, expr::Alias,
};
use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
use std::{sync::Arc, vec};
Expand Down Expand Up @@ -477,6 +477,80 @@ impl Unparser<'_> {
Ok(false)
}

fn project_window_output(
&self,
window_expr: &[Expr],
select: &mut SelectBuilder,
agg: Option<&Aggregate>,
) -> Result<()> {
let mut items = if select.already_projected() {
select.pop_projections()
} else {
vec![ast::SelectItem::Wildcard(
ast::WildcardAdditionalOptions::default(),
)]
};

items.extend(
window_expr
.iter()
.map(|expr| {
let expr = if let Some(agg) = agg {
unproject_agg_exprs(expr.clone(), agg, None)?
} else {
expr.clone()
};
self.select_item_to_sql(&expr)
})
.collect::<Result<Vec<_>>>()?,
);
select.projection(items);

Ok(())
}

fn window_input_requires_derived_subquery(plan: &LogicalPlan) -> bool {
// These operators either produce a SELECT list or apply SQL clauses
// that are evaluated after window functions in a single SELECT block.
// Keep them below the Window node by emitting a derived table.
matches!(
plan,
LogicalPlan::Projection(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Limit(_)
| LogicalPlan::Sort(_)
| LogicalPlan::Union(_)
)
}

fn window_to_sql_with_derived_input(
&self,
window: &Window,
select: &mut SelectBuilder,
relation: &mut RelationBuilder,
) -> Result<()> {
let input_alias = "derived_window_input";
self.derive(
window.input.as_ref(),
relation,
Some(self.new_table_alias(input_alias.to_string(), vec![])),
false,
)?;

let input_schema = window.input.schema();
let mut alias_rewriter = TableAliasRewriter {
table_schema: input_schema.as_arrow(),
alias_name: TableReference::bare(input_alias),
};
let window_expr = window
.window_expr
.iter()
.map(|expr| expr.clone().rewrite(&mut alias_rewriter).data())
.collect::<Result<Vec<_>>>()?;

self.project_window_output(&window_expr, select, None)
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn select_to_sql_recursively(
&self,
Expand Down Expand Up @@ -1159,13 +1233,37 @@ impl Unparser<'_> {
Ok(())
}
LogicalPlan::Window(window) => {
// Window nodes are handled simultaneously with Projection nodes
// Window nodes are usually handled simultaneously with Projection
// nodes, where projected columns are unprojected back into their
// corresponding window expressions. Manually built plans can have
// Window nodes without an enclosing Projection, so in that case
// the Window node itself must contribute its output expressions.
let project_window_output = !select.already_projected();
if project_window_output
&& Self::window_input_requires_derived_subquery(window.input.as_ref())
{
return self
.window_to_sql_with_derived_input(window, select, relation);
}

let agg = if project_window_output {
find_agg_node_within_select(plan, false)
} else {
None
};

self.select_to_sql_recursively(
window.input.as_ref(),
query,
select,
relation,
)
)?;

if project_window_output {
self.project_window_output(&window.window_expr, select, agg)?;
}

Ok(())
}
LogicalPlan::EmptyRelation(_) => {
// An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
Expand Down
7 changes: 6 additions & 1 deletion datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ pub(crate) fn find_agg_node_within_select(
// Agg nodes explicitly return immediately with a single node
if let LogicalPlan::Aggregate(agg) = input {
Some(agg)
} else if let LogicalPlan::TableScan(_) = input {
} else if matches!(
input,
LogicalPlan::TableScan(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
) {
None
} else if let LogicalPlan::Projection(_) = input {
if already_projected {
Expand Down
Loading
Loading