Commit f76f58a

zhuqi-lucas committed (co-authors: martin-g, 2010YOUY01, Copilot, alamb)
Add sorted data benchmark. (apache#19042)
## Which issue does this PR close?

Add sorted data benchmark.

- Closes [apache#18976](apache#18976)

## Rationale for this change

## What changes are included in this PR?

## Are these changes tested?

Yes. Test results for the reverse parquet PR (apache#18817) show queries on sorted data running about 30x faster than on the main branch.

Reverse parquet branch:

```text
Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench clickbench --iterations 5 --path /Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data --sorted-by EventTime --sort-order ASC -o /Users/zhuqi/arrow-datafusion/benchmarks/results/reverse_parquet/data_sorted_clickbench.json`
Running benchmarks with the following options: RunOpt { query: None, pushdown: false, common: CommonOpt { iterations: 5, partitions: None, batch_size: None, mem_pool_type: "fair", memory_limit: None, sort_spill_reservation_bytes: None, debug: false }, path: "/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet", queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/reverse_parquet/data_sorted_clickbench.json"), sorted_by: Some("EventTime"), sort_order: "ASC" }
⚠️ Forcing target_partitions=1 to preserve sort order
⚠️ (Because we want to get the pure performance benefit of sorted data to compare)
📊 Session config target_partitions: 1
Registering table with sort order: EventTime ASC
Executing: CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet' WITH ORDER ("EventTime" ASC)
Q0: -- Must set for ClickBench hits_partitioned dataset. See apache#16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
Query 0 iteration 0 took 14.7 ms and returned 10 rows
Query 0 iteration 1 took 10.2 ms and returned 10 rows
Query 0 iteration 2 took 8.7 ms and returned 10 rows
Query 0 iteration 3 took 7.9 ms and returned 10 rows
Query 0 iteration 4 took 7.9 ms and returned 10 rows
Query 0 avg time: 9.85 ms
+ set +x
Done
```

And the main branch result:

```text
Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench clickbench --iterations 5 --path /Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data --sorted-by EventTime --sort-order ASC -o /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_18976/data_sorted_clickbench.json`
Running benchmarks with the following options: RunOpt { query: None, pushdown: false, common: CommonOpt { iterations: 5, partitions: None, batch_size: None, mem_pool_type: "fair", memory_limit: None, sort_spill_reservation_bytes: None, debug: false }, path: "/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet", queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/issue_18976/data_sorted_clickbench.json"), sorted_by: Some("EventTime"), sort_order: "ASC" }
⚠️ Forcing target_partitions=1 to preserve sort order
⚠️ (Because we want to get the pure performance benefit of sorted data to compare)
📊 Session config target_partitions: 1
Registering table with sort order: EventTime ASC
Executing: CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet' WITH ORDER ("EventTime" ASC)
Q0: -- Must set for ClickBench hits_partitioned dataset. See apache#16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
Query 0 iteration 0 took 331.1 ms and returned 10 rows
Query 0 iteration 1 took 286.0 ms and returned 10 rows
Query 0 iteration 2 took 283.3 ms and returned 10 rows
Query 0 iteration 3 took 283.8 ms and returned 10 rows
Query 0 iteration 4 took 286.5 ms and returned 10 rows
Query 0 avg time: 294.13 ms
+ set +x
Done
```

## Are there any user-facing changes?

---

Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
Co-authored-by: Yongting You <2010youy01@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

(cherry picked from commit cde6dfa)
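As a quick cross-check of the ~30x claim, the ratio of the two reported Q0 averages (294.13 ms on main vs 9.85 ms on the branch) can be computed directly; a minimal sketch (the function name is illustrative, not part of the benchmark code):

```rust
// Ratio of the two Q0 average times quoted in the logs above.
fn speedup(main_ms: f64, branch_ms: f64) -> f64 {
    main_ms / branch_ms
}

fn main() {
    let s = speedup(294.13, 9.85);
    println!("speedup: {s:.1}x"); // roughly 30x
}
```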
1 parent 419fea5 commit f76f58a

4 files changed

Lines changed: 264 additions & 10 deletions


benchmarks/README.md

Lines changed: 38 additions & 0 deletions
````diff
@@ -804,3 +804,41 @@ Getting results...
 cancelling thread
 done dropping runtime in 83.531417ms
 ```
+
+## Sorted Data Benchmarks
+
+### Data Sorted ClickBench
+
+Benchmark for queries on pre-sorted data to test sort order optimization.
+This benchmark uses a subset of the ClickBench dataset (hits.parquet, ~14GB) that has been pre-sorted by the EventTime column. The queries are designed to test DataFusion's performance when the data is already sorted as is common in timeseries workloads.
+
+The benchmark includes queries that:
+- Scan pre-sorted data with ORDER BY clauses that match the sort order
+- Test reverse scans on sorted data
+- Verify the performance result
+
+#### Generating Sorted Data
+
+The sorted dataset is automatically generated from the ClickBench partitioned dataset. You can configure the memory used during the sorting process with the `DATAFUSION_MEMORY_GB` environment variable. The default memory limit is 12GB.
+```bash
+./bench.sh data data_sorted_clickbench
+```
+
+To create the sorted dataset, for example with 16GB of memory, run:
+
+```bash
+DATAFUSION_MEMORY_GB=16 ./bench.sh data data_sorted_clickbench
+```
+
+This command will:
+1. Download the ClickBench partitioned dataset if not present
+2. Sort hits.parquet by EventTime in ascending order
+3. Save the sorted file as hits_sorted.parquet
+
+#### Running the Benchmark
+
+```bash
+./bench.sh run data_sorted_clickbench
+```
+
+This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.
````
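The `DATAFUSION_MEMORY_GB` default described above corresponds to shell default expansion (`${DATAFUSION_MEMORY_GB:-12}`) in bench.sh; a minimal Rust sketch of the same defaulting rule (function name and structure are illustrative, not part of the benchmark code):

```rust
// Mirrors `${DATAFUSION_MEMORY_GB:-12}`: use the variable's value when it
// parses as a number, otherwise fall back to the 12GB default.
fn memory_limit_gb(env_value: Option<&str>) -> u64 {
    env_value.and_then(|v| v.parse().ok()).unwrap_or(12)
}

fn main() {
    let raw = std::env::var("DATAFUSION_MEMORY_GB").ok();
    // Formatted the way the sort step passes it on, e.g. '12G'.
    println!("memory limit: {}G", memory_limit_gb(raw.as_deref()));
}
```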

benchmarks/bench.sh

Lines changed: 117 additions & 1 deletion
```diff
@@ -99,6 +99,9 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parquet
 clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
 clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
 
+# Sorted Data Benchmarks (ORDER BY Optimization)
+clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
+
 # H2O.ai Benchmarks (Group By, Join, Window)
 h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
 h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
@@ -314,6 +317,9 @@ main() {
         compile_profile)
             data_tpch "1"
             ;;
+        clickbench_sorted)
+            clickbench_sorted
+            ;;
         *)
             echo "Error: unknown benchmark '$BENCHMARK' for data generation"
             usage
@@ -445,7 +451,7 @@ main() {
         h2o_medium_window)
             run_h2o_window "MEDIUM" "CSV" "window"
             ;;
-       h2o_big_window)
+        h2o_big_window)
             run_h2o_window "BIG" "CSV" "window"
             ;;
         h2o_small_parquet)
@@ -497,6 +503,9 @@ main() {
         compile_profile)
             run_compile_profile "${PROFILE_ARGS[@]}"
            ;;
+        clickbench_sorted)
+            run_clickbench_sorted
+            ;;
         *)
             echo "Error: unknown benchmark '$BENCHMARK' for run"
             usage
@@ -1189,6 +1198,113 @@ compare_benchmarks() {
 
 }
 
+# Creates sorted ClickBench data from hits.parquet (full dataset)
+# The data is sorted by EventTime in ascending order
+# Uses datafusion-cli to reduce dependencies
+clickbench_sorted() {
+    SORTED_FILE="${DATA_DIR}/hits_sorted.parquet"
+    ORIGINAL_FILE="${DATA_DIR}/hits.parquet"
+
+    # Default memory limit is 12GB, can be overridden with DATAFUSION_MEMORY_GB env var
+    MEMORY_LIMIT_GB=${DATAFUSION_MEMORY_GB:-12}
+
+    echo "Creating sorted ClickBench dataset from hits.parquet..."
+    echo "Configuration:"
+    echo "  Memory limit: ${MEMORY_LIMIT_GB}G"
+    echo "  Row group size: 64K rows"
+    echo "  Compression: uncompressed"
+
+    if [ ! -f "${ORIGINAL_FILE}" ]; then
+        echo "hits.parquet not found. Running data_clickbench_1 first..."
+        data_clickbench_1
+    fi
+
+    if [ -f "${SORTED_FILE}" ]; then
+        echo "Sorted hits.parquet already exists at ${SORTED_FILE}"
+        return 0
+    fi
+
+    echo "Sorting hits.parquet by EventTime (this may take several minutes)..."
+
+    pushd "${DATAFUSION_DIR}" > /dev/null
+    echo "Building datafusion-cli..."
+    cargo build --release --bin datafusion-cli
+    DATAFUSION_CLI="${DATAFUSION_DIR}/target/release/datafusion-cli"
+    popd > /dev/null
+
+
+    START_TIME=$(date +%s)
+    echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
+    echo "Using datafusion-cli to create sorted parquet file..."
+    "${DATAFUSION_CLI}" << EOF
+-- Memory and performance configuration
+SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G';
+SET datafusion.execution.spill_compression = 'uncompressed';
+SET datafusion.execution.sort_spill_reservation_bytes = 10485760; -- 10MB
+SET datafusion.execution.batch_size = 8192;
+SET datafusion.execution.target_partitions = 1;
+
+-- Parquet output configuration
+SET datafusion.execution.parquet.max_row_group_size = 65536;
+SET datafusion.execution.parquet.compression = 'uncompressed';
+
+-- Execute sort and write
+COPY (SELECT * FROM '${ORIGINAL_FILE}' ORDER BY "EventTime")
+TO '${SORTED_FILE}'
+STORED AS PARQUET;
+EOF
+
+    local result=$?
+
+    END_TIME=$(date +%s)
+    DURATION=$((END_TIME - START_TIME))
+    echo "End time: $(date '+%Y-%m-%d %H:%M:%S')"
+
+    if [ $result -eq 0 ]; then
+        echo "✓ Successfully created sorted ClickBench dataset"
+
+        INPUT_SIZE=$(stat -f%z "${ORIGINAL_FILE}" 2>/dev/null || stat -c%s "${ORIGINAL_FILE}" 2>/dev/null)
+        OUTPUT_SIZE=$(stat -f%z "${SORTED_FILE}" 2>/dev/null || stat -c%s "${SORTED_FILE}" 2>/dev/null)
+        INPUT_MB=$((INPUT_SIZE / 1024 / 1024))
+        OUTPUT_MB=$((OUTPUT_SIZE / 1024 / 1024))
+
+        echo "  Input:  ${INPUT_MB} MB"
+        echo "  Output: ${OUTPUT_MB} MB"
+
+        echo ""
+        echo "Time Statistics:"
+        echo "  Total duration: ${DURATION} seconds ($(printf '%02d:%02d:%02d' $((DURATION/3600)) $((DURATION%3600/60)) $((DURATION%60))))"
+        echo "  Throughput: $((INPUT_MB / DURATION)) MB/s"
+
+        return 0
+    else
+        echo "✗ Error: Failed to create sorted dataset"
+        echo "💡 Tip: Try increasing memory with: DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted"
+        return 1
+    fi
+}
+
+# Runs the sorted data benchmark with prefer_existing_sort configuration
+run_clickbench_sorted() {
+    RESULTS_FILE="${RESULTS_DIR}/clickbench_sorted.json"
+    echo "RESULTS_FILE: ${RESULTS_FILE}"
+    echo "Running sorted data benchmark with prefer_existing_sort optimization..."
+
+    # Ensure sorted data exists
+    clickbench_sorted
+
+    # Run benchmark with prefer_existing_sort configuration
+    # This allows DataFusion to optimize away redundant sorts while maintaining parallelism
+    debug_run $CARGO_COMMAND --bin dfbench -- clickbench \
+        --iterations 5 \
+        --path "${DATA_DIR}/hits_sorted.parquet" \
+        --queries-path "${SCRIPT_DIR}/queries/clickbench/queries/sorted_data" \
+        --sorted-by "EventTime" \
+        -c datafusion.optimizer.prefer_existing_sort=true \
+        -o "${RESULTS_FILE}" \
+        ${QUERY_ARG}
+}
+
 setup_venv() {
     python3 -m venv "$VIRTUAL_ENV"
     PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
```
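The time statistics printed by `clickbench_sorted` come from plain integer arithmetic on epoch seconds; a standalone sketch of the same HH:MM:SS and MB/s formulas (function names are illustrative, not part of the script):

```rust
// Mirrors `printf '%02d:%02d:%02d' $((DURATION/3600)) $((DURATION%3600/60)) $((DURATION%60))`.
fn hh_mm_ss(duration_secs: u64) -> String {
    format!(
        "{:02}:{:02}:{:02}",
        duration_secs / 3600,
        duration_secs % 3600 / 60,
        duration_secs % 60
    )
}

// Mirrors `$((INPUT_MB / DURATION))`: like the shell, this is integer division.
fn throughput_mb_s(input_mb: u64, duration_secs: u64) -> u64 {
    input_mb / duration_secs
}

fn main() {
    println!("{}", hh_mm_ss(3725)); // 01:02:05
    println!("{} MB/s", throughput_mb_s(14000, 700)); // 20 MB/s
}
```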
benchmarks/queries/clickbench/queries/sorted_data (new query file)

Lines changed: 3 additions & 0 deletions

```diff
@@ -0,0 +1,3 @@
+-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
+-- set datafusion.execution.parquet.binary_as_string = true
+SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
```

benchmarks/src/clickbench.rs

Lines changed: 106 additions & 9 deletions
```diff
@@ -78,6 +78,27 @@ pub struct RunOpt {
     /// If present, write results json here
     #[structopt(parse(from_os_str), short = "o", long = "output")]
     output_path: Option<PathBuf>,
+
+    /// Column name that the data is sorted by (e.g., "EventTime")
+    /// If specified, DataFusion will be informed that the data has this sort order
+    /// using CREATE EXTERNAL TABLE with WITH ORDER clause.
+    ///
+    /// Recommended to use with: -c datafusion.optimizer.prefer_existing_sort=true
+    /// This allows DataFusion to optimize away redundant sorts while maintaining
+    /// multi-core parallelism for other operations.
+    #[structopt(long = "sorted-by")]
+    sorted_by: Option<String>,
+
+    /// Sort order: ASC or DESC (default: ASC)
+    #[structopt(long = "sort-order", default_value = "ASC")]
+    sort_order: String,
+
+    /// Configuration options in the format key=value
+    /// Can be specified multiple times.
+    ///
+    /// Example: -c datafusion.optimizer.prefer_existing_sort=true
+    #[structopt(short = "c", long = "config")]
+    config_options: Vec<String>,
 }
 
 /// Get the SQL file path
@@ -125,6 +146,37 @@ impl RunOpt {
 
         // configure parquet options
         let mut config = self.common.config()?;
+
+        if self.sorted_by.is_some() {
+            println!("ℹ️ Data is registered with sort order");
+
+            let has_prefer_sort = self
+                .config_options
+                .iter()
+                .any(|opt| opt.contains("prefer_existing_sort=true"));
+
+            if !has_prefer_sort {
+                println!("ℹ️ Consider using -c datafusion.optimizer.prefer_existing_sort=true");
+                println!("ℹ️ to optimize queries while maintaining parallelism");
+            }
+        }
+
+        // Apply user-provided configuration options
+        for config_opt in &self.config_options {
+            let parts: Vec<&str> = config_opt.splitn(2, '=').collect();
+            if parts.len() != 2 {
+                return Err(exec_datafusion_err!(
+                    "Invalid config option format: '{}'. Expected 'key=value'",
+                    config_opt
+                ));
+            }
+            let key = parts[0];
+            let value = parts[1];
+
+            println!("Setting config: {key} = {value}");
+            config = config.set_str(key, value);
+        }
+
         {
             let parquet_options = &mut config.options_mut().execution.parquet;
             // The hits_partitioned dataset specifies string columns
@@ -136,10 +188,18 @@ impl RunOpt {
                 parquet_options.pushdown_filters = true;
                 parquet_options.reorder_filters = true;
             }
+
+            if self.sorted_by.is_some() {
+                // We should compare the dynamic topk optimization when data is sorted, so we make the
+                // assumption that filter pushdown is also enabled in this case.
+                parquet_options.pushdown_filters = true;
+                parquet_options.reorder_filters = true;
+            }
         }
 
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
+
         self.register_hits(&ctx).await?;
 
         let mut benchmark_run = BenchmarkRun::new();
```
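The `-c key=value` handling above splits on the first `=` only (`splitn(2, '=')`), so option values may themselves contain `=`; a self-contained sketch of that parsing rule (the function name is illustrative, and a plain `String` error stands in for `exec_datafusion_err!`):

```rust
// Split a `-c` option into (key, value) at the first '=' only, so that
// values containing '=' survive intact.
fn parse_config_option(opt: &str) -> Result<(&str, &str), String> {
    let parts: Vec<&str> = opt.splitn(2, '=').collect();
    if parts.len() != 2 {
        return Err(format!(
            "Invalid config option format: '{opt}'. Expected 'key=value'"
        ));
    }
    Ok((parts[0], parts[1]))
}

fn main() {
    let (key, value) =
        parse_config_option("datafusion.optimizer.prefer_existing_sort=true").unwrap();
    println!("Setting config: {key} = {value}");
}
```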
```diff
@@ -214,17 +274,54 @@ impl RunOpt {
     }
 
     /// Registers the `hits.parquet` as a table named `hits`
+    /// If sorted_by is specified, uses CREATE EXTERNAL TABLE with WITH ORDER
     async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
-        let options = Default::default();
         let path = self.path.as_os_str().to_str().unwrap();
-        ctx.register_parquet("hits", path, options)
-            .await
-            .map_err(|e| {
-                DataFusionError::Context(
-                    format!("Registering 'hits' as {path}"),
-                    Box::new(e),
-                )
-            })
+
+        // If sorted_by is specified, use CREATE EXTERNAL TABLE with WITH ORDER
+        if let Some(ref sort_column) = self.sorted_by {
+            println!(
+                "Registering table with sort order: {} {}",
+                sort_column, self.sort_order
+            );
+
+            // Escape column name with double quotes
+            let escaped_column = if sort_column.contains('"') {
+                sort_column.clone()
+            } else {
+                format!("\"{sort_column}\"")
+            };
+
+            // Build CREATE EXTERNAL TABLE DDL with WITH ORDER clause
+            // Schema will be automatically inferred from the Parquet file
+            let create_table_sql = format!(
+                "CREATE EXTERNAL TABLE hits \
+                 STORED AS PARQUET \
+                 LOCATION '{}' \
+                 WITH ORDER ({} {})",
+                path,
+                escaped_column,
+                self.sort_order.to_uppercase()
+            );
+
+            println!("Executing: {create_table_sql}");
+
+            // Execute the CREATE EXTERNAL TABLE statement
+            ctx.sql(&create_table_sql).await?.collect().await?;
+
+            Ok(())
+        } else {
+            // Original registration without sort order
+            let options = Default::default();
+            ctx.register_parquet("hits", path, options)
+                .await
+                .map_err(|e| {
+                    DataFusionError::Context(
+                        format!("Registering 'hits' as {path}"),
+                        Box::new(e),
+                    )
+                })
+        }
     }
 
     fn iterations(&self) -> usize {
```
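The DDL built by `register_hits` quotes the sort column unless the caller already quoted it, then upper-cases the sort order; the string-building part, extracted into a standalone sketch (the function name and the example path are illustrative):

```rust
// Build the CREATE EXTERNAL TABLE ... WITH ORDER statement the same way
// register_hits does: quote the column if it is not already quoted, and
// normalize the order keyword to upper case.
fn build_create_table_sql(path: &str, sort_column: &str, sort_order: &str) -> String {
    let escaped_column = if sort_column.contains('"') {
        sort_column.to_string()
    } else {
        format!("\"{sort_column}\"")
    };
    format!(
        "CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{}' WITH ORDER ({} {})",
        path,
        escaped_column,
        sort_order.to_uppercase()
    )
}

fn main() {
    println!(
        "{}",
        build_create_table_sql("/data/hits_sorted.parquet", "EventTime", "asc")
    );
}
```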
