Skip to content

Commit b867a2d

Browse files
authored
Support crc32abs delimiter (#505)
支持crc32abs-delimiter; 修复ModRange 对特殊分片数,导致分布位置越界的问题; 新的rust版本,大量报错dead code导致ci失败,调整workflow规避;
1 parent 29d2869 commit b867a2d

File tree

9 files changed

+176
-8
lines changed

9 files changed

+176
-8
lines changed

.github/workflows/rust.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ env:
1919
min_key: 1
2020
max_key: 10000
2121
socks_dir: /home/runner/work/breeze/socks
22-
RUSTFLAGS: "-D warnings"
22+
# TODO rust版本升级后,突然大量warnings,暂时允许dead_code警告
23+
RUSTFLAGS: "-D warnings --allow dead_code"
2324

2425
jobs:
2526
build:

discovery/src/dns/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ impl Ipv4Vec {
346346
}
347347
self.len += 1;
348348
}
349-
pub fn iter(&self) -> Ipv4VecIter {
349+
pub fn iter(&self) -> Ipv4VecIter<'_> {
350350
let iter = if self.len() <= self.cache.len() {
351351
self.cache[..self.len as usize].iter()
352352
} else {

sharding/src/distribution/modrange.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
pub struct ModRange {
77
slot: u64,
88
interval: u64,
9+
shards: usize,
910
}
1011

1112
// ModRange 分布方法,默认总范围是[0,256),否则指定slot
@@ -17,6 +18,7 @@ impl ModRange {
1718
ModRange {
1819
slot,
1920
interval: slot / shards as u64,
21+
shards,
2022
}
2123
}
2224

@@ -27,6 +29,11 @@ impl ModRange {
2729
val = val.wrapping_abs();
2830
}
2931
let rs = val as u64 / self.interval;
32+
if rs >= self.shards as u64 {
33+
// 超出范围,返回最后一个分片
34+
log::warn!("found modrange slot out of bound, rs:{}, shards:{}", rs, self.shards);
35+
return self.shards - 1;
36+
}
3037
rs as usize
3138
}
3239
}

sharding/src/distribution/range.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::distribution::DIST_RANGE_SLOT_COUNT_DEFAULT;
66
pub struct Range {
77
slot: u64,
88
interval: u64,
9+
shards: usize,
910
}
1011

1112
// Range 分布方法,默认总范围是[0,256),否则用
@@ -16,6 +17,7 @@ impl Range {
1617
Range {
1718
slot,
1819
interval: slot / shards as u64,
20+
shards,
1921
}
2022
}
2123

@@ -28,6 +30,11 @@ impl Range {
2830
val = val.wrapping_abs();
2931
}
3032
let rs = val as u64 / self.interval;
33+
if rs >= self.shards as u64 {
34+
// 超出范围,返回最后一个分片
35+
log::warn!("found range slot out of bound, rs:{}, shards:{}", rs, self.shards);
36+
return self.shards - 1;
37+
}
3138
rs as usize
3239
}
3340
}

sharding/src/hash/crc32.rs

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::fmt::Display;
1212
/// ================ 基于i32版本 ================
1313
/// 备注:不取名crc32-abs,是为了区分i32/i63的base,及为后续扩展做准备 fishermen
1414
/// 7 crc32abs: 转为i32,然后进行abs操作
15-
/// 8 后续可能会有crc32abs-xxx,此处先预留语义
15+
/// 8 crc32abs-delimiter: hash only the part of the key before the point/pound/underscore delimiter, convert to i32, then apply abs
1616
///
1717
1818
pub(super) const CRC32TAB: [i64; 256] = [
@@ -82,6 +82,18 @@ pub struct Crc32SmartNum {}
8282
#[derive(Default, Clone, Debug)]
8383
pub struct Crc32MixNum {}
8484

85+
/// 遵从i32进行crc32计算,并进行abs操作;
86+
#[derive(Debug, Clone, Default)]
87+
pub struct Crc32Abs;
88+
89+
/// Computes crc32 following i32 semantics and applies abs, hashing only the key bytes
/// after `start_pos` and before `delimiter`. Format: $start+$hashkey+$delimiter$
90+
#[derive(Default, Clone, Debug)]
91+
pub struct Crc32AbsDelimiter {
92+
// Index of the first key byte that is fed into the crc.
start_pos: usize,
93+
// Byte at which hashing stops; the check is skipped when it equals `KEY_DELIMITER_NONE`.
delimiter: u8,
94+
// Algorithm name this hasher was built from; shown in debug logs and by `Display`.
name: DebugName,
95+
}
96+
8597
// 对全key做crc32
8698
impl super::Hash for Crc32 {
8799
#[inline]
@@ -331,10 +343,6 @@ impl super::Hash for Crc32MixNum {
331343
}
332344
}
333345

334-
/// 遵从i32进行crc32计算,并进行abs操作;
335-
#[derive(Debug, Clone, Default)]
336-
pub struct Crc32Abs;
337-
338346
impl Hash for Crc32Abs {
339347
fn hash<S: super::HashKey>(&self, key: &S) -> i64 {
340348
let mut crc: i64 = CRC_SEED;
@@ -356,3 +364,71 @@ impl Hash for Crc32Abs {
356364
crc as i64
357365
}
358366
}
367+
368+
/// --- Crc32AbsDelimiter impl: crc32abs-point/underscore/pound ---
369+
impl Crc32AbsDelimiter {
370+
pub fn from(alg: &str) -> Self {
371+
let alg_parts: Vec<&str> = alg.split(super::HASHER_NAME_DELIMITER).collect();
372+
373+
debug_assert!(alg_parts.len() >= 2);
374+
debug_assert_eq!(alg_parts[0], "crc32abs");
375+
376+
let delimiter = super::key_delimiter_name_2u8(alg, alg_parts[1]);
377+
378+
if alg_parts.len() == 2 {
379+
return Self {
380+
start_pos: 0,
381+
delimiter,
382+
name: alg.into(),
383+
};
384+
}
385+
386+
debug_assert!(alg_parts.len() == 3);
387+
if let Ok(prefix_len) = alg_parts[2].parse::<usize>() {
388+
return Self {
389+
start_pos: prefix_len,
390+
delimiter,
391+
name: alg.into(),
392+
};
393+
} else {
394+
log::debug!("found unknown hash/{}, ignore prefix instead", alg);
395+
return Self {
396+
start_pos: 0,
397+
delimiter,
398+
name: alg.into(),
399+
};
400+
}
401+
}
402+
}
403+
404+
impl super::Hash for Crc32AbsDelimiter {
405+
fn hash<S: super::HashKey>(&self, key: &S) -> i64 {
406+
let mut crc: i64 = CRC_SEED;
407+
debug_assert!(self.start_pos < key.len());
408+
409+
let check_delimiter = self.delimiter != super::KEY_DELIMITER_NONE;
410+
for i in self.start_pos..key.len() {
411+
let c = key.at(i);
412+
if check_delimiter && (c == self.delimiter) {
413+
break;
414+
}
415+
crc = ((crc >> 8) & 0x00FFFFFF) ^ CRC32TAB[((crc ^ (c as i64)) & 0xff) as usize];
416+
}
417+
418+
crc ^= CRC_SEED;
419+
crc &= CRC_SEED;
420+
421+
let mut crc = crc as i32;
422+
if crc <= 0 {
423+
log::debug!("{:?} - negative hash/{} for key/{:?}", self.name, crc, key);
424+
crc = crc.abs();
425+
}
426+
crc as i64
427+
}
428+
}
429+
430+
impl Display for Crc32AbsDelimiter {
431+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432+
write!(f, "{:?}", self.name)
433+
}
434+
}

sharding/src/hash/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub enum Hasher {
8383
LBCrc32localDelimiter(LBCrc32localDelimiter), // long bytes crc32local for hash like: 123.a, 124_a, 123#a
8484
Rawcrc32local(Rawcrc32local), // raw or crc32local
8585
Crc32Abs(Crc32Abs), // crc32abs: 基于i32转换,然后直接取abs;其他走i64提升为正数
86+
Crc32AbsDelimiter(Crc32AbsDelimiter), // crc32abs: 基于i32转换,然后直接取abs,同时支持分隔符,格式:$start+$hashkey+$delimiter$
8687
Crc64(Crc64), // Crc64 算法,对整个key做crc64计算
8788
Fnv1aF64(Fnv1aF64),
8889
Random(RandomHash), // random hash
@@ -158,6 +159,7 @@ impl Hasher {
158159
CRC32_EXT_MIXNUM => Self::Crc32MixNum(Default::default()),
159160
_ => Self::Crc32Delimiter(Crc32Delimiter::from(alg_lower.as_str())),
160161
},
162+
"crc32abs" => Self::Crc32AbsDelimiter(Crc32AbsDelimiter::from(alg_lower.as_str())),
161163
"crc32local" => match alg_parts[1] {
162164
CRC32_EXT_SMARTNUM => Self::Crc32localSmartNum(Default::default()),
163165
_ => Self::Crc32localDelimiter(Crc32localDelimiter::from(alg_lower.as_str())),
@@ -169,6 +171,7 @@ impl Hasher {
169171
}
170172
}
171173
}
174+
172175
#[inline]
173176
pub fn crc32_short() -> Self {
174177
Self::Crc32Short(Default::default())
@@ -182,7 +185,7 @@ fn key_delimiter_name_2u8(_alg: &str, delimiter_name: &str) -> u8 {
182185
KEY_DELIMITER_UNDERSCORE => '_',
183186
KEY_DELIMITER_POUND => '#',
184187
_ => {
185-
log::debug!("found unknown hash alg: {}, use crc32 instead", _alg);
188+
log::warn!("found unknown hash alg: {}, use crc32 instead", _alg);
186189
KEY_DELIMITER_NONE as char
187190
}
188191
};
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
5658010406.sr
2+
3826690697.sr
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2019081705131.sr
2+
1878713812.cr
3+
6577710803.cr

tests/src/shard_checker.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,16 @@ fn check_shard_data() {
3636
shard_checker(root_dir, data_file);
3737
}
3838

39+
/// Maps a Breeze port number to its shard index: ports 57439..=57446 map to
/// indexes 0..=7 and ports 58544..=58791 map to indexes 8..=255. Any other port
/// yields `None`.
fn port_to_idx(port: u16) -> Option<usize> {
    match port {
        57439..=57446 => Some(usize::from(port - 57439)),
        58544..=58791 => Some(usize::from(port - 58544) + 8),
        _ => None,
    }
}
3949
#[test]
4050
fn build_shard_data() {
4151
let shard_conf = ShardConf {
@@ -235,3 +245,62 @@ fn write_shard_data(shard_conf: &ShardConf, src: &str, dest: &str) {
235245
writer.flush().unwrap();
236246
}
237247
}
248+
249+
/// Walks every `<port>.txt` key file under `sharding_datas/readed-redis-port-key`,
/// hashes each non-empty line with `crc32abs-point`, distributes it with `modula`
/// over 256 mock servers, and prints every key whose computed shard index differs
/// from the index implied by the file's port.
///
/// NOTE(review): mismatches are only printed, never asserted, so this test still
/// passes when keys land on an unexpected shard — confirm that is intended.
#[test]
fn check_readed_redis_hasher() {
    use std::fs::{self, File};
    use std::io::{BufRead, BufReader};

    let dir = "sharding_datas/readed-redis-port-key";
    let files: Vec<_> = fs::read_dir(dir)
        .expect("read_dir failed")
        .flatten()
        .filter(|entry| entry.path().is_file())
        .collect();

    let hasher = Hasher::from("crc32abs-point");
    let servers = mock_servers(256);
    let dist = Distribute::from("modula", &servers);

    for entry in files {
        let path = entry.path();
        let fname = path.file_name().unwrap().to_string_lossy();
        if !fname.ends_with(".txt") {
            continue;
        }

        // File names that are not a bare port number are skipped silently.
        let port = match fname.trim_end_matches(".txt").parse::<u16>() {
            Ok(port) => port,
            Err(_) => continue,
        };
        let idx = port_to_idx(port).unwrap_or_else(|| panic!("端口{}不在合法分片区间", port));

        let reader = BufReader::new(File::open(&path).expect("open port file failed"));
        for line in reader.lines() {
            let raw = line.unwrap();
            let key = raw.trim();
            if key.is_empty() {
                continue;
            }

            let hash = hasher.hash(&key.as_bytes());
            let shard_idx = dist.index(hash);
            if shard_idx != idx {
                println!("key={} hash={} shard_idx={} file={} idx={}", key, hash, shard_idx, fname, idx);
            }
        }
        println!("已处理端口文件: {}", path.display());
    }
}
294+
295+
/// Smoke test: prints the `crc32abs-point` hash and the `modula` shard index
/// (over 256 mock servers) for one sample key.
#[test]
fn check_crc32abs_hasher() {
    let hasher = Hasher::from("crc32abs-point");
    let servers = mock_servers(256);
    let dist = Distribute::from("modula", &servers);

    let key = "6972284375.cr2";
    let hash = hasher.hash(&key.as_bytes());
    let idx = dist.index(hash);

    println!("key: {}, hash: {}, idx: {}", key, hash, idx);
}

0 commit comments

Comments
 (0)