Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ env:
phantom: localhost:9303
mysql: localhost:3306
vector: localhost:3308
kv2vector: localhost:3309
mq: localhost:56815
min_key: 1
max_key: 10000
Expand All @@ -31,7 +32,7 @@ jobs:
docker run --rm --name breeze_ci_mysql4623 -p 4623:3306 -d parabala/mysqlci_with_schema:v0.0.2
docker run --rm --name breeze_ci_mysql4624 -p 4624:3306 -d parabala/mysqlci_with_schema:v0.0.2
- name: Prepare Vintage_MC_Redis
run: docker run -d -v /home/runner/work/breeze:/data1/resource/breeze --net="host" --name breeze_github_ci hustfisher/breeze:githubci115
run: docker run -d -v /home/runner/work/breeze:/data1/resource/breeze --net="host" --name breeze_github_ci hustfisher/breeze:githubci116
- uses: actions/checkout@v3
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
Expand All @@ -53,6 +54,7 @@ jobs:
curl http://127.0.0.1:8080/config/cloud/kv/testbreeze/kvmeshtest
sleep 1
curl http://127.0.0.1:8080/config/cloud/vector/testbreeze/vectortest
curl http://127.0.0.1:8080/config/cloud/vector/testbreeze/kv2vector
- name: Create Socks
run: |
#ps -aux|grep breeze
Expand All @@ -66,6 +68,7 @@ jobs:
touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+phantom+testbreeze+phantomtest@phantom:9303@pt
touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+kv+testbreeze+kvmeshtest@kv:3306@kv
touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+vector+testbreeze+vectortest@vector:3308@vector
touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+vector+testbreeze+kv2vector@vector:3309@vector
touch /home/runner/work/breeze/socks/127.0.0.1:8080+config+cloud+mq+testbreeze+mcqmeshtest_1@msgque:56815@mq
ls -all /home/runner/work/breeze/snapshot
ls -all /home/runner/work/breeze/socks
Expand Down Expand Up @@ -94,7 +97,7 @@ jobs:
run: cargo test -p tests --features github_workflow -p endpoint
- name: wait until 2 mins
run: |
port_list=(56810 56812 9302 9301 9303 3306 3308) # 端口列表
port_list=(56810 56812 9302 9301 9303 3306 3308 3309 56815) # 端口列表
start=$(date +%s) # 获取当前时间戳

while true; do
Expand Down
6 changes: 4 additions & 2 deletions ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ container_name=breeze_github_ci
docker ps -a | grep "$container_name" && docker rm -f "$container_name"


docker run --rm -d -v $brz_home:/data1/resource/breeze --net="host" --name "$container_name" parabala/breeze:githubci108
docker run --rm -d -v $brz_home:/data1/resource/breeze --net="host" --name "$container_name" hustfisher/breeze:githubci116

# rm -rf $brz_home/*
mkdir -p $brz_home/logs
Expand All @@ -24,6 +24,7 @@ touch $brz_home/socks/127.0.0.1:8080+config+cloud+counterservice+testbreeze+mesh
touch $brz_home/socks/127.0.0.1:8080+config+cloud+phantom+testbreeze+phantomtest@phantom:9303@pt
touch $brz_home/socks/127.0.0.1:8080+config+cloud+kv+testbreeze+kvmeshtest@kv:3306@kv
touch $brz_home/socks/127.0.0.1:8080+config+cloud+vector+testbreeze+vectortest@vector:3308@vector
touch $brz_home/socks/127.0.0.1:8080+config+cloud+vector+testbreeze+kv2vector@vector:3309@vector


cargo build
Expand All @@ -38,14 +39,15 @@ export mc=localhost:9301
export phantom=localhost:9303
export mysql=localhost:3306
export vector=localhost:3308
export kv2vector=localhost:3309
export min_key=1
export max_key=10000
export socks_dir=$brz_home/socks

RUST_BACKTRACE=1 cargo test -p tests

#等待mesh初始化,最多等待两分钟
port_list=(56810 56812 9302 9301 9303 3306 3308) # 端口列表
port_list=(56810 56812 9302 9301 9303 3306 3308 3309) # 端口列表
start=$(date +%s) # 获取当前时间戳

while true; do
Expand Down
9 changes: 8 additions & 1 deletion endpoint/src/cacheservice/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{Endpoint, Endpoints, Topology};
use discovery::TopologyWrite;
use protocol::memcache::Binary;
use protocol::{Protocol, Request, Resource::Memcache};
use sharding::hash::{Hash, HashKey, Hasher};
use sharding::hash::{Hash, HashKey, Hasher, HashGrouper};

use super::config::Flag;
use crate::shards::Shards;
Expand Down Expand Up @@ -74,6 +74,13 @@ where
}
}

// CacheService performs no hash-based key grouping: it relies on the
// `HashGrouper` trait's default `group`, which returns `None`
// (see the trait declared in endpoint/src/topo.rs).
impl<E, P> HashGrouper for CacheService<E, P>
where
E: Endpoint,
P: Protocol,
{
}

impl<E, Req, P> Topology for CacheService<E, P>
where
E: Endpoint<Item = Req>,
Expand Down
9 changes: 9 additions & 0 deletions endpoint/src/kv/kvtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ impl KVTime {
}
}
}
#[inline]
/// Returns a clone of the table-name postfix (date-granularity) strategy
/// configured for this `KVTime` instance.
pub fn table_postfix(&self) -> Postfix {
    // Idiomatic tail expression instead of `return expr` (clippy::needless_return).
    self.table_postfix.clone()
}
#[inline]
// Extracts the uuid embedded in `key` (via the project's `UuidGet`/`Uuid`
// traits) and converts it to the calendar date it encodes — presumably used
// to select the date-sharded table. NOTE(review): `native_date()` resolves
// the date in the Asia/Shanghai timezone (see endpoint/src/kv/uuid.rs).
pub fn get_date(&self, key: &RingSlice) -> NaiveDate {
let uuid = key.uuid();
uuid.native_date()
}
}
impl Strategy for KVTime {
fn distribution(&self) -> &DBRange {
Expand Down
9 changes: 8 additions & 1 deletion endpoint/src/kv/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use protocol::Request;
use protocol::ResOption;
use protocol::Resource;
use rand::seq::SliceRandom;
use sharding::hash::{Hash, HashKey};
use sharding::hash::{Hash, HashKey, HashGrouper};

use crate::dns::DnsConfig;
use crate::Timeout;
Expand Down Expand Up @@ -57,6 +57,13 @@ where
}
}

// KvService performs no hash-based key grouping: it relies on the
// `HashGrouper` trait's default `group`, which returns `None`
// (see the trait declared in endpoint/src/topo.rs).
impl<E, P> HashGrouper for KvService<E, P>
where
E: Endpoint,
P: Protocol,
{
}

impl<E, Req, P> Topology for KvService<E, P>
where
E: Endpoint<Item = Req>,
Expand Down
12 changes: 11 additions & 1 deletion endpoint/src/kv/uuid.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::{Datelike, TimeZone};
use chrono::{Datelike, TimeZone, NaiveDate};
use chrono_tz::Asia::Shanghai;
// use std::collections::HashMap;
// use std::sync::atomic::{AtomicI64, Ordering};
Expand Down Expand Up @@ -29,6 +29,8 @@ pub trait Uuid {
fn ymd(&self) -> (u16, u8, u8);
// 返回year
fn year(&self) -> u16;
// 返回date
fn native_date(&self) -> NaiveDate;
}

impl Uuid for i64 {
Expand All @@ -53,6 +55,14 @@ impl Uuid for i64 {
let (year, _, _) = self.ymd();
year
}
#[inline(always)]
// Converts this uuid's embedded unix-seconds timestamp into the calendar
// date it falls on in the Asia/Shanghai timezone.
// NOTE(review): `timestamp_opt(...).unwrap()` panics if `unix_secs()` is
// out of chrono's representable range — assumed never to happen for valid
// uuids; confirm against the uuid format. (Name `native_date` looks like a
// typo for `naive_date`, but renaming would break trait consumers.)
fn native_date(&self) -> NaiveDate {
chrono::Utc
.timestamp_opt(self.unix_secs(), 0)
.unwrap()
.with_timezone(&Shanghai)
.date_naive()
}
}

pub trait UuidGet {
Expand Down
9 changes: 8 additions & 1 deletion endpoint/src/msgque/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::{
use crate::dns::{DnsConfig, DnsLookup};
use crate::msgque::SizedQueueInfo;
use crate::{CloneAbleAtomicBool, Endpoint, Endpoints, Timeout, Topology};
use sharding::hash::{Hash, HashKey, Hasher, Padding};
use sharding::hash::{Hash, HashKey, Hasher, Padding, HashGrouper};

// ip vintage下线后,2分钟后真正从读列表中下线,写列表是立即下线的
const OFFLINE_LIMIT_SECONDS: u64 = 60 * 2;
Expand Down Expand Up @@ -93,6 +93,13 @@ where
}
}

// MsgQue performs no hash-based key grouping: it relies on the
// `HashGrouper` trait's default `group`, which returns `None`
// (see the trait declared in endpoint/src/topo.rs).
impl<E, P> HashGrouper for MsgQue<E, P>
where
E: Endpoint,
P: Protocol,
{
}

impl<E, Req, P> Topology for MsgQue<E, P>
where
E: Endpoint<Item = Req>,
Expand Down
9 changes: 8 additions & 1 deletion endpoint/src/phantomservice/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use discovery::{Inited, TopologyWrite};
use protocol::{Protocol, Request, Resource::Phantom};
use sharding::{
distribution::Range,
hash::{Crc32, Hash, HashKey},
hash::{Crc32, Hash, HashKey, HashGrouper},
};

use super::config::PhantomNamespace;
Expand Down Expand Up @@ -45,6 +45,13 @@ where
}
}

// PhantomService performs no hash-based key grouping: it relies on the
// `HashGrouper` trait's default `group`, which returns `None`
// (see the trait declared in endpoint/src/topo.rs).
impl<E, P> HashGrouper for PhantomService<E, P>
where
E: Endpoint,
P: Protocol,
{
}

impl<E, Req, P> Topology for PhantomService<E, P>
where
E: Endpoint<Item = Req>,
Expand Down
9 changes: 8 additions & 1 deletion endpoint/src/redisservice/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use discovery::TopologyWrite;
use protocol::{Protocol, RedisFlager, Request, Resource::Redis};
use sharding::distribution::Distribute;
use sharding::hash::{Hash, HashKey, Hasher};
use sharding::hash::{Hash, HashKey, Hasher, HashGrouper};

use super::config::RedisNamespace;

Expand Down Expand Up @@ -43,6 +43,13 @@ where
}
}

// RedisService performs no hash-based key grouping: it relies on the
// `HashGrouper` trait's default `group`, which returns `None`
// (see the trait declared in endpoint/src/topo.rs).
impl<E, P> HashGrouper for RedisService<E, P>
where
E: Endpoint,
P: Protocol,
{
}

impl<E, Req, P> Topology for RedisService<E, P>
where
E: Endpoint<Item = Req>,
Expand Down
9 changes: 7 additions & 2 deletions endpoint/src/topo.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use discovery::{Inited, TopologyWrite};
use protocol::{Protocol, Request, ResOption, Resource};
use sharding::hash::{Hash, HashKey};
use sharding::hash::{Hash, HashKey, HashGrouper};
use ds::RingSlice;

use crate::Timeout;

Expand Down Expand Up @@ -34,7 +35,7 @@ procs::topology_dispatcher! {
fn build<P:Protocol>(addr: &str, p: P, r: Resource, service: &str, to: Timeout) -> Self {Self::build_o(addr, p, r, service, to, Default::default())}
} => where P:Protocol, E:Endpoint<Item = R> + Inited, R: Request

pub trait Topology : Endpoint + Hash{
pub trait Topology : Endpoint + Hash + HashGrouper {
fn exp_sec(&self) -> u32 {86400}
fn has_attach(&self) -> bool {false}
} => where P:Protocol, E:Endpoint<Item = R>, R:Request, Topologies<E, P>: Endpoint
Expand All @@ -50,6 +51,10 @@ procs::topology_dispatcher! {
fn load(&mut self) -> bool;
} => where P:Protocol, E:Endpoint

trait HashGrouper {
fn group(&self, _key: &RingSlice) -> Option<Vec<(Vec<u8>, i64)>> { None }
} => where P:Protocol, E:Endpoint

trait Hash {
fn hash<S: HashKey>(&self, key: &S) -> i64;
} => where P:Protocol, E:Endpoint,
Expand Down
9 changes: 8 additions & 1 deletion endpoint/src/uuid/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
};
use discovery::TopologyWrite;
use protocol::{Protocol, Request, Resource::Uuid};
use sharding::hash::{Hash, HashKey};
use sharding::hash::{Hash, HashKey, HashGrouper};

use super::config::UuidNamespace;

Expand Down Expand Up @@ -37,6 +37,13 @@ where
}
}

// UuidService performs no hash-based key grouping: it relies on the
// `HashGrouper` trait's default `group`, which returns `None`
// (see the trait declared in endpoint/src/topo.rs).
impl<E, P> HashGrouper for UuidService<E, P>
where
E: Endpoint,
P: Protocol,
{
}

impl<E, Req, P> Topology for UuidService<E, P>
where
E: Endpoint<Item = Req>,
Expand Down
4 changes: 4 additions & 0 deletions endpoint/src/vector/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ impl Aggregation {
CommandType::Unknown => panic!("not sup {:?}", vcmd.cmd),
}
}
#[inline]
// Returns a clone of the table-name postfix (date-granularity) strategy
// configured for this aggregation strategy.
pub(crate) fn table_postfix(&self) -> Postfix {
self.table_postfix.clone()
}
}

#[derive(Clone, Debug)]
Expand Down
25 changes: 24 additions & 1 deletion endpoint/src/vector/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ impl Strategist {
ns.si_backends.len() as u32,
))
}
"kvtime" => {
Self::VectorTime(VectorTime::new_with_db(
ns.basic.db_name.clone(),
ns.basic.table_name.clone(),
ns.basic.db_count,
//此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认
ns.backends.iter().next().unwrap().1.len() as u32,
ns.basic.table_postfix.as_str().try_into().ok()?,
ns.basic.keys.clone(),
))
}
_ => {
if ns.basic.keys.len() < 2 {
log::warn!("len keys < 2");
Expand Down Expand Up @@ -119,12 +130,24 @@ impl Strategist {
}
}

pub(crate) fn check_vector_cmd(&self, vcmd: &protocol::vector::VectorCmd) -> Result<()> {
pub(crate) fn check_vector_cmd(&self, vcmd: &protocol::vector::VectorCmd) -> Result<()> {
match self {
Strategist::VectorTime(inner) => inner.check_vector_cmd(vcmd),
Strategist::Aggregation(inner) => inner.check_vector_cmd(vcmd),
}
}
pub(crate) fn keys(&self) -> &[String] {
match self {
Strategist::VectorTime(inner) => inner.keys(),
Strategist::Aggregation(inner) => inner.keys(),
}
}
pub(crate) fn table_postfix(&self) -> Postfix {
match self {
Strategist::VectorTime(inner) => inner.table_postfix(),
Strategist::Aggregation(inner) => inner.table_postfix(),
}
}

// /// 获得配置的默认route;当配置strategy为aggregation时,默认的route是Aggregation,否则就是Main
// #[inline]
Expand Down
Loading