Skip to content

Commit 3d4b807

Browse files
adriangb and claude committed
fix: rebind RecursiveQueryExec batches to the declared output schema
When a recursive CTE's anchor term aliases a computed column (e.g. `upper(val) AS val`) and the recursive term leaves the same expression un-aliased (`upper(r.val)`), `RecursiveQueryExec` declared its output schema from the anchor but forwarded batches from both branches with their native schemas intact. Downstream consumers that key on `batch.schema().field(i).name()` — TopK (ORDER BY + LIMIT), CSV/JSON writers, custom collectors — then observed the recursive branch's leaked field name instead of the anchor's.

Rebind each emitted batch to the declared output schema in `RecursiveQueryStream::push_batch`. Logical-plan coercion in `LogicalPlanBuilder::to_recursive_query` already guarantees matching column types, so this is a zero-copy field rebind.

Regression coverage:
- Rust test in `datafusion/core/tests/sql/select.rs` asserts every collected `RecordBatch` carries the anchor's field names.
- sqllogictest in `cte.slt` round-trips the result through a headered CSV file (whose header row is written from each batch's own schema) and re-reads it to surface the leaked name inside SLT.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent cfafce4 commit 3d4b807

3 files changed

Lines changed: 153 additions & 0 deletions

File tree

datafusion/core/tests/sql/select.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,3 +431,65 @@ async fn test_select_cast_date_literal_to_timestamp_overflow() -> Result<()> {
431431
);
432432
Ok(())
433433
}
434+
435+
// Regression test: a recursive CTE whose anchor aliases a computed column
436+
// (`upper(val) AS val`) and whose recursive term leaves the same expression
437+
// un-aliased must still produce batches whose schema field names come from
438+
// the anchor term — especially when the outer query uses ORDER BY + LIMIT
439+
// (the TopK path passes batch schemas through verbatim, so any drift in
440+
// RecursiveQueryExec's emitted batches is visible downstream).
441+
#[tokio::test]
442+
async fn test_recursive_cte_batch_schema_stable_with_order_by_limit() -> Result<()> {
443+
let ctx = SessionContext::new();
444+
ctx.sql(
445+
"CREATE TABLE records (\
446+
id VARCHAR NOT NULL, \
447+
parent_id VARCHAR, \
448+
ts TIMESTAMP NOT NULL, \
449+
val VARCHAR\
450+
)",
451+
)
452+
.await?
453+
.collect()
454+
.await?;
455+
ctx.sql(
456+
"INSERT INTO records VALUES \
457+
('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'), \
458+
('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'), \
459+
('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'), \
460+
('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'), \
461+
('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'), \
462+
('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5')",
463+
)
464+
.await?
465+
.collect()
466+
.await?;
467+
468+
let results = ctx
469+
.sql(
470+
"WITH RECURSIVE descendants AS (\
471+
SELECT id, parent_id, ts, upper(val) AS val \
472+
FROM records WHERE id = 'a00' \
473+
UNION ALL \
474+
SELECT r.id, r.parent_id, r.ts, upper(r.val) \
475+
FROM records r INNER JOIN descendants d ON r.parent_id = d.id \
476+
) \
477+
SELECT id, parent_id, ts, val FROM descendants ORDER BY ts ASC LIMIT 10",
478+
)
479+
.await?
480+
.collect()
481+
.await?;
482+
483+
let expected_names = ["id", "parent_id", "ts", "val"];
484+
assert!(!results.is_empty(), "expected at least one batch");
485+
for (i, batch) in results.iter().enumerate() {
486+
let schema = batch.schema();
487+
let actual_names: Vec<&str> =
488+
schema.fields().iter().map(|f| f.name().as_str()).collect();
489+
assert_eq!(
490+
actual_names, expected_names,
491+
"batch {i} schema field names leaked from recursive branch"
492+
);
493+
}
494+
Ok(())
495+
}

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,20 @@ impl RecursiveQueryStream {
317317
mut batch: RecordBatch,
318318
) -> Poll<Option<Result<RecordBatch>>> {
319319
let baseline_metrics = self.baseline_metrics.clone();
320+
321+
// Rebind to the declared output schema. The recursive term is planned
322+
// independently from the static term and its projection may leave
323+
// columns un-aliased (e.g. `upper(r.val)` vs the anchor's
324+
// `upper(val) AS val`); downstream consumers that key on
325+
// `batch.schema().field(i).name()` (TopK, CSV/JSON writers, custom
326+
// collectors) would otherwise see the recursive branch's names leak
327+
// through. Logical-plan coercion guarantees matching types, so this
328+
// is a zero-copy field rebind.
329+
if batch.schema() != self.schema {
330+
batch =
331+
RecordBatch::try_new(Arc::clone(&self.schema), batch.columns().to_vec())?;
332+
}
333+
320334
if let Some(deduplicator) = &mut self.distinct_deduplicator {
321335
let _timer_guard = baseline_metrics.elapsed_compute().timer();
322336
batch = deduplicator.deduplicate(&batch)?;

datafusion/sqllogictest/test_files/cte.slt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,83 @@ physical_plan
12231223
07)------RepartitionExec: partitioning=Hash([start@0], 4), input_partitions=1
12241224
08)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/recursive_cte/closure.csv]]}, projection=[start, end], file_type=csv, has_header=true
12251225

1226+
# Regression test: recursive CTE column-name stability under ORDER BY + LIMIT.
1227+
#
1228+
# The recursive CTE below has an anchor that aliases a computed column
1229+
# (`upper(val) AS val`) while the recursive term leaves the same expression
1230+
# un-aliased (`upper(r.val)`). RecursiveQueryExec declares its output schema
1231+
# from the anchor, so the plan-level column is `val`; but if it forwards the
1232+
# recursive term's batches without rebinding their schema to the anchor's,
1233+
# downstream consumers that read `batch.schema().field(i).name()` observe
1234+
# the leaked `upper(r.val)` name. The outer `ORDER BY ts LIMIT 10` routes
1235+
# through the TopK path in SortExec, which passes batch schemas through
1236+
# verbatim — so the leak surfaces there.
1237+
#
1238+
# sqllogictest compares row values and can't assert column names directly.
1239+
# To surface the leak inside SLT, we `COPY` the result to a CSV file with a
1240+
# header row (the CSV writer names header columns from each batch's own
1241+
# schema) and then read the file back as headerless CSV so the header line
1242+
# appears as a data row we can compare against the expected column names.
1243+
statement ok
1244+
CREATE TABLE cte_schema_records (
1245+
id VARCHAR NOT NULL,
1246+
parent_id VARCHAR,
1247+
ts TIMESTAMP NOT NULL,
1248+
val VARCHAR
1249+
);
1250+
1251+
statement ok
1252+
INSERT INTO cte_schema_records VALUES
1253+
('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'),
1254+
('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'),
1255+
('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'),
1256+
('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'),
1257+
('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'),
1258+
('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5');
1259+
1260+
query I
1261+
COPY (WITH RECURSIVE descendants AS (
1262+
SELECT id, parent_id, ts, upper(val) AS val
1263+
FROM cte_schema_records WHERE id = 'a00'
1264+
UNION ALL
1265+
SELECT r.id, r.parent_id, r.ts, upper(r.val)
1266+
FROM cte_schema_records r
1267+
INNER JOIN descendants d ON r.parent_id = d.id
1268+
)
1269+
SELECT id, parent_id, ts, val
1270+
FROM descendants
1271+
ORDER BY ts ASC
1272+
LIMIT 10)
1273+
TO 'test_files/scratch/cte/recursive_order_limit_schema/'
1274+
STORED AS CSV OPTIONS ('format.has_header' 'true');
1275+
----
1276+
6
1277+
1278+
statement ok
1279+
CREATE EXTERNAL TABLE cte_schema_reread (
1280+
id VARCHAR, parent_id VARCHAR, ts VARCHAR, val VARCHAR
1281+
)
1282+
STORED AS CSV
1283+
LOCATION 'test_files/scratch/cte/recursive_order_limit_schema/'
1284+
OPTIONS ('format.has_header' 'false');
1285+
1286+
# The CSV header row appears as a data row here because we re-read with
1287+
# has_header='false'. It must match the CTE's declared column names; if
1288+
# RecursiveQueryExec leaked the recursive branch's schema, the last field
1289+
# would read `upper(r.val)` instead of `val`.
1290+
query TTTT
1291+
SELECT id, parent_id, ts, val
1292+
FROM cte_schema_reread
1293+
WHERE id = 'id';
1294+
----
1295+
id parent_id ts val
1296+
1297+
statement ok
1298+
DROP TABLE cte_schema_reread;
1299+
1300+
statement ok
1301+
DROP TABLE cte_schema_records;
1302+
12261303
statement count 0
12271304
set datafusion.execution.enable_recursive_ctes = false;
12281305

0 commit comments

Comments
 (0)