Skip to content

Commit 2e24695

Browse files
authored
chore: Clean up and split shuffle module (#3395)
1 parent a2f8e54 commit 2e24695

8 files changed

Lines changed: 1018 additions & 888 deletions

File tree

native/core/src/execution/shuffle/comet_partitioning.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,26 @@ impl CometPartitioning {
4646
}
4747
}
4848
}
49+
50+
pub(super) fn pmod(hash: u32, n: usize) -> usize {
51+
let hash = hash as i32;
52+
let n = n as i32;
53+
let r = hash % n;
54+
let result = if r < 0 { (r + n) % n } else { r };
55+
result as usize
56+
}
57+
58+
#[cfg(test)]
59+
mod tests {
60+
use super::*;
61+
62+
#[test]
63+
fn test_pmod() {
64+
let i: Vec<u32> = vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 0xcd1e64fb];
65+
let result = i.into_iter().map(|i| pmod(i, 200)).collect::<Vec<usize>>();
66+
67+
// expected partition from Spark with n=200
68+
let expected = vec![69, 5, 193, 171, 115];
69+
assert_eq!(result, expected);
70+
}
71+
}

native/core/src/execution/shuffle/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
pub(crate) mod codec;
1919
mod comet_partitioning;
2020
mod metrics;
21+
mod partitioners;
2122
mod shuffle_writer;
2223
pub mod spark_unsafe;
2324
mod writers;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
mod multi_partition;
19+
mod partitioned_batch_iterator;
20+
mod single_partition;
21+
22+
use arrow::record_batch::RecordBatch;
23+
use datafusion::common::Result;
24+
25+
pub(super) use multi_partition::MultiPartitionShuffleRepartitioner;
26+
pub(super) use partitioned_batch_iterator::PartitionedBatchIterator;
27+
pub(super) use single_partition::SinglePartitionShufflePartitioner;
28+
29+
#[async_trait::async_trait]
30+
pub(super) trait ShufflePartitioner: Send + Sync {
31+
/// Insert a batch into the partitioner
32+
async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
33+
/// Write shuffle data and shuffle index file to disk
34+
fn shuffle_write(&mut self) -> Result<()>;
35+
}

0 commit comments

Comments
 (0)