Skip to content

Commit fd6e7c8

Browse files
authored
Merge branch 'main' into row_group_limit_pruning
2 parents d80a354 + 7b0ed2d commit fd6e7c8

32 files changed

Lines changed: 1034 additions & 341 deletions

File tree

.github/workflows/audit.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ jobs:
4242
steps:
4343
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
4444
- name: Install cargo-audit
45-
uses: taiki-e/install-action@90558ad1e179036f31467972b00dec6cb80701fa # v2.66.3
45+
uses: taiki-e/install-action@2e9d707ef49c9b094d45955b60c7e5c0dfedeb14 # v2.66.5
4646
with:
4747
tool: cargo-audit
4848
- name: Run audit check

.github/workflows/dev.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
runs-on: ubuntu-latest
4545
steps:
4646
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
47-
- uses: actions/setup-node@395ad3262231945c25e8478fd5baf05154b1d79f # v6.1.0
47+
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v6.2.0
4848
with:
4949
node-version: "20"
5050
- name: Prettier check

.github/workflows/rust.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ jobs:
421421
sudo apt-get update -qq
422422
sudo apt-get install -y -qq clang
423423
- name: Setup wasm-pack
424-
uses: taiki-e/install-action@90558ad1e179036f31467972b00dec6cb80701fa # v2.66.3
424+
uses: taiki-e/install-action@2e9d707ef49c9b094d45955b60c7e5c0dfedeb14 # v2.66.5
425425
with:
426426
tool: wasm-pack
427427
- name: Run tests with headless mode
@@ -695,7 +695,7 @@ jobs:
695695
uses: ./.github/actions/setup-builder
696696
with:
697697
rust-version: stable
698-
- uses: actions/setup-node@395ad3262231945c25e8478fd5baf05154b1d79f # v6.1.0
698+
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v6.2.0
699699
with:
700700
node-version: "20"
701701
- name: Check if configs.md has been modified
@@ -741,7 +741,7 @@ jobs:
741741
- name: Setup Rust toolchain
742742
uses: ./.github/actions/setup-builder
743743
- name: Install cargo-msrv
744-
uses: taiki-e/install-action@90558ad1e179036f31467972b00dec6cb80701fa # v2.66.3
744+
uses: taiki-e/install-action@2e9d707ef49c9b094d45955b60c7e5c0dfedeb14 # v2.66.5
745745
with:
746746
tool: cargo-msrv
747747

datafusion/common/src/scalar/mod.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2988,13 +2988,8 @@ impl ScalarValue {
29882988
},
29892989
ScalarValue::Utf8View(e) => match e {
29902990
Some(value) => {
2991-
let mut builder =
2992-
StringViewBuilder::with_capacity(size).with_deduplicate_strings();
2993-
// Replace with upstream arrow-rs code when available:
2994-
// https://github.com/apache/arrow-rs/issues/9034
2995-
for _ in 0..size {
2996-
builder.append_value(value);
2997-
}
2991+
let mut builder = StringViewBuilder::with_capacity(size);
2992+
builder.try_append_value_n(value, size)?;
29982993
let array = builder.finish();
29992994
Arc::new(array)
30002995
}
@@ -3012,11 +3007,8 @@ impl ScalarValue {
30123007
},
30133008
ScalarValue::BinaryView(e) => match e {
30143009
Some(value) => {
3015-
let mut builder =
3016-
BinaryViewBuilder::with_capacity(size).with_deduplicate_strings();
3017-
for _ in 0..size {
3018-
builder.append_value(value);
3019-
}
3010+
let mut builder = BinaryViewBuilder::with_capacity(size);
3011+
builder.try_append_value_n(value, size)?;
30203012
let array = builder.finish();
30213013
Arc::new(array)
30223014
}

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,3 +281,7 @@ name = "spm"
281281
harness = false
282282
name = "preserve_file_partitioning"
283283
required-features = ["parquet"]
284+
285+
[[bench]]
286+
harness = false
287+
name = "reset_plan_states"
datafusion/core/benches/reset_plan_states.rs

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::{Arc, LazyLock};
19+
20+
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
21+
use criterion::{Criterion, criterion_group, criterion_main};
22+
use datafusion::prelude::SessionContext;
23+
use datafusion_catalog::MemTable;
24+
use datafusion_physical_plan::ExecutionPlan;
25+
use datafusion_physical_plan::displayable;
26+
use datafusion_physical_plan::execution_plan::reset_plan_states;
27+
use tokio::runtime::Runtime;
28+
29+
const NUM_FIELDS: usize = 1000;
30+
const PREDICATE_LEN: usize = 50;
31+
32+
static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
33+
Arc::new(Schema::new(
34+
(0..NUM_FIELDS)
35+
.map(|i| Arc::new(Field::new(format!("x_{i}"), DataType::Int64, false)))
36+
.collect::<Fields>(),
37+
))
38+
});
39+
40+
fn col_name(i: usize) -> String {
41+
format!("x_{i}")
42+
}
43+
44+
fn aggr_name(i: usize) -> String {
45+
format!("aggr_{i}")
46+
}
47+
48+
fn physical_plan(
49+
ctx: &SessionContext,
50+
rt: &Runtime,
51+
sql: &str,
52+
) -> Arc<dyn ExecutionPlan> {
53+
rt.block_on(async {
54+
ctx.sql(sql)
55+
.await
56+
.unwrap()
57+
.create_physical_plan()
58+
.await
59+
.unwrap()
60+
})
61+
}
62+
63+
fn predicate(col_name: impl Fn(usize) -> String, len: usize) -> String {
64+
let mut predicate = String::new();
65+
for i in 0..len {
66+
if i > 0 {
67+
predicate.push_str(" AND ");
68+
}
69+
predicate.push_str(&col_name(i));
70+
predicate.push_str(" = ");
71+
predicate.push_str(&i.to_string());
72+
}
73+
predicate
74+
}
75+
76+
/// Returns the SQL text of a typical query like:
77+
///
78+
/// ```sql
79+
/// SELECT aggr1(col1) as aggr1, aggr2(col2) as aggr2 FROM t
80+
/// WHERE p1
81+
/// HAVING p2
82+
/// ```
83+
///
84+
/// Where `p1` and `p2` are some long predicates.
85+
///
86+
fn query1() -> String {
87+
let mut query = String::new();
88+
query.push_str("SELECT ");
89+
for i in 0..NUM_FIELDS {
90+
if i > 0 {
91+
query.push_str(", ");
92+
}
93+
query.push_str("AVG(");
94+
query.push_str(&col_name(i));
95+
query.push_str(") AS ");
96+
query.push_str(&aggr_name(i));
97+
}
98+
query.push_str(" FROM t WHERE ");
99+
query.push_str(&predicate(col_name, PREDICATE_LEN));
100+
query.push_str(" HAVING ");
101+
query.push_str(&predicate(aggr_name, PREDICATE_LEN));
102+
query
103+
}
104+
105+
/// Returns the SQL text of a typical query like:
106+
///
107+
/// ```sql
108+
/// SELECT projection FROM t JOIN v ON t.a = v.a
109+
/// WHERE p1
110+
/// ```
111+
///
112+
fn query2() -> String {
113+
let mut query = String::new();
114+
query.push_str("SELECT ");
115+
for i in (0..NUM_FIELDS).step_by(2) {
116+
if i > 0 {
117+
query.push_str(", ");
118+
}
119+
if (i / 2) % 2 == 0 {
120+
query.push_str(&format!("t.{}", col_name(i)));
121+
} else {
122+
query.push_str(&format!("v.{}", col_name(i)));
123+
}
124+
}
125+
query.push_str(" FROM t JOIN v ON t.x_0 = v.x_0 WHERE ");
126+
127+
fn qualified_name(i: usize) -> String {
128+
format!("t.{}", col_name(i))
129+
}
130+
131+
query.push_str(&predicate(qualified_name, PREDICATE_LEN));
132+
query
133+
}
134+
135+
/// Returns the SQL text of a typical query like:
136+
///
137+
/// ```sql
138+
/// SELECT projection FROM t
139+
/// WHERE p
140+
/// ```
141+
///
142+
fn query3() -> String {
143+
let mut query = String::new();
144+
query.push_str("SELECT ");
145+
146+
// Create non-trivial projection.
147+
for i in 0..NUM_FIELDS / 2 {
148+
if i > 0 {
149+
query.push_str(", ");
150+
}
151+
query.push_str(&col_name(i * 2));
152+
query.push_str(" + ");
153+
query.push_str(&col_name(i * 2 + 1));
154+
}
155+
156+
query.push_str(" FROM t WHERE ");
157+
query.push_str(&predicate(col_name, PREDICATE_LEN));
158+
query
159+
}
160+
161+
fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
162+
b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(plan)).unwrap()));
163+
}
164+
165+
/// Benchmark is intended to measure the overhead of producing an independent
166+
/// instance of an execution plan so that it can be re-executed without repeating
167+
/// the planning stage.
168+
fn bench_reset_plan_states(c: &mut Criterion) {
169+
let rt = Runtime::new().unwrap();
170+
let ctx = SessionContext::new();
171+
ctx.register_table(
172+
"t",
173+
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
174+
)
175+
.unwrap();
176+
177+
ctx.register_table(
178+
"v",
179+
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
180+
)
181+
.unwrap();
182+
183+
macro_rules! bench_query {
184+
($query_producer: expr) => {{
185+
let sql = $query_producer();
186+
let plan = physical_plan(&ctx, &rt, &sql);
187+
log::debug!("plan:\n{}", displayable(plan.as_ref()).indent(true));
188+
move |b| run_reset_states(b, &plan)
189+
}};
190+
}
191+
192+
c.bench_function("query1", bench_query!(query1));
193+
c.bench_function("query2", bench_query!(query2));
194+
c.bench_function("query3", bench_query!(query3));
195+
}
196+
197+
criterion_group!(benches, bench_reset_plan_states);
198+
criterion_main!(benches);

datafusion/core/benches/sql_planner.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ fn register_clickbench_hits_table(rt: &Runtime) -> SessionContext {
118118

119119
let sql = format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'");
120120

121+
// ClickBench partitioned dataset was written by an ancient version of pyarrow that
122+
// wrote strings with the wrong logical type. To read it correctly, we must
123+
// automatically convert binary to string.
124+
rt.block_on(ctx.sql("SET datafusion.execution.parquet.binary_as_string = true;"))
125+
.unwrap();
121126
rt.block_on(ctx.sql(&sql)).unwrap();
122127

123128
let count =

datafusion/execution/src/cache/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod list_files_cache;
2424

2525
pub use file_metadata_cache::DefaultFilesMetadataCache;
2626
pub use list_files_cache::DefaultListFilesCache;
27+
pub use list_files_cache::ListFilesEntry;
2728
pub use list_files_cache::TableScopedPath;
2829

2930
/// Base trait for cache implementations with common operations.

0 commit comments

Comments
 (0)