diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index e6eb7294e..334d63cf4 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -15,6 +15,7 @@ env: phantom: localhost:9303 mysql: localhost:3306 vector: localhost:3308 + kv2vector: localhost:3309 mq: localhost:56815 min_key: 1 max_key: 10000 @@ -31,7 +32,7 @@ jobs: docker run --rm --name breeze_ci_mysql4623 -p 4623:3306 -d parabala/mysqlci_with_schema:v0.0.2 docker run --rm --name breeze_ci_mysql4624 -p 4624:3306 -d parabala/mysqlci_with_schema:v0.0.2 - name: Prepare Vintage_MC_Redis - run: docker run -d -v /home/runner/work/breeze:/data1/resource/breeze --net="host" --name breeze_github_ci hustfisher/breeze:githubci115 + run: docker run -d -v /home/runner/work/breeze:/data1/resource/breeze --net="host" --name breeze_github_ci hustfisher/breeze:githubci116 - uses: actions/checkout@v3 - name: Install stable toolchain uses: actions-rs/toolchain@v1 @@ -53,6 +54,7 @@ jobs: curl http://127.0.0.1:8080/config/cloud/kv/testbreeze/kvmeshtest sleep 1 curl http://127.0.0.1:8080/config/cloud/vector/testbreeze/vectortest + curl http://127.0.0.1:8080/config/cloud/vector/testbreeze/kv2vector - name: Create Socks run: | #ps -aux|grep breeze @@ -66,6 +68,7 @@ jobs: touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+phantom+testbreeze+phantomtest@phantom:9303@pt touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+kv+testbreeze+kvmeshtest@kv:3306@kv touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+vector+testbreeze+vectortest@vector:3308@vector + touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+vector+testbreeze+kv2vector@vector:3309@vector touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+mq+testbreeze+mcqmeshtest_1@msgque:56815@mq ls -all /home/runner/work/breeze/snapshot ls -all /home/runner/work/breeze/socks @@ -94,7 +97,7 @@ jobs: run: cargo test -p tests --features github_workflow -p endpoint - name: wait until 2 mins run: | - port_list=(56810 56812 9302 9301 9303 3306 3308) # 端口列表 + port_list=(56810 56812 9302 9301 9303 3306 3308 3309 56815) # 端口列表 start=$(date +%s) # 获取当前时间戳 while true; do diff --git a/ci.sh b/ci.sh index ab214f8b4..54a0e7081 100755 --- a/ci.sh +++ b/ci.sh @@ -11,7 +11,7 @@ container_name=breeze_github_ci docker ps -a | grep "$container_name" && docker rm -f "$container_name" -docker run --rm -d -v $brz_home:/data1/resource/breeze --net="host" --name "$container_name" parabala/breeze:githubci108 +docker run --rm -d -v $brz_home:/data1/resource/breeze --net="host" --name "$container_name" hustfisher/breeze:githubci116 # rm -rf $brz_home/* mkdir -p $brz_home/logs @@ -24,6 +24,7 @@ touch $brz_home/socks/127.0.0.1:8080+config+cloud+counterservice+testbreeze+mesh touch $brz_home/socks/127.0.0.1:8080+config+cloud+phantom+testbreeze+phantomtest@phantom:9303@pt touch $brz_home/socks/127.0.0.1:8080+config+cloud+kv+testbreeze+kvmeshtest@kv:3306@kv touch $brz_home/socks/127.0.0.1:8080+config+cloud+vector+testbreeze+vectortest@vector:3308@vector +touch $brz_home/socks/127.0.0.1:8080+config+cloud+vector+testbreeze+kv2vector@vector:3309@vector cargo build @@ -38,6 +39,7 @@ export mc=localhost:9301 export phantom=localhost:9303 export mysql=localhost:3306 export vector=localhost:3308 +export kv2vector=localhost:3309 export min_key=1 export max_key=10000 export socks_dir=$brz_home/socks @@ -45,7 +47,7 @@ export socks_dir=$brz_home/socks RUST_BACKTRACE=1 cargo test -p tests #等待mesh初始化,最多等待两分钟 -port_list=(56810 56812 9302 9301 9303 3306 3308) # 端口列表 +port_list=(56810 56812 9302 9301 9303 3306 3308 3309) # 端口列表 start=$(date +%s) # 获取当前时间戳 while true; do diff --git a/endpoint/src/cacheservice/topo.rs b/endpoint/src/cacheservice/topo.rs index 04d95a686..61f2d539a 100644 --- a/endpoint/src/cacheservice/topo.rs +++ b/endpoint/src/cacheservice/topo.rs @@ -3,7 +3,7 @@ use crate::{Endpoint, Endpoints, Topology}; use discovery::TopologyWrite; use protocol::memcache::Binary; use protocol::{Protocol, Request, Resource::Memcache}; -use sharding::hash::{Hash, HashKey, Hasher}; +use sharding::hash::{Hash, HashKey, Hasher, HashGrouper}; use super::config::Flag; use crate::shards::Shards; @@ -74,6 +74,13 @@ where } } +impl HashGrouper for CacheService +where + E: Endpoint, + P: Protocol, +{ +} + impl Topology for CacheService where E: Endpoint, diff --git a/endpoint/src/kv/kvtime.rs b/endpoint/src/kv/kvtime.rs index 4afeae069..c3dfa927d 100644 --- a/endpoint/src/kv/kvtime.rs +++ b/endpoint/src/kv/kvtime.rs @@ -81,6 +81,15 @@ impl KVTime { } } } + #[inline] + pub fn table_postfix(&self) -> Postfix { + return self.table_postfix.clone() + } + #[inline] + pub fn get_date(&self, key: &RingSlice) -> NaiveDate { + let uuid = key.uuid(); + uuid.native_date() + } } impl Strategy for KVTime { fn distribution(&self) -> &DBRange { diff --git a/endpoint/src/kv/topo.rs b/endpoint/src/kv/topo.rs index 50ef7da7e..0d27cb3ea 100644 --- a/endpoint/src/kv/topo.rs +++ b/endpoint/src/kv/topo.rs @@ -14,7 +14,7 @@ use protocol::Request; use protocol::ResOption; use protocol::Resource; use rand::seq::SliceRandom; -use sharding::hash::{Hash, HashKey}; +use sharding::hash::{Hash, HashKey, HashGrouper}; use crate::dns::DnsConfig; use crate::Timeout; @@ -57,6 +57,13 @@ where } } +impl HashGrouper for KvService +where + E: Endpoint, + P: Protocol, +{ +} + impl Topology for KvService where E: Endpoint, diff --git a/endpoint/src/kv/uuid.rs b/endpoint/src/kv/uuid.rs index 550f5ea03..f639ccb2d 100644 --- a/endpoint/src/kv/uuid.rs +++ b/endpoint/src/kv/uuid.rs @@ -1,4 +1,4 @@ -use chrono::{Datelike, TimeZone}; +use chrono::{Datelike, TimeZone, NaiveDate}; use chrono_tz::Asia::Shanghai; // use std::collections::HashMap; // use std::sync::atomic::{AtomicI64, Ordering}; @@ -29,6 +29,8 @@ pub trait Uuid { fn ymd(&self) -> (u16, u8, u8); // 返回year fn year(&self) -> u16; + // 返回date + fn native_date(&self) -> NaiveDate; } impl Uuid for i64 { @@ -53,6 +55,14 @@ impl Uuid for i64 { let (year, _, _) = self.ymd(); year } + #[inline(always)] + fn native_date(&self) -> NaiveDate { + chrono::Utc + .timestamp_opt(self.unix_secs(), 0) + .unwrap() + .with_timezone(&Shanghai) + .date_naive() + } } pub trait UuidGet { diff --git a/endpoint/src/msgque/topo.rs b/endpoint/src/msgque/topo.rs index 3ad366a77..996b69017 100644 --- a/endpoint/src/msgque/topo.rs +++ b/endpoint/src/msgque/topo.rs @@ -18,7 +18,7 @@ use super::{ use crate::dns::{DnsConfig, DnsLookup}; use crate::msgque::SizedQueueInfo; use crate::{CloneAbleAtomicBool, Endpoint, Endpoints, Timeout, Topology}; -use sharding::hash::{Hash, HashKey, Hasher, Padding}; +use sharding::hash::{Hash, HashKey, Hasher, Padding, HashGrouper}; // ip vintage下线后,2分钟后真正从读列表中下线,写列表是立即下线的 const OFFLINE_LIMIT_SECONDS: u64 = 60 * 2; @@ -93,6 +93,13 @@ where } } +impl HashGrouper for MsgQue +where + E: Endpoint, + P: Protocol, +{ +} + impl Topology for MsgQue where E: Endpoint, diff --git a/endpoint/src/phantomservice/topo.rs b/endpoint/src/phantomservice/topo.rs index 2718e05fd..31f319803 100644 --- a/endpoint/src/phantomservice/topo.rs +++ b/endpoint/src/phantomservice/topo.rs @@ -7,7 +7,7 @@ use discovery::{Inited, TopologyWrite}; use protocol::{Protocol, Request, Resource::Phantom}; use sharding::{ distribution::Range, - hash::{Crc32, Hash, HashKey}, + hash::{Crc32, Hash, HashKey, HashGrouper}, }; use super::config::PhantomNamespace; @@ -45,6 +45,13 @@ where } } +impl HashGrouper for PhantomService +where + E: Endpoint, + P: Protocol, +{ +} + impl Topology for PhantomService where E: Endpoint, diff --git a/endpoint/src/redisservice/topo.rs b/endpoint/src/redisservice/topo.rs index 4dff8d163..be2f1089b 100644 --- a/endpoint/src/redisservice/topo.rs +++ b/endpoint/src/redisservice/topo.rs @@ -6,7 +6,7 @@ use crate::{ use discovery::TopologyWrite; use protocol::{Protocol, RedisFlager, Request, Resource::Redis}; use sharding::distribution::Distribute; -use sharding::hash::{Hash, HashKey, Hasher}; +use sharding::hash::{Hash, HashKey, Hasher, HashGrouper}; use super::config::RedisNamespace; @@ -43,6 +43,13 @@ where } } +impl HashGrouper for RedisService +where + E: Endpoint, + P: Protocol, +{ +} + impl Topology for RedisService where E: Endpoint, diff --git a/endpoint/src/topo.rs b/endpoint/src/topo.rs index 07d81da95..dc28aef7c 100644 --- a/endpoint/src/topo.rs +++ b/endpoint/src/topo.rs @@ -1,6 +1,7 @@ use discovery::{Inited, TopologyWrite}; use protocol::{Protocol, Request, ResOption, Resource}; -use sharding::hash::{Hash, HashKey}; +use sharding::hash::{Hash, HashKey, HashGrouper}; +use ds::RingSlice; use crate::Timeout; @@ -34,7 +35,7 @@ procs::topology_dispatcher! { fn build(addr: &str, p: P, r: Resource, service: &str, to: Timeout) -> Self {Self::build_o(addr, p, r, service, to, Default::default())} } => where P:Protocol, E:Endpoint + Inited, R: Request - pub trait Topology : Endpoint + Hash{ + pub trait Topology : Endpoint + Hash + HashGrouper { fn exp_sec(&self) -> u32 {86400} fn has_attach(&self) -> bool {false} } => where P:Protocol, E:Endpoint, R:Request, Topologies: Endpoint @@ -50,6 +51,10 @@ procs::topology_dispatcher! { fn load(&mut self) -> bool; } => where P:Protocol, E:Endpoint + trait HashGrouper { + fn group(&self, _key: &RingSlice) -> Option, i64)>> { None } + } => where P:Protocol, E:Endpoint + trait Hash { fn hash(&self, key: &S) -> i64; } => where P:Protocol, E:Endpoint, diff --git a/endpoint/src/uuid/topo.rs b/endpoint/src/uuid/topo.rs index 016e7d139..d9e6b2f76 100644 --- a/endpoint/src/uuid/topo.rs +++ b/endpoint/src/uuid/topo.rs @@ -5,7 +5,7 @@ use crate::{ }; use discovery::TopologyWrite; use protocol::{Protocol, Request, Resource::Uuid}; -use sharding::hash::{Hash, HashKey}; +use sharding::hash::{Hash, HashKey, HashGrouper}; use super::config::UuidNamespace; @@ -37,6 +37,13 @@ where } } +impl HashGrouper for UuidService +where + E: Endpoint, + P: Protocol, +{ +} + impl Topology for UuidService where E: Endpoint, diff --git a/endpoint/src/vector/batch.rs b/endpoint/src/vector/batch.rs index d27afdb4a..5889033eb 100644 --- a/endpoint/src/vector/batch.rs +++ b/endpoint/src/vector/batch.rs @@ -195,6 +195,10 @@ impl Aggregation { CommandType::Unknown => panic!("not sup {:?}", vcmd.cmd), } } + #[inline] + pub(crate) fn table_postfix(&self) -> Postfix { + self.table_postfix.clone() + } } #[derive(Clone, Debug)] diff --git a/endpoint/src/vector/strategy.rs b/endpoint/src/vector/strategy.rs index 6ccb8756e..878f6aac2 100644 --- a/endpoint/src/vector/strategy.rs +++ b/endpoint/src/vector/strategy.rs @@ -63,6 +63,17 @@ impl Strategist { ns.si_backends.len() as u32, )) } + "kvtime" => { + Self::VectorTime(VectorTime::new_with_db( + ns.basic.db_name.clone(), + ns.basic.table_name.clone(), + ns.basic.db_count, + //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 + ns.backends.iter().next().unwrap().1.len() as u32, + ns.basic.table_postfix.as_str().try_into().ok()?, + ns.basic.keys.clone(), + )) + } _ => { if ns.basic.keys.len() < 2 { log::warn!("len keys < 2"); @@ -119,12 +130,24 @@ impl Strategist { } } -pub(crate) fn check_vector_cmd(&self, vcmd: &protocol::vector::VectorCmd) -> Result<()> { + pub(crate) fn check_vector_cmd(&self, vcmd: &protocol::vector::VectorCmd) -> Result<()> { match self { Strategist::VectorTime(inner) => inner.check_vector_cmd(vcmd), Strategist::Aggregation(inner) => inner.check_vector_cmd(vcmd), } } + pub(crate) fn keys(&self) -> &[String] { + match self { + Strategist::VectorTime(inner) => inner.keys(), + Strategist::Aggregation(inner) => inner.keys(), + } + } + pub(crate) fn table_postfix(&self) -> Postfix { + match self { + Strategist::VectorTime(inner) => inner.table_postfix(), + Strategist::Aggregation(inner) => inner.table_postfix(), + } + } // /// 获得配置的默认route;当配置strategy为aggregation时,默认的route是Aggregation,否则就是Main // #[inline] diff --git a/endpoint/src/vector/topo.rs b/endpoint/src/vector/topo.rs index b8182f4c4..e7e494a01 100644 --- a/endpoint/src/vector/topo.rs +++ b/endpoint/src/vector/topo.rs @@ -5,16 +5,16 @@ use chrono::{Datelike, NaiveDate}; use discovery::dns; use discovery::dns::IPPort; use discovery::TopologyWrite; -use ds::MemGuard; +use ds::{MemGuard, RingSlice}; use protocol::kv::{ContextStatus, MysqlBuilder}; use protocol::vector::attachment::{BackendType, VAttach, VectorAttach}; -use protocol::vector::redis::parse_vector_detail; -use protocol::vector::{command, CommandType, VectorCmd}; +use protocol::vector::redis::{parse_vector_detail, KVECTOR_SEPARATOR}; +use protocol::vector::{command, CommandType, Postfix, VectorCmd}; use protocol::Request; use protocol::ResOption; use protocol::Resource; use protocol::{Operation, Protocol}; -use sharding::hash::{Hash, HashKey}; +use sharding::hash::{Hash, HashKey, HashGrouper}; use crate::dns::DnsConfig; use crate::Timeout; @@ -59,6 +59,98 @@ where } } +impl HashGrouper for VectorService +where + E: Endpoint, + P: Protocol, +{ + // 单key请求返回None + #[inline] + fn group(&self, k: &RingSlice) -> Option, i64)>> { + let mut keys: Vec = Vec::with_capacity(8); + let mut oft:usize = 0; + loop { + if oft >= k.len() { + break; + } + // 从keys中split出','分割的key + let idx = k + .find(oft, KVECTOR_SEPARATOR) + .unwrap_or(k.len()); + keys.push(k.sub_slice(oft, idx - oft)); + oft = idx + 1; + } + + let keys_name = self.strategist.keys(); + let step = keys_name.len(); + if step == 0 || keys.len() <= step || keys.len() % step != 0 { + return None; + } + // 接下来从key解析出时间信息 + let cmdtype = CommandType::VGet; // 满足get_date函数第一个参数 + let mut group_map: HashMap<(usize, u16, u8, u8), Vec<(usize, i64)>> = HashMap::new(); + for i in (0..keys.len()).step_by(step) { + let k = &keys[i..i + step]; + match self.strategist.get_date(cmdtype, k) { + Ok(date) => { + let y = date.year() as u16; + let m = date.month() as u8; + let d = match &self.strategist.table_postfix() { + Postfix::YYMMDD => date.day() as u8, + _ => 0, + }; + let hash = self.strategist.hasher().hash(&k[0]); + let shard_idx = self.strategist.distribution().index(hash); + + // Store the index of each key in the groups vector to the hashmap + group_map.entry((shard_idx, y, m, d)) + .or_insert_with(Vec::new) + .push((i,hash)); + log::debug!("key grouped: shard_idx {} y {} m {} d {} key {} hash {}", shard_idx, y, m, d, keys[i], hash); + } + Err(_) => return None + } + } + log::debug!("key grouped index: {:?} ", group_map); + + // step==1: key每组1个元素,key包含时间信息 + // step>2: key每组step个元素,倒数第一个key就是时间信息 + let mut groups: Vec<(Vec, i64)> = Vec::with_capacity(16); + for (_, indices) in group_map { + let mut concatenated_key: Vec = Vec::with_capacity(256); + let mut hash :i64 = 0; + if step == 1 { + // If step is 1, concatenate keys at the given indices with ',' + for i in indices { + keys[i.0].copy_to_vec(&mut concatenated_key); + concatenated_key.push(b','); + hash = i.1; + } + if !concatenated_key.is_empty() { + // Remove the trailing comma + concatenated_key.pop(); + groups.push((concatenated_key, hash)); + } + } else { + // If step is greater than 1, concatenate keys in the specified range and format + let mut last: usize = 0; + for i in indices { + last = i.0; + for j in [last, last + step - 1] { + keys[j].copy_to_vec(&mut concatenated_key); + concatenated_key.push(b','); + } + hash = i.1; + } + keys[last + step - 1].copy_to_vec(&mut concatenated_key); + // concatenated_key.push_str(keys[last + step - 1].to_string().as_str()); + groups.push((concatenated_key, hash)); + } + } + Some(groups) + } +} + impl Topology for VectorService where E: Endpoint, diff --git a/endpoint/src/vector/vectortime.rs b/endpoint/src/vector/vectortime.rs index 95766a641..7e6844477 100644 --- a/endpoint/src/vector/vectortime.rs +++ b/endpoint/src/vector/vectortime.rs @@ -1,5 +1,5 @@ use crate::kv::kvtime::KVTime; - +use crate::kv::uuid::*; use chrono::NaiveDate; use core::fmt::Write; use ds::RingSlice; @@ -40,6 +40,12 @@ impl VectorTime { //策略处已作校验 pub fn get_date(&self, keys: &[RingSlice]) -> Result { let date = keys.last().unwrap(); + // keys_name长度为1,则key本身是uuid + if self.keys_name.len() == 1 { + let k = &keys[0]; + let uuid = k.uuid(); + return Ok(uuid.native_date()) + } let ymd = match self.keys_name.last().unwrap().as_str() { DATE_YYMM => ( date.try_str_num(0..0 + 2) @@ -99,7 +105,7 @@ impl VectorTime { } pub(crate) fn check_vector_cmd(&self, vcmd: &protocol::vector::VectorCmd) -> Result<(), Error> { - if vcmd.keys.len() != self.keys_name.len() { + if vcmd.keys.len() < self.keys_name.len() { return Err(Error::RequestProtocolInvalid); } Ok(()) diff --git a/protocol/src/flag.rs b/protocol/src/flag.rs index 4c8accf64..dfe110a5d 100644 --- a/protocol/src/flag.rs +++ b/protocol/src/flag.rs @@ -1,6 +1,6 @@ use crate::{HashedCommand, OpCode, Operation}; pub type FlagExt = u64; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct Flag { op_code: OpCode, op: Operation, diff --git a/protocol/src/parser.rs b/protocol/src/parser.rs index ab3667f51..f79441200 100644 --- a/protocol/src/parser.rs +++ b/protocol/src/parser.rs @@ -1,6 +1,6 @@ use enum_dispatch::enum_dispatch; -use sharding::hash::Hash; +use sharding::hash::{Hash, HashGrouper}; use crate::kv::Kv; use crate::memcache::MemcacheBinary; @@ -77,7 +77,7 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static { fn handshake(&self, stream: &mut impl Stream, option: &mut ResOption) -> Result { Ok(HandShake::Success) } - fn parse_request( + fn parse_request( &self, stream: &mut S, alg: &H, diff --git a/protocol/src/vector/command.rs b/protocol/src/vector/command.rs index c7f826a99..0599dae4d 100644 --- a/protocol/src/vector/command.rs +++ b/protocol/src/vector/command.rs @@ -1,5 +1,5 @@ -use crate::{redis::command::CommandHasher, OpCode, Operation, Result}; - +use crate::{redis::command::CommandHasher, vector::flager::KvFlager, Flag, HashedCommand, OpCode, Operation, Result}; +use ds::{Ext, MemGuard}; // 指令参数需要配合实际请求的token数进行调整,所以外部使用都通过方法获取 #[derive(Default, Debug)] pub struct CommandProperties { @@ -12,6 +12,7 @@ pub struct CommandProperties { pub(crate) op: Operation, pub(crate) supported: bool, pub(crate) has_key: bool, // 是否有key + pub(crate) multi: bool, // 该命令是否可能会包含多个key pub(crate) can_hold_field: bool, //能否持有field pub(crate) can_hold_where_condition: bool, // 能否持有where condition // 指令在不路由或者无server响应时的响应位置, @@ -52,11 +53,59 @@ impl CommandProperties { Err(KvectorError::ReqInvalidBulkNum.into()) } - pub(crate) fn flag(&self) -> crate::Flag { - let mut flag = crate::Flag::from_op(self.op_code, self.op); + pub(crate) fn flag(&self) -> Flag { + let mut flag = Flag::from_op(self.op_code, self.op); flag.set_noforward(self.noforward); flag } + // 重建request:只需replace key即可 + #[inline] + pub(super) fn build_request( + &self, + hash: i64, + first: bool, + mut flag: Flag, + key: &Vec, + data: &MemGuard, + ) -> HashedCommand { + use ds::Buffer; + let mut cmd = Vec::with_capacity(data.len()); + let org_key_pos = flag.key_pos(); + cmd.write_slice(&data.sub_slice(0, org_key_pos as usize)); + // 写入key + cmd.push(b'$'); + cmd.write(key.len().to_string()); + cmd.write("\r\n"); + cmd.write(key); + cmd.write("\r\n"); + + // 修改flag; key_pos不变 + let org_field_pos = flag.field_pos(); + let org_condition_pos = flag.condition_pos(); + *flag.ext_mut() = 0; // reset pos + flag.set_key_pos(org_key_pos); + if first { + flag.set_mkey_first(); + } + let other_start = if org_field_pos > 0 { + org_field_pos as u32 + } else { + org_condition_pos + }; + if other_start > 0 { + let oft = other_start - cmd.len() as u32; + cmd.write_slice(&data.sub_slice(other_start as usize, data.len() - other_start as usize)); + if org_field_pos > 0 { + flag.set_field_pos(org_field_pos - oft as u8); + } + if org_condition_pos > 0 { + flag.set_condition_pos(org_condition_pos - oft); + } + } + + let cmd: MemGuard = MemGuard::from_vec(cmd); + HashedCommand::new(cmd, hash, flag) + } } pub(super) struct Commands { @@ -144,15 +193,15 @@ pub(super) static SUPPORTED: Commands = { Cmd::new("quit").arity(1).op(Meta).padding(pt[1]).nofwd().quit(), // kvector 相关的指令 - Cmd::new("vrange").arity(-2).op(Get).cmd_type(CommandType::VRange).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), + Cmd::new("vrange").arity(-2).op(Get).cmd_type(CommandType::VRange).padding(pt[3]).has_key().multi().can_hold_field().can_hold_where_condition(), Cmd::new("vadd").arity(-2).op(Store).cmd_type(CommandType::VAdd).padding(pt[3]).has_key().can_hold_field(), // Cmd::new("vreplace").arity(-2).op(Store).cmd_type(CommandType::VReplace).padding(pt[3]).has_key().can_hold_field(), Cmd::new("vupdate").arity(-2).op(Store).cmd_type(CommandType::VUpdate).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), - Cmd::new("vdel").arity(-2).op(Store).cmd_type(CommandType::VDel).padding(pt[3]).has_key().can_hold_where_condition(), + Cmd::new("vdel").arity(-2).op(Store).cmd_type(CommandType::VDel).padding(pt[3]).has_key().multi().can_hold_where_condition(), Cmd::new("vcard").route(Route::Si).arity(-2).op(Get).cmd_type(CommandType::VCard).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), // vget 只是从timeline获取单条记录,所以route需要设置为timeline/main - Cmd::new("vget").route(Route::TimelineOrMain).arity(-2).op(Get).cmd_type(CommandType::VGet).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), + Cmd::new("vget").route(Route::TimelineOrMain).arity(-2).op(Get).cmd_type(CommandType::VGet).padding(pt[3]).has_key().multi().can_hold_field().can_hold_where_condition(), // 对于timeline、si后缀指令,只是中间状态,为了处理方便,不额外增加字段,仍然作为独立指令来处理 Cmd::new("vrange.timeline").route(Route::TimelineOrMain).arity(-2).op(Get).cmd_type(CommandType::VRangeTimeline).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), @@ -204,6 +253,10 @@ impl CommandProperties { self.has_key = true; self } + pub(crate) fn multi(mut self) -> Self { + self.multi = true; + self + } pub(crate) fn can_hold_field(mut self) -> Self { self.can_hold_field = true; self diff --git a/protocol/src/vector/flager.rs b/protocol/src/vector/flager.rs index f3597020e..6f87e8614 100644 --- a/protocol/src/vector/flager.rs +++ b/protocol/src/vector/flager.rs @@ -22,7 +22,10 @@ const CONDITION_POS_SHIFT: u8 = FIELD_POS_SHIFT + FIELD_POS_BITS; pub(super) const CONDITION_POS_BITS: u8 = 24; const CONDITION_POS_MASK: u64 = (1 << CONDITION_POS_BITS) - 1; -/// bits:40-63 暂时保留 +// [40]: 标识是否是第一个key +const MKEY_FIRST_SHIFT: u8 = CONDITION_POS_SHIFT + CONDITION_POS_BITS; +// const MKEY_FIRST_BIT: u8 = 1; // 这个先保留,后续增加字段时需要 +/// bits:41-63 暂时保留 /// 备注:当前kv最多支持的指令名称长度不超过200,field pair不超过200对,condition数量不超过200段; /// 如果field pos、condition pos超过u16,返回u16::max,具体位置需要调用方再次扫描获取; @@ -37,6 +40,8 @@ pub trait KvFlager { // fn condition_count(&self) -> u8; fn set_condition_pos(&mut self, condition_pos: u32); fn condition_pos(&self) -> u32; + fn set_mkey_first(&mut self); + fn mkey_first(&self) -> bool; } use crate::{Bit, Ext}; @@ -94,4 +99,15 @@ impl KvFlager for T { fn condition_pos(&self) -> u32 { self.mask_get(CONDITION_POS_SHIFT, CONDITION_POS_MASK) as u32 } + #[inline] + fn set_mkey_first(&mut self) { + debug_assert!(!self.mkey_first()); + self.set(MKEY_FIRST_SHIFT); + //*self.ext_mut() |= 1 << MKEY_FIRST_SHIFT; + debug_assert!(self.mkey_first()); + } + #[inline] + fn mkey_first(&self) -> bool { + self.get(MKEY_FIRST_SHIFT) + } } diff --git a/protocol/src/vector/mod.rs b/protocol/src/vector/mod.rs index d8cc73bec..cb699e59a 100644 --- a/protocol/src/vector/mod.rs +++ b/protocol/src/vector/mod.rs @@ -18,7 +18,7 @@ use crate::{ use attachment::{AttachType, Route}; use chrono::NaiveDate; use ds::RingSlice; -use sharding::hash::Hash; +use sharding::hash::{HashGrouper, Hash}; use self::attachment::VectorAttach; use self::packet::RedisPack; @@ -28,12 +28,13 @@ use crate::kv::client::Client; use crate::kv::{ContextStatus, HandShakeStatus}; use crate::HandShake; pub use command::CommandType; +pub use flager::KvFlager; #[derive(Clone, Default)] pub struct Vector; impl Protocol for Vector { - fn parse_request( + fn parse_request( &self, stream: &mut S, alg: &H, @@ -253,7 +254,7 @@ impl Protocol for Vector { impl Vector { #[inline] - fn parse_request_inner( + fn parse_request_inner( &self, packet: &mut RequestPacket, alg: &H, @@ -266,10 +267,37 @@ impl Vector { let mut flag = cfg.flag(); let key = packet.parse_cmd(cfg, &mut flag)?; + if cfg.multi && key.is_some() { + // 这里可能有多个key,单key请求按照原来的逻辑处理 + // 属于同一个table的多个key,可以组成多key请求,即:select ... in(key1,...,keyN) + // 多key请求根据keys格式和策略分割key、并分组:(shard_idx, year, month, day)四元组一致,分为一组 + // 然后按照分组,顺序构建请求,并发送; + // TODO:当前只支持一组key,后续支持多组key + if let Some(sorted_keys) = alg.group(&key.unwrap()) { + if sorted_keys.len() > 1 { + log::debug!("+++ kvector temporary req error: only support keys in one table now"); + return Err(Error::RequestProtocolInvalid) + } + let org_cmd = packet.take(); // 原始请求 + for i in 0..sorted_keys.len() { + let (sorted_key, h) = &sorted_keys[i]; + // 构建cmd,准备后续处理 + let cmd = cfg.build_request(*h, i==0, flag.clone(), sorted_key, &org_cmd); + log::debug!("+++ kvector/{} req:{:?}", cfg.name, cmd); + let last = i==(sorted_keys.len()-1); + process.process(cmd, last); + } + return Ok(()) + } + } + let main_key = match key { + Some(key) => Some(packet.main_key(&key)), + None => None, + }; // 构建cmd,准备后续处理 let cmd = packet.take(); - let hash = packet.hash(key, alg); - log::debug!("+++ kvector/{} key:{:?}/{}", cfg.name, key, hash); + let hash = packet.hash(main_key, alg); + log::debug!("+++ kvector/{} main_key:{:?}/{}", cfg.name, main_key, hash); let cmd = HashedCommand::new(cmd, hash, flag); process.process(cmd, true); } diff --git a/protocol/src/vector/mysql.rs b/protocol/src/vector/mysql.rs index 016f08418..edf3725ca 100644 --- a/protocol/src/vector/mysql.rs +++ b/protocol/src/vector/mysql.rs @@ -175,10 +175,12 @@ impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { extra, ) = self; - // 对于vupdate、vdel,必须得有where语句 + // 对于kv的vdel、vupdate不需要where语句 + // 对于si/timeline的vdel、vupdate,必须得有where语句 + // TODO: 对于聚合访问的vdel、vupdate需要where语句 match cmd { - CommandType::VUpdate - | CommandType::VDel + CommandType::VUpdateSi + | CommandType::VDelSi | CommandType::VUpdateTimeline | CommandType::VDelTimeline => { if wheres.len() == 0 { @@ -189,12 +191,31 @@ impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { _ => {} } + // 有多个key,则使用 ...in + let mkey_num = if keys.len() > strategy.keys().len() { + keys.len() - strategy.keys().len() + } else { + 0 + }; for (i, key) in (&mut strategy.keys_with_type()).enumerate() { if let KeysType::Keys(key) = key { if i == 0 { - let _ = write!(f, "`{}`={}", key, Val(&keys[i])); + if mkey_num == 0 { + let _ = write!(f, "`{}`={}", key, Val(&keys[i])); + } else { + debug_assert!(*cmd == CommandType::VDel || *cmd == CommandType::VGet || *cmd == CommandType::VRange); + for j in 0..(mkey_num+1) { + if j == 0 { + let _ = write!(f, "`{}` in ({}", key, Val(&keys[i+j])); + } else if j == mkey_num { + let _ = write!(f, ",{})", Val(&keys[i+j])); + } else { + let _ = write!(f, ",{}", Val(&keys[i+j])); + } + } + } } else { - let _ = write!(f, " and `{}`={}", key, Val(&keys[i])); + let _ = write!(f, " and `{}`={}", key, Val(&keys[i+mkey_num])); } } } diff --git a/protocol/src/vector/redis.rs b/protocol/src/vector/redis.rs index 37b7d943e..fdd433279 100644 --- a/protocol/src/vector/redis.rs +++ b/protocol/src/vector/redis.rs @@ -4,7 +4,7 @@ use crate::{Flag, Packet, Result}; use ds::RingSlice; pub(crate) const FIELD_BYTES: &'static [u8] = b"FIELD"; -pub(crate) const KVECTOR_SEPARATOR: u8 = b','; +pub const KVECTOR_SEPARATOR: u8 = b','; /// 根据parse的结果,此处进一步获得kvector的detail/具体字段信息,以便进行sql构建 pub fn parse_vector_detail( diff --git a/protocol/src/vector/reqpacket.rs b/protocol/src/vector/reqpacket.rs index a8e05dd36..b11b3b2b0 100644 --- a/protocol/src/vector/reqpacket.rs +++ b/protocol/src/vector/reqpacket.rs @@ -104,7 +104,8 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { let key = self.parse_key(flag)?; if self.bulks == 0 { - return Ok(Some(self.main_key(&key))); + // return Ok(Some(self.main_key(&key))); + return Ok(Some(key)); } // 如果有field,则接下来解析fields,不管是否有field,统一先把where这个token消费掉,方便后续统一从condition位置解析 @@ -132,7 +133,8 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { log::debug!("++++ after condition parsed oft:{}", self.oft); // 返回main-key,不带sub-key、ext-key - Ok(Some(self.main_key(&key))) + // Ok(Some(self.main_key(&key))) + Ok(Some(key)) } #[inline] @@ -164,7 +166,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { /// 获取主key,不带sub-key、ext-key #[inline] - fn main_key(&self, key: &RingSlice) -> RingSlice { + pub(crate) fn main_key(&self, key: &RingSlice) -> RingSlice { let main_key_len = key.find(0, KEY_SEPERATOR).map_or(key.len(), |len| len); key.sub_slice(0, main_key_len) } @@ -287,6 +289,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { #[inline] pub(super) fn take(&mut self) -> ds::MemGuard { assert!(self.oft_last < self.oft, "packet:{}", self); + log::debug!("take: oft_last{} oft:{}, packet:{}",self.oft_last, self.oft, self); let data = self.data.sub_slice(self.oft_last, self.oft - self.oft_last); self.oft_last = self.oft; // 重置bulks和cmd_type diff --git a/sharding/src/hash/mod.rs b/sharding/src/hash/mod.rs index a8214ba43..b4a971bd2 100644 --- a/sharding/src/hash/mod.rs +++ b/sharding/src/hash/mod.rs @@ -22,6 +22,7 @@ pub use random::RandomHash; pub use raw::Raw; pub use rawcrc32local::Rawcrc32local; pub use rawsuffix::RawSuffix; +use ds::RingSlice; pub mod crc; @@ -57,6 +58,12 @@ const KEY_DELIMITER_NONE: u8 = 0; const NAME_RAWSUFFIX: &str = "rawsuffix"; +// 将输入的key列表,按各自的hash规则分组;每组可以发往同一个后端 +// 如redis mget、mysql select in(...)场景 +pub trait HashGrouper { + fn group(&self, _key: &RingSlice) -> Option, i64)>> { None } +} + #[enum_dispatch] pub trait Hash { // hash 可能返回负数 diff --git a/stream/src/topology.rs b/stream/src/topology.rs index a185c2125..09fa49777 100644 --- a/stream/src/topology.rs +++ b/stream/src/topology.rs @@ -1,11 +1,11 @@ use discovery::TopologyReadGuard; -use ds::ReadGuard; +use ds::{ReadGuard, RingSlice}; use endpoint::{Endpoint, Topology}; use protocol::{ callback::{Callback, CallbackPtr}, request::Request, }; -use sharding::hash::{Hash, HashKey}; +use sharding::hash::{Hash, HashKey, HashGrouper}; pub trait TopologyCheck: Sized { fn refresh(&mut self) -> bool; @@ -78,6 +78,13 @@ impl Hash for CheckedTopology { } } +impl HashGrouper for CheckedTopology { + #[inline(always)] + fn group(&self,key: &RingSlice) -> Option, i64)>> { + self.top.group(key) + } +} + impl Topology for CheckedTopology { #[inline(always)] fn exp_sec(&self) -> u32 { diff --git a/tests_integration/src/mysql.rs b/tests_integration/src/mysql.rs index 6b9ac439e..a709e440f 100644 --- a/tests_integration/src/mysql.rs +++ b/tests_integration/src/mysql.rs @@ -7,6 +7,9 @@ const ERR_INVALID_ARG: &str = "Invalid arguments provided"; // NotStored 异常对应的响应,code 5 对应 RespStatus::NotStored const ERR_NOT_STORED: &str = "Unknown error occurred with code: 5"; +//ci环境的mysql默认的 max_allowed_packet为4194304,content colume 长度为8KB,但针对2<<16-1的MAX_PAYLOAD_LEN也测试过 +pub const MAX_PAYLOAD_LEN: usize = 8 * 1024; + //val中有非assic字符和需要mysql转义的字符 #[test] #[rustfmt::skip] @@ -53,8 +56,6 @@ fn delete() { assert_eq!(None, client.get::(key).unwrap()); } -//ci环境的mysql默认的 max_allowed_packet为4194304,content colume 长度为8KB,但针对2<<16-1的MAX_PAYLOAD_LEN也测试过 -const MAX_PAYLOAD_LEN: usize = 8 * 1024; //构建一个sql长度为MAX_PAYLOAD_LEN的packet #[test] fn set_huge_payload() { diff --git a/tests_integration/src/vector/kvtime.rs b/tests_integration/src/vector/kvtime.rs new file mode 100644 index 000000000..078e41631 --- /dev/null +++ b/tests_integration/src/vector/kvtime.rs @@ -0,0 +1,267 @@ +use redis::{RedisError, Value}; +use crate::redis_helper::*; +use super::super::mysql::MAX_PAYLOAD_LEN; + +// Invalid arg异常参数对应的响应 +const ERR_INVALID_REQ: &str = "invalid request"; +const VALUE_FIELD_NAME: &str = "content"; +const KEY_FIELD_NAME: &str = "id"; + +fn setget(key: &str, val: &str) -> Result<(), RedisError> { + let mut conn = get_conn("kv2vector"); + let rsp: Value = redis::cmd("vadd") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg(val) + .query(&mut conn)?; + assert_eq!(rsp, Value::Int(1)); + + let rsp: Value = redis::cmd("vrange") + .arg(key) + .query(&mut conn)?; + assert_eq!(rsp, redis::Value::Bulk(vec![ + Value::Bulk(vec![ + Value::Data(KEY_FIELD_NAME.into()), + redis::Value::Data(VALUE_FIELD_NAME.into()), + ]), + redis::Value::Bulk(vec![ + redis::Value::Data(key.into()), + redis::Value::Data(val.into()), + ]), + ])); + + Ok(()) +} +#[test] +#[rustfmt::skip] +fn set() -> Result<(), RedisError> { + let key = "4892225613598165"; + let val = "4892225613598165_val"; + setget(key, val) +} + +#[test] +fn update() -> Result<(), RedisError> { + let key = "4892225613598154"; + let val = "4892225613598154_val"; + let _ = setget(key, val)?; + let val = "4892225613598154_val2"; + let mut conn = get_conn("kv2vector"); + let rsp: Value = redis::cmd("vupdate") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg(val) + .query(&mut conn)?; + assert_eq!(rsp, Value::Int(1)); + + let rsp: Value = redis::cmd("get") + .arg(key) + .query(&mut conn)?; + assert_eq!(rsp, redis::Value::Bulk(vec![ + Value::Bulk(vec![ + Value::Data(KEY_FIELD_NAME.into()), + redis::Value::Data(VALUE_FIELD_NAME.into()), + ]), + redis::Value::Bulk(vec![ + redis::Value::Data(key.into()), + redis::Value::Data(val.into()), + ]), + ])); + + Ok(()) +} + +#[test] +fn delete() -> Result<(), RedisError> { + let key = "4892225613598153"; + let val = "4892225613598153_val"; + let mut conn = get_conn("kv2vector"); + let rsp: Value = redis::cmd("vadd") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg(val) + .query(&mut conn)?; + assert_eq!(rsp, Value::Int(1)); + + let rsp: Value = redis::cmd("vdel") + .arg(key) + .query(&mut conn)?; + assert_eq!(rsp, Value::Int(1)); + + Ok(()) +} + +//构建一个sql长度为MAX_PAYLOAD_LEN的packet +#[test] +fn set_huge_payload() -> Result<(), RedisError> { + let key = "4892225613598123"; + //当val长度为MAX_PAYLOAD_LEN - 1 - 76,构建出来的insert语句长度恰好为MAX_PAYLOAD_LEN + let val = vec!['a' as u8; MAX_PAYLOAD_LEN]; + let mut conn = get_conn("kv2vector"); + let rsp: Value = redis::cmd("vadd") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg(val) + .query(&mut conn)?; + assert_eq!(rsp, Value::Int(1)); + Ok(()) +} + +#[test] +fn update_not_exsit() -> Result<(), RedisError> { + let key = "4892225613598145"; + let mut conn = get_conn("kv2vector"); + let _: Value = redis::cmd("vdel") + .arg(key) + .query(&mut conn)?; + let rsp: Value = redis::cmd("vupdate") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg("4892225613598145_val") + .query(&mut conn)?; + assert_eq!(rsp, Value::Int(0)); + Ok(()) +} + +#[test] +fn delete_not_exsit()-> Result<(), RedisError> { + let mut conn = get_conn("kv2vector"); + let key = "4892225613598146"; + let _: Value = redis::cmd("vdel") + .arg(key) + .query(&mut conn)?; + + let rsp: Value = redis::cmd("vdel") + .arg(key) + .query(&mut conn)?; + assert_eq!(rsp, Value::Int(0)); + Ok(()) +} + +#[test] +fn get_invalid_key() -> Result<(), RedisError> { + let mut conn = get_conn("kv2vector"); + let result: Result = redis::cmd("vget") + .arg("9527") + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} + +#[test] +fn insert_invalid_key() -> Result<(), RedisError> { + let key = "9527"; + let mut conn = get_conn("kv2vector"); + let result: Result = redis::cmd("vadd") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg("abcd") + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} + + +#[test] +fn insert_char_key() -> Result<(), RedisError> { + let key = "48922256135981cc"; + let mut conn = get_conn("kv2vector"); + let result: Result = redis::cmd("vadd") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg("abcd") + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} +#[test] +fn insert_long_key()-> Result<(), RedisError> { + let key = std::iter::repeat('5').take(1024).collect::(); + let mut conn = get_conn("kv2vector"); + let result: Result = redis::cmd("vadd") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg("abcd") + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} + +#[test] +fn update_invalid_key() -> Result<(), RedisError> { + let mut conn = get_conn("kv2vector"); + let key = "9527"; + let result: Result = redis::cmd("vupdate") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg("abcd") + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} + +#[test] +fn update_char_key() -> Result<(), RedisError> { + let mut conn = get_conn("kv2vector"); + let key = "48922256135981cc"; + let result: Result = redis::cmd("vupdate") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg("abcd") + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} +#[test] +fn update_long_key() -> Result<(), RedisError> { + let key = std::iter::repeat('5').take(1024).collect::(); + let mut conn = get_conn("kv2vector"); + let result: Result = redis::cmd("vupdate") + .arg(key) + .arg(VALUE_FIELD_NAME) + .arg("abcd") + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} + +#[test] +fn delete_invalid_key() -> Result<(), RedisError> { + let key = "9527"; + let mut conn = get_conn("kv2vector"); + let result: Result = redis::cmd("vdel") + .arg(key) + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} +#[test] +fn delete_char_key() -> Result<(), RedisError> { + let mut conn = get_conn("kv2vector"); + let key = "48922256135981cc"; + let result: Result = redis::cmd("vdel") + .arg(key) + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} +#[test] +fn delete_long_key() -> Result<(), RedisError> { + let mut conn = get_conn("kv2vector"); + let key = std::iter::repeat('5').take(1024).collect::(); + let result: Result = redis::cmd("vdel") + .arg(key) + .query(&mut conn); + assert!(result.is_err_and(|e| e.to_string().contains(ERR_INVALID_REQ))); + Ok(()) +} +#[test] +#[should_panic] +#[ignore] +fn sql_inject_get() { + let mut conn = get_conn("kv2vector"); + let key_inject = "4892225613598265 or 1=1 --"; + let _result:Result = redis::cmd("vget") + .arg(key_inject) + .query(&mut conn); +} diff --git a/tests_integration/src/vector/mod.rs b/tests_integration/src/vector/mod.rs index 047713c4c..6cd465954 100644 --- a/tests_integration/src/vector/mod.rs +++ b/tests_integration/src/vector/mod.rs @@ -7,6 +7,7 @@ use redis::Value; mod aggregation; mod byme; mod assist; +mod kvtime; pub(crate) const RESTYPE: &str = "vector"; pub(crate) const CMD_VGET: &str = "vget";