Commit cde6dfa

Authored by zhuqi-lucas, with co-authors martin-g, 2010YOUY01, Copilot, and alamb
Add sorted data benchmark. (#19042)
## Which issue does this PR close?

Add sorted data benchmark.

- Closes #18976

## Rationale for this change

## What changes are included in this PR?

## Are these changes tested?

Yes. Test results for the reverse parquet PR (#18817) show it is roughly 30x faster than the main branch on sorted data:

```text
Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench clickbench --iterations 5 --path /Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data --sorted-by EventTime --sort-order ASC -o /Users/zhuqi/arrow-datafusion/benchmarks/results/reverse_parquet/data_sorted_clickbench.json`
Running benchmarks with the following options: RunOpt { query: None, pushdown: false, common: CommonOpt { iterations: 5, partitions: None, batch_size: None, mem_pool_type: "fair", memory_limit: None, sort_spill_reservation_bytes: None, debug: false }, path: "/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet", queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/reverse_parquet/data_sorted_clickbench.json"), sorted_by: Some("EventTime"), sort_order: "ASC" }
⚠️  Forcing target_partitions=1 to preserve sort order
⚠️  (Because we want to get the pure performance benefit of sorted data to compare)
📊 Session config target_partitions: 1
Registering table with sort order: EventTime ASC
Executing: CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet' WITH ORDER ("EventTime" ASC)
Q0: -- Must set for ClickBench hits_partitioned dataset. See #16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
Query 0 iteration 0 took 14.7 ms and returned 10 rows
Query 0 iteration 1 took 10.2 ms and returned 10 rows
Query 0 iteration 2 took 8.7 ms and returned 10 rows
Query 0 iteration 3 took 7.9 ms and returned 10 rows
Query 0 iteration 4 took 7.9 ms and returned 10 rows
Query 0 avg time: 9.85 ms
+ set +x
Done
```

And the main branch result:

```text
Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench clickbench --iterations 5 --path /Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data --sorted-by EventTime --sort-order ASC -o /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_18976/data_sorted_clickbench.json`
Running benchmarks with the following options: RunOpt { query: None, pushdown: false, common: CommonOpt { iterations: 5, partitions: None, batch_size: None, mem_pool_type: "fair", memory_limit: None, sort_spill_reservation_bytes: None, debug: false }, path: "/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet", queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/issue_18976/data_sorted_clickbench.json"), sorted_by: Some("EventTime"), sort_order: "ASC" }
⚠️  Forcing target_partitions=1 to preserve sort order
⚠️  (Because we want to get the pure performance benefit of sorted data to compare)
📊 Session config target_partitions: 1
Registering table with sort order: EventTime ASC
Executing: CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet' WITH ORDER ("EventTime" ASC)
Q0: -- Must set for ClickBench hits_partitioned dataset. See #16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
Query 0 iteration 0 took 331.1 ms and returned 10 rows
Query 0 iteration 1 took 286.0 ms and returned 10 rows
Query 0 iteration 2 took 283.3 ms and returned 10 rows
Query 0 iteration 3 took 283.8 ms and returned 10 rows
Query 0 iteration 4 took 286.5 ms and returned 10 rows
Query 0 avg time: 294.13 ms
+ set +x
Done
```

## Are there any user-facing changes?

---------

Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
Co-authored-by: Yongting You <2010youy01@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
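As a sanity check on the "30x" figure, the two reported `Query 0 avg time` values can be compared directly (a quick sketch, using only the numbers printed above):

```shell
# Derive the speedup from the reported averages (milliseconds)
main_avg=294.13   # main branch: Query 0 avg time
pr_avg=9.85       # reverse parquet PR: Query 0 avg time
speedup=$(awk -v a="$main_avg" -v b="$pr_avg" 'BEGIN { printf "%.1f", a / b }')
echo "speedup: ${speedup}x"   # -> speedup: 29.9x
```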
1 parent 662a3ba commit cde6dfa

4 files changed

Lines changed: 264 additions & 10 deletions


benchmarks/README.md

Lines changed: 38 additions & 0 deletions
````diff
@@ -832,3 +832,41 @@ Getting results...
 cancelling thread
 done dropping runtime in 83.531417ms
 ```
+
+## Sorted Data Benchmarks
+
+### Data Sorted ClickBench
+
+Benchmark for queries on pre-sorted data, testing sort order optimization.
+This benchmark uses a subset of the ClickBench dataset (hits.parquet, ~14GB) that has been pre-sorted by the EventTime column. The queries are designed to test DataFusion's performance when the data is already sorted, as is common in time-series workloads.
+
+The benchmark includes queries that:
+- Scan pre-sorted data with ORDER BY clauses that match the sort order
+- Test reverse scans on sorted data
+- Verify the performance results
+
+#### Generating Sorted Data
+
+The sorted dataset is automatically generated from the ClickBench partitioned dataset. You can configure the memory used during the sorting process with the `DATAFUSION_MEMORY_GB` environment variable. The default memory limit is 12GB.
+
+```bash
+./bench.sh data clickbench_sorted
+```
+
+To create the sorted dataset with, for example, 16GB of memory, run:
+
+```bash
+DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted
+```
+
+This command will:
+1. Download the ClickBench partitioned dataset if not present
+2. Sort hits.parquet by EventTime in ascending order
+3. Save the sorted file as hits_sorted.parquet
+
+#### Running the Benchmark
+
+```bash
+./bench.sh run clickbench_sorted
+```
+
+This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.
````
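Under the hood, `--sorted-by` causes the benchmark to register the table with a `WITH ORDER` clause. A minimal shell sketch of the DDL string it builds (the data path here is illustrative):

```shell
# Construct the CREATE EXTERNAL TABLE statement that --sorted-by produces
SORT_COLUMN="EventTime"
SORT_ORDER="ASC"
DATA_PATH="benchmarks/data/hits_sorted.parquet"   # illustrative path
DDL="CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '${DATA_PATH}' WITH ORDER (\"${SORT_COLUMN}\" ${SORT_ORDER})"
echo "$DDL"
```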

benchmarks/bench.sh

Lines changed: 117 additions & 1 deletion
```diff
@@ -102,6 +102,9 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parquet
 clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
 clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)

+# Sorted Data Benchmarks (ORDER BY Optimization)
+clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
+
 # H2O.ai Benchmarks (Group By, Join, Window)
 h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
 h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
@@ -324,6 +327,9 @@ main() {
         compile_profile)
             data_tpch "1" "parquet"
             ;;
+        clickbench_sorted)
+            clickbench_sorted
+            ;;
         *)
             echo "Error: unknown benchmark '$BENCHMARK' for data generation"
             usage
@@ -459,7 +465,7 @@ main() {
         h2o_medium_window)
             run_h2o_window "MEDIUM" "CSV" "window"
             ;;
-       h2o_big_window)
+        h2o_big_window)
             run_h2o_window "BIG" "CSV" "window"
             ;;
         h2o_small_parquet)
@@ -511,6 +517,9 @@ main() {
         compile_profile)
             run_compile_profile "${PROFILE_ARGS[@]}"
             ;;
+        clickbench_sorted)
+            run_clickbench_sorted
+            ;;
         *)
             echo "Error: unknown benchmark '$BENCHMARK' for run"
             usage
@@ -1260,6 +1269,113 @@ compare_benchmarks() {

 }

+# Creates sorted ClickBench data from hits.parquet (full dataset)
+# The data is sorted by EventTime in ascending order
+# Uses datafusion-cli to reduce dependencies
+clickbench_sorted() {
+    SORTED_FILE="${DATA_DIR}/hits_sorted.parquet"
+    ORIGINAL_FILE="${DATA_DIR}/hits.parquet"
+
+    # Default memory limit is 12GB, can be overridden with DATAFUSION_MEMORY_GB env var
+    MEMORY_LIMIT_GB=${DATAFUSION_MEMORY_GB:-12}
+
+    echo "Creating sorted ClickBench dataset from hits.parquet..."
+    echo "Configuration:"
+    echo "  Memory limit: ${MEMORY_LIMIT_GB}G"
+    echo "  Row group size: 64K rows"
+    echo "  Compression: uncompressed"
+
+    if [ ! -f "${ORIGINAL_FILE}" ]; then
+        echo "hits.parquet not found. Running data_clickbench_1 first..."
+        data_clickbench_1
+    fi
+
+    if [ -f "${SORTED_FILE}" ]; then
+        echo "Sorted hits.parquet already exists at ${SORTED_FILE}"
+        return 0
+    fi
+
+    echo "Sorting hits.parquet by EventTime (this may take several minutes)..."
+
+    pushd "${DATAFUSION_DIR}" > /dev/null
+    echo "Building datafusion-cli..."
+    cargo build --release --bin datafusion-cli
+    DATAFUSION_CLI="${DATAFUSION_DIR}/target/release/datafusion-cli"
+    popd > /dev/null
+
+    START_TIME=$(date +%s)
+    echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
+    echo "Using datafusion-cli to create sorted parquet file..."
+    "${DATAFUSION_CLI}" << EOF
+-- Memory and performance configuration
+SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G';
+SET datafusion.execution.spill_compression = 'uncompressed';
+SET datafusion.execution.sort_spill_reservation_bytes = 10485760; -- 10MB
+SET datafusion.execution.batch_size = 8192;
+SET datafusion.execution.target_partitions = 1;
+
+-- Parquet output configuration
+SET datafusion.execution.parquet.max_row_group_size = 65536;
+SET datafusion.execution.parquet.compression = 'uncompressed';
+
+-- Execute sort and write
+COPY (SELECT * FROM '${ORIGINAL_FILE}' ORDER BY "EventTime")
+TO '${SORTED_FILE}'
+STORED AS PARQUET;
+EOF
+
+    local result=$?
+
+    END_TIME=$(date +%s)
+    DURATION=$((END_TIME - START_TIME))
+    echo "End time: $(date '+%Y-%m-%d %H:%M:%S')"
+
+    if [ $result -eq 0 ]; then
+        echo "✓ Successfully created sorted ClickBench dataset"
+
+        INPUT_SIZE=$(stat -f%z "${ORIGINAL_FILE}" 2>/dev/null || stat -c%s "${ORIGINAL_FILE}" 2>/dev/null)
+        OUTPUT_SIZE=$(stat -f%z "${SORTED_FILE}" 2>/dev/null || stat -c%s "${SORTED_FILE}" 2>/dev/null)
+        INPUT_MB=$((INPUT_SIZE / 1024 / 1024))
+        OUTPUT_MB=$((OUTPUT_SIZE / 1024 / 1024))
+
+        echo "  Input:  ${INPUT_MB} MB"
+        echo "  Output: ${OUTPUT_MB} MB"
+
+        echo ""
+        echo "Time Statistics:"
+        echo "  Total duration: ${DURATION} seconds ($(printf '%02d:%02d:%02d' $((DURATION/3600)) $((DURATION%3600/60)) $((DURATION%60))))"
+        echo "  Throughput: $((INPUT_MB / DURATION)) MB/s"
+
+        return 0
+    else
+        echo "✗ Error: Failed to create sorted dataset"
+        echo "💡 Tip: Try increasing memory with: DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted"
+        return 1
+    fi
+}
+
+# Runs the sorted data benchmark with prefer_existing_sort configuration
+run_clickbench_sorted() {
+    RESULTS_FILE="${RESULTS_DIR}/clickbench_sorted.json"
+    echo "RESULTS_FILE: ${RESULTS_FILE}"
+    echo "Running sorted data benchmark with prefer_existing_sort optimization..."
+
+    # Ensure sorted data exists
+    clickbench_sorted
+
+    # Run benchmark with prefer_existing_sort configuration
+    # This allows DataFusion to optimize away redundant sorts while maintaining parallelism
+    debug_run $CARGO_COMMAND --bin dfbench -- clickbench \
+        --iterations 5 \
+        --path "${DATA_DIR}/hits_sorted.parquet" \
+        --queries-path "${SCRIPT_DIR}/queries/clickbench/queries/sorted_data" \
+        --sorted-by "EventTime" \
+        -c datafusion.optimizer.prefer_existing_sort=true \
+        -o "${RESULTS_FILE}" \
+        ${QUERY_ARG}
+}
+
 setup_venv() {
     python3 -m venv "$VIRTUAL_ENV"
     PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
```
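Two shell idioms in `clickbench_sorted()` above are worth a closer look: the `:-` parameter expansion that supplies the 12GB default, and the `printf`-based seconds-to-HH:MM:SS conversion used for the "Total duration" line. A standalone sketch:

```shell
# Default value via parameter expansion: 12 unless DATAFUSION_MEMORY_GB is set
unset DATAFUSION_MEMORY_GB
MEMORY_LIMIT_GB=${DATAFUSION_MEMORY_GB:-12}
echo "Memory limit: ${MEMORY_LIMIT_GB}G"   # -> Memory limit: 12G

# Seconds -> HH:MM:SS using integer arithmetic, as in the duration report
DURATION=3725   # example: 1h 2m 5s
printf '%02d:%02d:%02d\n' $((DURATION/3600)) $((DURATION%3600/60)) $((DURATION%60))   # -> 01:02:05
```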
Lines changed: 3 additions & 0 deletions
```diff
@@ -0,0 +1,3 @@
+-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
+-- set datafusion.execution.parquet.binary_as_string = true
+SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
```

benchmarks/src/clickbench.rs

Lines changed: 106 additions & 9 deletions
```diff
@@ -78,6 +78,27 @@ pub struct RunOpt {
     /// If present, write results json here
     #[structopt(parse(from_os_str), short = "o", long = "output")]
     output_path: Option<PathBuf>,
+
+    /// Column name that the data is sorted by (e.g., "EventTime").
+    /// If specified, DataFusion will be informed that the data has this sort order
+    /// using CREATE EXTERNAL TABLE with a WITH ORDER clause.
+    ///
+    /// Recommended to use with: -c datafusion.optimizer.prefer_existing_sort=true
+    /// This allows DataFusion to optimize away redundant sorts while maintaining
+    /// multi-core parallelism for other operations.
+    #[structopt(long = "sorted-by")]
+    sorted_by: Option<String>,
+
+    /// Sort order: ASC or DESC (default: ASC)
+    #[structopt(long = "sort-order", default_value = "ASC")]
+    sort_order: String,
+
+    /// Configuration options in the format key=value.
+    /// Can be specified multiple times.
+    ///
+    /// Example: -c datafusion.optimizer.prefer_existing_sort=true
+    #[structopt(short = "c", long = "config")]
+    config_options: Vec<String>,
 }

 /// Get the SQL file path
```
```diff
@@ -125,6 +146,37 @@ impl RunOpt {

         // configure parquet options
         let mut config = self.common.config()?;
+
+        if self.sorted_by.is_some() {
+            println!("ℹ️  Data is registered with sort order");
+
+            let has_prefer_sort = self
+                .config_options
+                .iter()
+                .any(|opt| opt.contains("prefer_existing_sort=true"));
+
+            if !has_prefer_sort {
+                println!("ℹ️  Consider using -c datafusion.optimizer.prefer_existing_sort=true");
+                println!("ℹ️  to optimize queries while maintaining parallelism");
+            }
+        }
+
+        // Apply user-provided configuration options
+        for config_opt in &self.config_options {
+            let parts: Vec<&str> = config_opt.splitn(2, '=').collect();
+            if parts.len() != 2 {
+                return Err(exec_datafusion_err!(
+                    "Invalid config option format: '{}'. Expected 'key=value'",
+                    config_opt
+                ));
+            }
+            let key = parts[0];
+            let value = parts[1];
+
+            println!("Setting config: {key} = {value}");
+            config = config.set_str(key, value);
+        }
+
         {
             let parquet_options = &mut config.options_mut().execution.parquet;
             // The hits_partitioned dataset specifies string columns
```
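The `-c key=value` parsing above splits on the first `=` only (`splitn(2, '=')`), so values that themselves contain `=` survive intact. The same first-match rule can be expressed with shell parameter expansion (a sketch, not part of the PR):

```shell
# Split a key=value option on the FIRST '=' only
config_opt="datafusion.optimizer.prefer_existing_sort=true"
key="${config_opt%%=*}"    # strip longest '=*' suffix -> text before first '='
value="${config_opt#*=}"   # strip shortest '*=' prefix -> text after first '='
echo "Setting config: ${key} = ${value}"
```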
```diff
@@ -136,10 +188,18 @@ impl RunOpt {
                 parquet_options.pushdown_filters = true;
                 parquet_options.reorder_filters = true;
             }
+
+            if self.sorted_by.is_some() {
+                // We should compare the dynamic topk optimization when data is sorted, so we make the
+                // assumption that filter pushdown is also enabled in this case.
+                parquet_options.pushdown_filters = true;
+                parquet_options.reorder_filters = true;
+            }
         }

         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
+
         self.register_hits(&ctx).await?;

         let mut benchmark_run = BenchmarkRun::new();
```
```diff
@@ -214,17 +274,54 @@ impl RunOpt {
     }

     /// Registers the `hits.parquet` as a table named `hits`
+    /// If sorted_by is specified, uses CREATE EXTERNAL TABLE with WITH ORDER
     async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
-        let options = Default::default();
         let path = self.path.as_os_str().to_str().unwrap();
-        ctx.register_parquet("hits", path, options)
-            .await
-            .map_err(|e| {
-                DataFusionError::Context(
-                    format!("Registering 'hits' as {path}"),
-                    Box::new(e),
-                )
-            })
+
+        // If sorted_by is specified, use CREATE EXTERNAL TABLE with WITH ORDER
+        if let Some(ref sort_column) = self.sorted_by {
+            println!(
+                "Registering table with sort order: {} {}",
+                sort_column, self.sort_order
+            );
+
+            // Escape column name with double quotes
+            let escaped_column = if sort_column.contains('"') {
+                sort_column.clone()
+            } else {
+                format!("\"{sort_column}\"")
+            };
+
+            // Build CREATE EXTERNAL TABLE DDL with WITH ORDER clause
+            // Schema will be automatically inferred from the Parquet file
+            let create_table_sql = format!(
+                "CREATE EXTERNAL TABLE hits \
+                 STORED AS PARQUET \
+                 LOCATION '{}' \
+                 WITH ORDER ({} {})",
+                path,
+                escaped_column,
+                self.sort_order.to_uppercase()
+            );
+
+            println!("Executing: {create_table_sql}");
+
+            // Execute the CREATE EXTERNAL TABLE statement
+            ctx.sql(&create_table_sql).await?.collect().await?;
+
+            Ok(())
+        } else {
+            // Original registration without sort order
+            let options = Default::default();
+            ctx.register_parquet("hits", path, options)
+                .await
+                .map_err(|e| {
+                    DataFusionError::Context(
+                        format!("Registering 'hits' as {path}"),
+                        Box::new(e),
+                    )
+                })
+        }
     }

     fn iterations(&self) -> usize {
```
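The column-escaping rule in `register_hits` wraps the column name in double quotes unless it already contains one. The same rule as a shell sketch (the `escape_column` function name is mine, for illustration only):

```shell
# Wrap a column name in double quotes unless it already contains a quote,
# mirroring the escaped_column logic in register_hits
escape_column() {
  case "$1" in
    *\"*) printf '%s\n' "$1" ;;     # already contains a quote: pass through unchanged
    *)    printf '"%s"\n' "$1" ;;   # otherwise wrap in double quotes
  esac
}
escape_column 'EventTime'       # -> "EventTime"
escape_column '"EventTime"'     # -> "EventTime" (unchanged)
```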
