Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ env:
min_key: 1
max_key: 10000
socks_dir: /home/runner/work/breeze/socks
RUSTFLAGS: "-D warnings"
# TODO rust版本升级后,突然大量warnings,暂时允许dead_code警告
RUSTFLAGS: "-D warnings --allow dead_code"

jobs:
build:
Expand Down
2 changes: 1 addition & 1 deletion discovery/src/dns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl Ipv4Vec {
}
self.len += 1;
}
pub fn iter(&self) -> Ipv4VecIter {
pub fn iter(&self) -> Ipv4VecIter<'_> {
let iter = if self.len() <= self.cache.len() {
self.cache[..self.len as usize].iter()
} else {
Expand Down
7 changes: 7 additions & 0 deletions sharding/src/distribution/modrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
pub struct ModRange {
    // Total size of the slot range that hashes are mapped into.
    slot: u64,
    // Number of slots covered by one shard, computed as `slot / shards` (integer division).
    interval: u64,
    // Number of shards; a computed index at or beyond this is clamped to the last shard.
    shards: usize,
}

// ModRange 分布方法,默认总范围是[0,256),否则指定slot
Expand All @@ -17,6 +18,7 @@ impl ModRange {
ModRange {
slot,
interval: slot / shards as u64,
shards,
}
}

Expand All @@ -27,6 +29,11 @@ impl ModRange {
val = val.wrapping_abs();
}
let rs = val as u64 / self.interval;
if rs >= self.shards as u64 {
// 超出范围,返回最后一个分片
log::warn!("found modrange slot out of bound, rs:{}, shards:{}", rs, self.shards);
return self.shards - 1;
}
rs as usize
}
}
7 changes: 7 additions & 0 deletions sharding/src/distribution/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::distribution::DIST_RANGE_SLOT_COUNT_DEFAULT;
pub struct Range {
    // Total size of the slot range that hashes are mapped into.
    slot: u64,
    // Number of slots covered by one shard, computed as `slot / shards` (integer division).
    interval: u64,
    // Number of shards; a computed index at or beyond this is clamped to the last shard.
    shards: usize,
}

// Range 分布方法,默认总范围是[0,256),否则用
Expand All @@ -16,6 +17,7 @@ impl Range {
Range {
slot,
interval: slot / shards as u64,
shards,
}
}

Expand All @@ -28,6 +30,11 @@ impl Range {
val = val.wrapping_abs();
}
let rs = val as u64 / self.interval;
if rs >= self.shards as u64 {
// 超出范围,返回最后一个分片
log::warn!("found range slot out of bound, rs:{}, shards:{}", rs, self.shards);
return self.shards - 1;
}
rs as usize
}
}
86 changes: 81 additions & 5 deletions sharding/src/hash/crc32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::fmt::Display;
/// ================ 基于i32版本 ================
/// 备注:不取名crc32-abs,是为了区分i32/i63的base,及为后续扩展做准备 fishermen
/// 7 crc32abs: 转为i32,然后进行abs操作
/// 8 后续可能会有crc32abs-xxx,此处先预留语义
/// 8 后续可能会有crc32abs-delimiter,基于point/pound/underscore之前的部分key,先转为i32,然后进行abs操作
///

pub(super) const CRC32TAB: [i64; 256] = [
Expand Down Expand Up @@ -82,6 +82,18 @@ pub struct Crc32SmartNum {}
#[derive(Default, Clone, Debug)]
pub struct Crc32MixNum {}

/// Computes crc32 following i32 semantics, then takes the absolute value.
#[derive(Debug, Clone, Default)]
pub struct Crc32Abs;

/// Computes crc32 following i32 semantics and takes the absolute value, hashing only the bytes
/// after the start position and before the delimiter. Key format: $start+$hashkey+$delimiter$
#[derive(Default, Clone, Debug)]
pub struct Crc32AbsDelimiter {
    // Byte offset in the key at which hashing begins.
    start_pos: usize,
    // Byte that terminates the hashed portion of the key.
    delimiter: u8,
    // Algorithm name this hasher was built from; used for Debug/Display output and logging.
    name: DebugName,
}

// 对全key做crc32
impl super::Hash for Crc32 {
#[inline]
Expand Down Expand Up @@ -331,10 +343,6 @@ impl super::Hash for Crc32MixNum {
}
}

/// Computes crc32 following i32 semantics, then takes the absolute value.
#[derive(Debug, Clone, Default)]
pub struct Crc32Abs;

impl Hash for Crc32Abs {
fn hash<S: super::HashKey>(&self, key: &S) -> i64 {
let mut crc: i64 = CRC_SEED;
Expand All @@ -356,3 +364,71 @@ impl Hash for Crc32Abs {
crc as i64
}
}

/// --- Crc32AbsDelimiter impl: crc32abs-point/underscore/pound ---
impl Crc32AbsDelimiter {
    /// Builds a hasher from an algorithm name such as `crc32abs-point`.
    ///
    /// The name is split on `HASHER_NAME_DELIMITER`: the first segment must be `crc32abs`,
    /// the second names the key delimiter, and an optional third segment is the number of
    /// leading key bytes to skip. A third segment that does not parse as `usize` is logged
    /// and treated as a start offset of 0.
    pub fn from(alg: &str) -> Self {
        let segments: Vec<&str> = alg.split(super::HASHER_NAME_DELIMITER).collect();

        debug_assert!(segments.len() >= 2);
        debug_assert_eq!(segments[0], "crc32abs");

        let delimiter = super::key_delimiter_name_2u8(alg, segments[1]);

        // Only a three-segment name carries an explicit start offset.
        let start_pos = if segments.len() == 2 {
            0
        } else {
            debug_assert!(segments.len() == 3);
            match segments[2].parse::<usize>() {
                Ok(prefix_len) => prefix_len,
                Err(_) => {
                    log::debug!("found unknown hash/{}, ignore prefix instead", alg);
                    0
                }
            }
        };

        Self {
            start_pos,
            delimiter,
            name: alg.into(),
        }
    }
}

impl super::Hash for Crc32AbsDelimiter {
    /// Computes the crc32 of the key bytes from `start_pos` up to (not including) the first
    /// delimiter byte, truncates the result to `i32`, and returns its absolute value as `i64`.
    ///
    /// When the delimiter is `KEY_DELIMITER_NONE`, every byte from `start_pos` onward is hashed.
    fn hash<S: super::HashKey>(&self, key: &S) -> i64 {
        let mut crc: i64 = CRC_SEED;
        debug_assert!(self.start_pos < key.len());

        let check_delimiter = self.delimiter != super::KEY_DELIMITER_NONE;
        for i in self.start_pos..key.len() {
            let c = key.at(i);
            if check_delimiter && (c == self.delimiter) {
                break;
            }
            crc = ((crc >> 8) & 0x00FFFFFF) ^ CRC32TAB[((crc ^ (c as i64)) & 0xff) as usize];
        }

        crc ^= CRC_SEED;
        crc &= CRC_SEED;

        // Truncation to i32 is intentional: this hasher follows i32 semantics.
        let mut crc = crc as i32;
        if crc < 0 {
            log::debug!("{:?} - negative hash/{} for key/{:?}", self.name, crc, key);
            // `abs()` overflows on i32::MIN (a panic in debug/overflow-checked builds);
            // `wrapping_abs()` yields the same value as a release-mode `abs()` for every input.
            crc = crc.wrapping_abs();
        }
        crc as i64
    }
}

impl Display for Crc32AbsDelimiter {
    /// Writes the Debug rendering of the hasher's name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self.name))
    }
}
5 changes: 4 additions & 1 deletion sharding/src/hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub enum Hasher {
LBCrc32localDelimiter(LBCrc32localDelimiter), // long bytes crc32local for hash like: 123.a, 124_a, 123#a
Rawcrc32local(Rawcrc32local), // raw or crc32local
Crc32Abs(Crc32Abs), // crc32abs: 基于i32转换,然后直接取abs;其他走i64提升为正数
Crc32AbsDelimiter(Crc32AbsDelimiter), // crc32abs: 基于i32转换,然后直接取abs,同时支持分隔符,格式:$start+$hashkey+$delimiter$
Crc64(Crc64), // Crc64 算法,对整个key做crc64计算
Fnv1aF64(Fnv1aF64),
Random(RandomHash), // random hash
Expand Down Expand Up @@ -158,6 +159,7 @@ impl Hasher {
CRC32_EXT_MIXNUM => Self::Crc32MixNum(Default::default()),
_ => Self::Crc32Delimiter(Crc32Delimiter::from(alg_lower.as_str())),
},
"crc32abs" => Self::Crc32AbsDelimiter(Crc32AbsDelimiter::from(alg_lower.as_str())),
"crc32local" => match alg_parts[1] {
CRC32_EXT_SMARTNUM => Self::Crc32localSmartNum(Default::default()),
_ => Self::Crc32localDelimiter(Crc32localDelimiter::from(alg_lower.as_str())),
Expand All @@ -169,6 +171,7 @@ impl Hasher {
}
}
}

#[inline]
pub fn crc32_short() -> Self {
Self::Crc32Short(Default::default())
Expand All @@ -182,7 +185,7 @@ fn key_delimiter_name_2u8(_alg: &str, delimiter_name: &str) -> u8 {
KEY_DELIMITER_UNDERSCORE => '_',
KEY_DELIMITER_POUND => '#',
_ => {
log::debug!("found unknown hash alg: {}, use crc32 instead", _alg);
log::warn!("found unknown hash alg: {}, use crc32 instead", _alg);
KEY_DELIMITER_NONE as char
}
};
Expand Down
2 changes: 2 additions & 0 deletions tests/sharding_datas/readed-redis-port-key/58634.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
5658010406.sr
3826690697.sr
3 changes: 3 additions & 0 deletions tests/sharding_datas/readed-redis-port-key/58635.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
2019081705131.sr
1878713812.cr
6577710803.cr
69 changes: 69 additions & 0 deletions tests/src/shard_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ fn check_shard_data() {
shard_checker(root_dir, data_file);
}

/// Maps a Breeze port number to its shard index: ports 57439..=57446 map to indexes 0..=7 and
/// ports 58544..=58791 map to indexes 8..=255. Any other port yields `None`.
fn port_to_idx(port: u16) -> Option<usize> {
    match port {
        57439..=57446 => Some(usize::from(port - 57439)),
        58544..=58791 => Some(usize::from(port - 58544) + 8),
        _ => None,
    }
}
#[test]
fn build_shard_data() {
let shard_conf = ShardConf {
Expand Down Expand Up @@ -235,3 +245,62 @@ fn write_shard_data(shard_conf: &ShardConf, src: &str, dest: &str) {
writer.flush().unwrap();
}
}

/// Walks every `<port>.txt` file under `sharding_datas/readed-redis-port-key`, hashes each
/// non-empty line with `crc32abs-point`, maps the hash through the `modula` distribution over
/// 256 mock servers, and prints every key whose computed shard differs from the index implied
/// by the file's port. Mismatches are only printed, not asserted; a port outside the known
/// ranges panics.
#[test]
fn check_readed_redis_hasher() {
    use std::fs::{self, File};
    use std::io::{BufRead, BufReader};

    let dir = "sharding_datas/readed-redis-port-key";
    let paths: Vec<_> = fs::read_dir(dir)
        .expect("read_dir failed")
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|path| path.is_file())
        .collect();

    let hasher = Hasher::from("crc32abs-point");
    let servers = mock_servers(256);
    let dist = Distribute::from("modula", &servers);

    for path in paths {
        let fname = path.file_name().unwrap().to_string_lossy();
        if !fname.ends_with(".txt") {
            continue;
        }
        // File names that are not a bare port number are silently skipped.
        let port = match fname.trim_end_matches(".txt").parse::<u16>() {
            Ok(port) => port,
            Err(_) => continue,
        };
        let idx = port_to_idx(port).unwrap_or_else(|| panic!("端口{}不在合法分片区间", port));

        let reader = BufReader::new(File::open(&path).expect("open port file failed"));
        for line in reader.lines() {
            let line = line.unwrap();
            let key = line.trim();
            if !key.is_empty() {
                let hash = hasher.hash(&key.as_bytes());
                let shard_idx = dist.index(hash);
                if shard_idx != idx {
                    println!("key={} hash={} shard_idx={} file={} idx={}", key, hash, shard_idx, fname, idx);
                }
            }
        }
        println!("已处理端口文件: {}", path.display());
    }
}

/// Smoke test: hashes one sample key with `crc32abs-point`, maps it through the `modula`
/// distribution over 256 mock servers, and prints the result. Nothing is asserted.
#[test]
fn check_crc32abs_hasher() {
    const KEY: &str = "6972284375.cr2";

    let hasher = Hasher::from("crc32abs-point");
    let servers = mock_servers(256);
    let dist = Distribute::from("modula", &servers);

    let hash = hasher.hash(&KEY.as_bytes());
    let idx = dist.index(hash);
    println!("key: {}, hash: {}, idx: {}", KEY, hash, idx);
}
Loading