Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
654b193
统一ring_slice的最基础的操作
icycrystal4 Mar 7, 2024
6c1c5a7
log: 消除部分编译警告
icycrystal4 Mar 7, 2024
da1628c
ephemera vec: 优化标准的vec内存分配器
icycrystal4 Mar 8, 2024
dcd18da
ephemera vec: 优化标准的vec内存分配器
icycrystal4 Mar 8, 2024
e5a12bd
Writer: 统一writer
icycrystal4 Mar 8, 2024
bb01660
dns: 约有1.8k/s的对象分配是由dns resolver提供,先将lookup单独抽离到独立的module。后续优化
icycrystal4 Mar 11, 2024
c262558
dns: 约有1.8k/s的对象分配是由dns resolver提供,先将lookup单独抽离到独立的module。后续优化
icycrystal4 Mar 11, 2024
9d141a1
dns: 约有1.8k/s的对象分配是由dns resolver提供,先将lookup单独抽离到独立的module。后续优化
icycrystal4 Mar 11, 2024
2bce10d
dns: 约有1.8k/s的对象分配是由dns resolver提供,先将lookup单独抽离到独立的module。后续优化
icycrystal4 Mar 11, 2024
aa41764
dns: 约有1.8k/s的对象分配是由dns resolver提供,先将lookup单独抽离到独立的module。后续优化
icycrystal4 Mar 11, 2024
005cf61
dns: 使用vec替换hashmap,提升cpu利用率
icycrystal4 Mar 12, 2024
b3379dd
metrics: f64基于0内存分配实现
icycrystal4 Mar 12, 2024
edcc37d
Merge branch 'dev' into dev_icy
icycrystal4 Mar 15, 2024
9571ca4
mc: 长度检测时直接断言,而不是返回错误
icycrystal4 Mar 15, 2024
d63c015
pipeline: 简化Self的mut capture. rust老版本不支持Self的fields被同时mut引用
icycrystal4 Mar 15, 2024
faad62d
protocol mc: write response之前,检查req与response的seq是否一致,不一致直接panic
icycrystal4 Mar 16, 2024
3324b75
prometheus: register时,全局共享body
icycrystal4 Mar 20, 2024
b301597
prometheus: 单次请求共享ext buf
icycrystal4 Mar 20, 2024
2570165
dns lookup: 每次扫描完所有的host后,周期从0开始。
icycrystal4 Mar 20, 2024
29ae2fe
top discovery: 简化处理逻辑。
icycrystal4 Mar 22, 2024
9994d15
top discovery: 简化处理逻辑。
icycrystal4 Mar 22, 2024
eba149e
删除部分不必要的文件,把top update的移动到update模块中
icycrystal4 Mar 22, 2024
e0a5201
Merge branch 'main' into dev_mem
parabala Mar 28, 2024
89480e7
移除不使用的变量disable_clean_service_path
parabala Mar 28, 2024
10c4fd9
Merge pull request #441 from weibocom/dev_mem
parabala Mar 28, 2024
cd26ad5
对kv中无响应的req构建默认响应body
hustfisher Mar 28, 2024
7d288ad
Merge branch 'main' into main_update_kv_response
hustfisher Apr 11, 2024
c58e8da
Merge branch 'main' into main_update_kv_response
hustfisher Apr 22, 2024
6b5879d
fix conflicts
hustfisher Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@
brz_home="/data1/ci/breeze"
mkdir -p $brz_home

docker ps -a | grep breeze_ci_mysql4623 && docker rm -f breeze_ci_mysql4623
docker ps -a | grep breeze_ci_mysql4624 && docker rm -f breeze_ci_mysql4624
docker stop breeze_ci_mysql4623 breeze_ci_mysql4624 breeze_github_ci 2>/dev/null

docker run --rm --name breeze_ci_mysql4623 -p 4623:3306 -d parabala/mysqlci_with_schema:v0.0.2
docker run --rm --name breeze_ci_mysql4624 -p 4624:3306 -d parabala/mysqlci_with_schema:v0.0.2

container_name=breeze_github_ci
docker ps -a | grep "$container_name" && docker rm -f "$container_name"


docker run --rm -d -v $brz_home:/data1/resource/breeze --net="host" --name "$container_name" parabala/breeze:githubci108

# rm -rf $brz_home/*
Expand Down Expand Up @@ -75,3 +71,4 @@ RUST_BACKTRACE=1 cargo test -p tests_integration --features github_workflow

kill -9 $pid

docker stop breeze_ci_mysql4623 breeze_ci_mysql4624 breeze_github_ci
27 changes: 27 additions & 0 deletions ds/src/io/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::{RingSlice, Slicer};
pub trait Buffer: Sized {
fn write<S: Slicer>(&mut self, data: &S);
#[inline(always)]
fn write_u16(&mut self, num: u16) {
self.write(&num.to_be_bytes());
}
#[inline(always)]
fn write_u32(&mut self, num: u32) {
self.write(&num.to_be_bytes());
}
#[inline(always)]
fn write_u64(&mut self, num: u64) {
self.write(&num.to_be_bytes());
}
#[inline(always)]
fn write_slice(&mut self, slice: &RingSlice) {
self.write(slice)
}
}
use crate::Writer;
impl<T: Writer> Buffer for T {
#[inline(always)]
fn write<S: Slicer>(&mut self, data: &S) {
self.write_r(0, data).expect("no err")
}
}
35 changes: 35 additions & 0 deletions ds/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
mod order;
pub use order::*;

mod buffer;
pub use buffer::*;

use crate::{Range, Slicer};
// 把一个Slicer的部分切片写入到一个Writer中。必须全部写入成功,否则返回错误。
pub trait Writer {
fn write_all(&mut self, data: &[u8]) -> std::io::Result<()>;
#[inline(always)]
fn write_r<S: Slicer, R: Range>(&mut self, r: R, slicer: &S) -> std::io::Result<()> {
slicer.with_seg(r, |seg, _oft, _seg| self.write_all(seg))
}
}

impl Writer for Vec<u8> {
#[inline]
fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> {
self.reserve(data.len());
use std::ptr::copy_nonoverlapping as copy;
unsafe {
let len = self.len();
let ptr = self.as_mut_ptr().add(len);
copy(data.as_ptr(), ptr, data.len());
self.set_len(len + data.len());
}
Ok(())
}
#[inline(always)]
fn write_r<S: Slicer, R: Range>(&mut self, r: R, slicer: &S) -> std::io::Result<()> {
self.reserve(r.len(slicer));
slicer.with_seg(r, |seg, _oft, _seg| self.write_all(seg))
}
}
29 changes: 29 additions & 0 deletions ds/src/io/order.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::RingSlice;
use procs::impl_number_ringslice;
// 如果方法名中没有包含be或者le,则默认为be
#[impl_number_ringslice(default = "be")]
pub trait ByteOrder {
fn i8(&self, oft: usize) -> i8;
fn u8(&self, oft: usize) -> u8;
fn u16_le(&self, oft: usize) -> u16;
fn i16_le(&self, oft: usize) -> i16;
fn u24_le(&self, oft: usize) -> u32;
fn i24_le(&self, oft: usize) -> i32;
fn u32_le(&self, oft: usize) -> u32;
fn i32_le(&self, oft: usize) -> i32;
fn u40_le(&self, oft: usize) -> u64;
fn i40_le(&self, oft: usize) -> i64;
fn u48_le(&self, oft: usize) -> u64;
fn i48_le(&self, oft: usize) -> i64;
fn u56_le(&self, oft: usize) -> u64;
fn i56_le(&self, oft: usize) -> i64;
fn u64_le(&self, oft: usize) -> u64;
fn i64_le(&self, oft: usize) -> i64;

fn u16_be(&self, oft: usize) -> u16;
fn i24_be(&self, oft: usize) -> i32;
fn u32_be(&self, oft: usize) -> u32;
fn u64_be(&self, oft: usize) -> u64;
fn f32_le(&self, oft: usize) -> f32;
fn f64_le(&self, oft: usize) -> f64;
}
50 changes: 10 additions & 40 deletions ds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,68 +6,38 @@ mod mem;
//pub mod queue;
pub mod rand;
pub mod utf8;
pub mod vec;
mod waker;

pub mod vec;
pub use vec::*;

pub use cow::*;
pub use mem::*;
pub use vec::Buffer;
mod switcher;
//pub use queue::PinnedQueue;
pub use switcher::Switcher;
pub use utf8::*;
pub use waker::AtomicWaker;

pub mod time;

mod io;
pub use io::*;

mod asserts;

mod bits;
pub use bits::*;

use std::ptr::copy_nonoverlapping as copy;

pub trait BufWriter {
fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()>;
#[inline]
fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> {
self.write_all(buf0)?;
self.write_all(buf1)
}
}

impl BufWriter for Vec<u8> {
#[inline]
fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
self.reserve(buf.len());
let len = self.len();
unsafe { self.set_len(len + buf.len()) };
(&mut self[len..]).write_all(buf)?;
Ok(())
fn write_all_hint(&mut self, buf: &[u8], _next: bool) -> std::io::Result<()> {
self.write_all(buf)
}
#[inline]
fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> {
let reserve_len = buf0.len() + buf1.len();
self.reserve(reserve_len);
let len = self.len();
unsafe { self.set_len(len + reserve_len) };
(&mut self[len..]).write_seg_all(buf0, buf1)?;
Ok(())
}
}
impl BufWriter for [u8] {
#[inline(always)]
fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
debug_assert!(self.len() >= buf.len(), "{} >= {}", self.len(), buf.len());
unsafe { copy(buf.as_ptr(), self.as_mut_ptr(), buf.len()) };
Ok(())
}
#[inline(always)]
fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> {
debug_assert!(self.len() >= buf0.len() + buf1.len());
unsafe { copy(buf0.as_ptr(), self.as_mut_ptr(), buf0.len()) };
unsafe { copy(buf1.as_ptr(), self.as_mut_ptr().add(buf0.len()), buf1.len()) };
Ok(())
self.write_all(buf0)?;
self.write_all(buf1)
}
}

Expand Down
3 changes: 3 additions & 0 deletions ds/src/mem/arena/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ pub use ephemera::*;
mod cache;
pub use cache::*;

mod vec;
pub(crate) use vec::*;

use std::ptr::NonNull;
pub trait Allocator<T> {
fn alloc(&self, t: T) -> NonNull<T>;
Expand Down
132 changes: 132 additions & 0 deletions ds/src/mem/arena/vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use crate::BrzMalloc;

use std::alloc::{GlobalAlloc, Layout};
use std::sync::atomic::{fence, AtomicU8, AtomicUsize, Ordering::*};
unsafe impl Sync for CacheArena {}
#[repr(align(64))]
pub(crate) struct CacheArena<const CAP: usize = 8388608> {
ptr: *mut u8,
// 指向当前访问的chunk
idx: AtomicU8,
// 1. 低32位目前已经分配的内存大小(指向的位置)
// 2. 高32位用来控制释放。是释放的数量,释放的数量到0时,说明当前的arena可以重新分配内存;
chunks: [Chunk; 2],
}
const ALIGN: usize = 64;
impl<const CAP: usize> CacheArena<CAP> {
pub(crate) fn new() -> Self {
let cap = CAP;
// 按页对齐,分配内存
debug_assert!(cap.is_power_of_two() && cap & 4095 == 0);
let layout = Layout::array::<u8>(cap).unwrap();
let ptr = unsafe { BrzMalloc.alloc(layout) };
let each = cap / 2;
Self {
ptr,
idx: AtomicU8::new(0),
chunks: [Chunk::new(ptr, each), Chunk::new(ptr, each)],
}
}
#[inline(always)]
const fn cap(&self) -> usize {
CAP
}
#[inline]
fn layout(&self, size: usize) -> Layout {
unsafe { Layout::from_size_align_unchecked(size, ALIGN) }
}
#[inline(always)]
const fn max_alloc(&self) -> usize {
self.cap() / 32
}
#[inline]
pub(crate) fn alloc(&self, size: usize) -> (*mut u8, usize) {
let size = (size + ALIGN - 1) & !(ALIGN - 1);
if size <= self.max_alloc() {
let idx = self.idx.load(Relaxed);
let chunk = &self.chunks[idx as usize];
if let Some(p) = chunk.alloc(size).or_else(|| {
let idx = 1 - idx;
self.idx.store(idx, Relaxed);
self.get(idx as usize).alloc(size)
}) {
return p;
}
}
// 从堆上分配
unsafe { (BrzMalloc.alloc(self.layout(size)), size) }
}
#[inline(always)]
fn get(&self, idx: usize) -> &Chunk {
debug_assert!(idx <= 1);
unsafe { &self.chunks.get_unchecked(idx) }
}
#[inline]
pub(crate) fn dealloc(&self, ptr: *mut u8, size: usize) {
debug_assert!(size & (ALIGN - 1) == 0);
if ptr >= self.ptr && ptr < unsafe { self.ptr.add(self.cap()) } {
let idx = ((ptr as usize - self.ptr as usize) >= self.cap()) as usize;
debug_assert!(idx <= 1);
self.get(idx).dealloc(ptr);
} else {
unsafe { BrzMalloc.dealloc(ptr, self.layout(size)) };
}
}
}

struct Chunk {
ptr: *mut u8,
alloc_oft: AtomicUsize,
cap: usize,
}

impl Chunk {
fn new(ptr: *mut u8, cap: usize) -> Self {
Self {
ptr,
cap,
alloc_oft: AtomicUsize::new(0),
}
}
// 按64字节对齐
#[inline(always)]
fn alloc(&self, size: usize) -> Option<(*mut u8, usize)> {
debug_assert!(size & (ALIGN - 1) == 0);
let oft = self.incr_size(size);
if oft + size <= self.cap {
Some((unsafe { self.ptr.add(oft) }, size))
} else {
self.free_one();
None
}
}
#[inline(always)]
fn incr_size(&self, size: usize) -> usize {
debug_assert!(size + self.cap < u32::MAX as usize);
let v = size + (1 << 32);
let alloc_oft = self.alloc_oft.fetch_add(v, Relaxed);
// 低32位为分配的偏移量,高32位为分配次数
alloc_oft as u32 as usize
}
// 释放的时候统一释放
#[inline(always)]
fn free_one(&self) {
let alloc_oft = self.alloc_oft.fetch_sub(1 << 32, Release);
if (alloc_oft >> 32) != 1 {
return;
}
fence(Acquire);
self.alloc_oft.store(0, Release);
}
#[inline(always)]
fn dealloc(&self, ptr: *mut u8) {
debug_assert!(ptr >= self.ptr && ptr < unsafe { self.ptr.add(self.cap) });
self.free_one();
}
}
// Drop
impl<const CAP: usize> Drop for CacheArena<CAP> {
fn drop(&mut self) {
unsafe { BrzMalloc.dealloc(self.ptr, Layout::array::<u8>(self.cap()).unwrap()) };
}
}
17 changes: 0 additions & 17 deletions ds/src/mem/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,6 @@ impl RingBuffer {
self.advance_write(read);
out
}
//// 返回可写入的buffer。如果无法写入,则返回一个长度为0的slice
//#[inline]
//fn as_mut_bytes(&mut self) -> &mut [u8] {
// if self.read + self.size == self.write {
// // 已满
// unsafe { from_raw_parts_mut(self.data.as_ptr(), 0) }
// } else {
// let offset = self.mask(self.write);
// let read = self.mask(self.read);
// let n = if offset < read {
// read - offset
// } else {
// self.size - offset
// };
// unsafe { from_raw_parts_mut(self.data.as_ptr().offset(offset as isize), n) }
// }
//}
#[inline]
pub fn data(&self) -> RingSlice {
RingSlice::from(self.data.as_ptr(), self.size, self.read, self.write)
Expand Down
Loading