diff --git a/ci.sh b/ci.sh index ab214f8b4..191266b10 100755 --- a/ci.sh +++ b/ci.sh @@ -2,15 +2,11 @@ brz_home="/data1/ci/breeze" mkdir -p $brz_home -docker ps -a | grep breeze_ci_mysql4623 && docker rm -f breeze_ci_mysql4623 -docker ps -a | grep breeze_ci_mysql4624 && docker rm -f breeze_ci_mysql4624 +docker stop breeze_ci_mysql4623 breeze_ci_mysql4624 breeze_github_ci 2>/dev/null + docker run --rm --name breeze_ci_mysql4623 -p 4623:3306 -d parabala/mysqlci_with_schema:v0.0.2 docker run --rm --name breeze_ci_mysql4624 -p 4624:3306 -d parabala/mysqlci_with_schema:v0.0.2 - container_name=breeze_github_ci -docker ps -a | grep "$container_name" && docker rm -f "$container_name" - - docker run --rm -d -v $brz_home:/data1/resource/breeze --net="host" --name "$container_name" parabala/breeze:githubci108 # rm -rf $brz_home/* @@ -75,3 +71,4 @@ RUST_BACKTRACE=1 cargo test -p tests_integration --features github_workflow kill -9 $pid +docker stop breeze_ci_mysql4623 breeze_ci_mysql4624 breeze_github_ci diff --git a/ds/src/io/buffer.rs b/ds/src/io/buffer.rs new file mode 100644 index 000000000..f58d474ec --- /dev/null +++ b/ds/src/io/buffer.rs @@ -0,0 +1,27 @@ +use crate::{RingSlice, Slicer}; +pub trait Buffer: Sized { + fn write(&mut self, data: &S); + #[inline(always)] + fn write_u16(&mut self, num: u16) { + self.write(&num.to_be_bytes()); + } + #[inline(always)] + fn write_u32(&mut self, num: u32) { + self.write(&num.to_be_bytes()); + } + #[inline(always)] + fn write_u64(&mut self, num: u64) { + self.write(&num.to_be_bytes()); + } + #[inline(always)] + fn write_slice(&mut self, slice: &RingSlice) { + self.write(slice) + } +} +use crate::Writer; +impl Buffer for T { + #[inline(always)] + fn write(&mut self, data: &S) { + self.write_r(0, data).expect("no err") + } +} diff --git a/ds/src/io/mod.rs b/ds/src/io/mod.rs new file mode 100644 index 000000000..c60e039eb --- /dev/null +++ b/ds/src/io/mod.rs @@ -0,0 +1,35 @@ +mod order; +pub use order::*; + +mod buffer; +pub use buffer::*; + +use crate::{Range, Slicer}; +// 把一个Slicer的部分切片写入到一个Writer中。必须全部写入成功,否则返回错误。 +pub trait Writer { + fn write_all(&mut self, data: &[u8]) -> std::io::Result<()>; + #[inline(always)] + fn write_r(&mut self, r: R, slicer: &S) -> std::io::Result<()> { + slicer.with_seg(r, |seg, _oft, _seg| self.write_all(seg)) + } +} + +impl Writer for Vec { + #[inline] + fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> { + self.reserve(data.len()); + use std::ptr::copy_nonoverlapping as copy; + unsafe { + let len = self.len(); + let ptr = self.as_mut_ptr().add(len); + copy(data.as_ptr(), ptr, data.len()); + self.set_len(len + data.len()); + } + Ok(()) + } + #[inline(always)] + fn write_r(&mut self, r: R, slicer: &S) -> std::io::Result<()> { + self.reserve(r.len(slicer)); + slicer.with_seg(r, |seg, _oft, _seg| self.write_all(seg)) + } +} diff --git a/ds/src/io/order.rs b/ds/src/io/order.rs new file mode 100644 index 000000000..66fc50f44 --- /dev/null +++ b/ds/src/io/order.rs @@ -0,0 +1,29 @@ +use crate::RingSlice; +use procs::impl_number_ringslice; +// 如果方法名中没有包含be或者le,则默认为be +#[impl_number_ringslice(default = "be")] +pub trait ByteOrder { + fn i8(&self, oft: usize) -> i8; + fn u8(&self, oft: usize) -> u8; + fn u16_le(&self, oft: usize) -> u16; + fn i16_le(&self, oft: usize) -> i16; + fn u24_le(&self, oft: usize) -> u32; + fn i24_le(&self, oft: usize) -> i32; + fn u32_le(&self, oft: usize) -> u32; + fn i32_le(&self, oft: usize) -> i32; + fn u40_le(&self, oft: usize) -> u64; + fn i40_le(&self, oft: usize) -> i64; + fn u48_le(&self, oft: usize) -> u64; + fn i48_le(&self, oft: usize) -> i64; + fn u56_le(&self, oft: usize) -> u64; + fn i56_le(&self, oft: usize) -> i64; + fn u64_le(&self, oft: usize) -> u64; + fn i64_le(&self, oft: usize) -> i64; + + fn u16_be(&self, oft: usize) -> u16; + fn i24_be(&self, oft: usize) -> i32; + fn u32_be(&self, oft: usize) -> u32; + fn u64_be(&self, oft: usize) -> u64; + fn f32_le(&self, oft: usize) -> f32; + fn f64_le(&self, oft: usize) -> f64; +} diff --git a/ds/src/lib.rs b/ds/src/lib.rs index 06af2878d..2177a1671 100644 --- a/ds/src/lib.rs +++ b/ds/src/lib.rs @@ -6,68 +6,38 @@ mod mem; //pub mod queue; pub mod rand; pub mod utf8; -pub mod vec; mod waker; +pub mod vec; +pub use vec::*; + pub use cow::*; pub use mem::*; -pub use vec::Buffer; mod switcher; -//pub use queue::PinnedQueue; pub use switcher::Switcher; pub use utf8::*; pub use waker::AtomicWaker; pub mod time; +mod io; +pub use io::*; + mod asserts; mod bits; pub use bits::*; -use std::ptr::copy_nonoverlapping as copy; - pub trait BufWriter { fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()>; #[inline] - fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> { - self.write_all(buf0)?; - self.write_all(buf1) - } -} - -impl BufWriter for Vec { - #[inline] - fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { - self.reserve(buf.len()); - let len = self.len(); - unsafe { self.set_len(len + buf.len()) }; - (&mut self[len..]).write_all(buf)?; - Ok(()) + fn write_all_hint(&mut self, buf: &[u8], _next: bool) -> std::io::Result<()> { + self.write_all(buf) } #[inline] fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> { - let reserve_len = buf0.len() + buf1.len(); - self.reserve(reserve_len); - let len = self.len(); - unsafe { self.set_len(len + reserve_len) }; - (&mut self[len..]).write_seg_all(buf0, buf1)?; - Ok(()) - } -} -impl BufWriter for [u8] { - #[inline(always)] - fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { - debug_assert!(self.len() >= buf.len(), "{} >= {}", self.len(), buf.len()); - unsafe { copy(buf.as_ptr(), self.as_mut_ptr(), buf.len()) }; - Ok(()) - } - #[inline(always)] - fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> { - debug_assert!(self.len() >= buf0.len() + buf1.len()); - unsafe { copy(buf0.as_ptr(), self.as_mut_ptr(), buf0.len()) }; - unsafe { copy(buf1.as_ptr(), self.as_mut_ptr().add(buf0.len()), buf1.len()) }; - Ok(()) + self.write_all(buf0)?; + self.write_all(buf1) } } diff --git a/ds/src/mem/arena/mod.rs b/ds/src/mem/arena/mod.rs index 7ad95aa7e..ba4814312 100644 --- a/ds/src/mem/arena/mod.rs +++ b/ds/src/mem/arena/mod.rs @@ -4,6 +4,9 @@ pub use ephemera::*; mod cache; pub use cache::*; +mod vec; +pub(crate) use vec::*; + use std::ptr::NonNull; pub trait Allocator { fn alloc(&self, t: T) -> NonNull; diff --git a/ds/src/mem/arena/vec.rs b/ds/src/mem/arena/vec.rs new file mode 100644 index 000000000..0ee5cbb63 --- /dev/null +++ b/ds/src/mem/arena/vec.rs @@ -0,0 +1,132 @@ +use crate::BrzMalloc; + +use std::alloc::{GlobalAlloc, Layout}; +use std::sync::atomic::{fence, AtomicU8, AtomicUsize, Ordering::*}; +unsafe impl Sync for CacheArena {} +#[repr(align(64))] +pub(crate) struct CacheArena { + ptr: *mut u8, + // 指向当前访问的chunk + idx: AtomicU8, + // 1. 低32位目前已经分配的内存大小(指向的位置) + // 2. 高32位用来控制释放。是释放的数量,释放的数量到0时,说明当前的arena可以重新分配内存; + chunks: [Chunk; 2], +} +const ALIGN: usize = 64; +impl CacheArena { + pub(crate) fn new() -> Self { + let cap = CAP; + // 按页对齐,分配内存 + debug_assert!(cap.is_power_of_two() && cap & 4095 == 0); + let layout = Layout::array::(cap).unwrap(); + let ptr = unsafe { BrzMalloc.alloc(layout) }; + let each = cap / 2; + Self { + ptr, + idx: AtomicU8::new(0), + chunks: [Chunk::new(ptr, each), Chunk::new(ptr, each)], + } + } + #[inline(always)] + const fn cap(&self) -> usize { + CAP + } + #[inline] + fn layout(&self, size: usize) -> Layout { + unsafe { Layout::from_size_align_unchecked(size, ALIGN) } + } + #[inline(always)] + const fn max_alloc(&self) -> usize { + self.cap() / 32 + } + #[inline] + pub(crate) fn alloc(&self, size: usize) -> (*mut u8, usize) { + let size = (size + ALIGN - 1) & !(ALIGN - 1); + if size <= self.max_alloc() { + let idx = self.idx.load(Relaxed); + let chunk = &self.chunks[idx as usize]; + if let Some(p) = chunk.alloc(size).or_else(|| { + let idx = 1 - idx; + self.idx.store(idx, Relaxed); + self.get(idx as usize).alloc(size) + }) { + return p; + } + } + // 从堆上分配 + unsafe { (BrzMalloc.alloc(self.layout(size)), size) } + } + #[inline(always)] + fn get(&self, idx: usize) -> &Chunk { + debug_assert!(idx <= 1); + unsafe { &self.chunks.get_unchecked(idx) } + } + #[inline] + pub(crate) fn dealloc(&self, ptr: *mut u8, size: usize) { + debug_assert!(size & (ALIGN - 1) == 0); + if ptr >= self.ptr && ptr < unsafe { self.ptr.add(self.cap()) } { + let idx = ((ptr as usize - self.ptr as usize) >= self.cap()) as usize; + debug_assert!(idx <= 1); + self.get(idx).dealloc(ptr); + } else { + unsafe { BrzMalloc.dealloc(ptr, self.layout(size)) }; + } + } +} + +struct Chunk { + ptr: *mut u8, + alloc_oft: AtomicUsize, + cap: usize, +} + +impl Chunk { + fn new(ptr: *mut u8, cap: usize) -> Self { + Self { + ptr, + cap, + alloc_oft: AtomicUsize::new(0), + } + } + // 按64字节对齐 + #[inline(always)] + fn alloc(&self, size: usize) -> Option<(*mut u8, usize)> { + debug_assert!(size & (ALIGN - 1) == 0); + let oft = self.incr_size(size); + if oft + size <= self.cap { + Some((unsafe { self.ptr.add(oft) }, size)) + } else { + self.free_one(); + None + } + } + #[inline(always)] + fn incr_size(&self, size: usize) -> usize { + debug_assert!(size + self.cap < u32::MAX as usize); + let v = size + (1 << 32); + let alloc_oft = self.alloc_oft.fetch_add(v, Relaxed); + // 低32位为分配的偏移量,高32位为分配次数 + alloc_oft as u32 as usize + } + // 释放的时候统一释放 + #[inline(always)] + fn free_one(&self) { + let alloc_oft = self.alloc_oft.fetch_sub(1 << 32, Release); + if (alloc_oft >> 32) != 1 { + return; + } + fence(Acquire); + self.alloc_oft.store(0, Release); + } + #[inline(always)] + fn dealloc(&self, ptr: *mut u8) { + debug_assert!(ptr >= self.ptr && ptr < unsafe { self.ptr.add(self.cap) }); + self.free_one(); + } +} +// Drop +impl Drop for CacheArena { + fn drop(&mut self) { + unsafe { BrzMalloc.dealloc(self.ptr, Layout::array::(self.cap()).unwrap()) }; + } +} diff --git a/ds/src/mem/buffer.rs b/ds/src/mem/buffer.rs index ef685baa9..eaeb83ac5 100644 --- a/ds/src/mem/buffer.rs +++ b/ds/src/mem/buffer.rs @@ -73,23 +73,6 @@ impl RingBuffer { self.advance_write(read); out } - //// 返回可写入的buffer。如果无法写入,则返回一个长度为0的slice - //#[inline] - //fn as_mut_bytes(&mut self) -> &mut [u8] { - // if self.read + self.size == self.write { - // // 已满 - // unsafe { from_raw_parts_mut(self.data.as_ptr(), 0) } - // } else { - // let offset = self.mask(self.write); - // let read = self.mask(self.read); - // let n = if offset < read { - // read - offset - // } else { - // self.size - offset - // }; - // unsafe { from_raw_parts_mut(self.data.as_ptr().offset(offset as isize), n) } - // } - //} #[inline] pub fn data(&self) -> RingSlice { RingSlice::from(self.data.as_ptr(), self.size, self.read, self.write) diff --git a/ds/src/mem/bytes.rs b/ds/src/mem/bytes.rs deleted file mode 100644 index 1d5634ad3..000000000 --- a/ds/src/mem/bytes.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::RingSlice; -use procs::impl_number_ringslice; -// 如果方法名中没有包含be或者le,则默认为be -#[impl_number_ringslice(default = "be")] -pub trait ByteOrder { - fn i8(&self, oft: usize) -> i8; - fn u8(&self, oft: usize) -> u8; - fn u16_le(&self, oft: usize) -> u16; - fn i16_le(&self, oft: usize) -> i16; - fn u24_le(&self, oft: usize) -> u32; - fn i24_le(&self, oft: usize) -> i32; - fn u32_le(&self, oft: usize) -> u32; - fn i32_le(&self, oft: usize) -> i32; - fn u40_le(&self, oft: usize) -> u64; - fn i40_le(&self, oft: usize) -> i64; - fn u48_le(&self, oft: usize) -> u64; - fn i48_le(&self, oft: usize) -> i64; - fn u56_le(&self, oft: usize) -> u64; - fn i56_le(&self, oft: usize) -> i64; - fn u64_le(&self, oft: usize) -> u64; - fn i64_le(&self, oft: usize) -> i64; - - fn u16_be(&self, oft: usize) -> u16; - fn i24_be(&self, oft: usize) -> i32; - fn u32_be(&self, oft: usize) -> u32; - fn u64_be(&self, oft: usize) -> u64; - fn f32_le(&self, oft: usize) -> f32; - fn f64_le(&self, oft: usize) -> f64; -} - -pub trait Range { - fn range(&self, slice: &RingSlice) -> (usize, usize); - #[inline] - fn start(&self, slice: &RingSlice) -> usize { - self.range(slice).0 - } -} - -pub trait Visit { - fn check(&mut self, b: u8, idx: usize) -> bool; -} -impl Visit for u8 { - #[inline(always)] - fn check(&mut self, b: u8, _idx: usize) -> bool { - *self == b - } -} -impl bool> Visit for T { - #[inline(always)] - fn check(&mut self, b: u8, idx: usize) -> bool { - self(b, idx) - } -} - -type Offset = usize; -impl Range for Offset { - #[inline(always)] - fn range(&self, slice: &RingSlice) -> (usize, usize) { - debug_assert!(*self <= slice.len()); - (*self, slice.len()) - } -} - -impl Range for std::ops::Range { - #[inline(always)] - fn range(&self, slice: &RingSlice) -> (usize, usize) { - debug_assert!(self.start <= slice.len()); - debug_assert!(self.end <= slice.len()); - (self.start, self.end) - } -} -impl Range for std::ops::RangeFrom { - #[inline(always)] - fn range(&self, slice: &RingSlice) -> (usize, usize) { - debug_assert!(self.start <= slice.len()); - (self.start, slice.len()) - } -} -impl Range for std::ops::RangeTo { - #[inline(always)] - fn range(&self, slice: &RingSlice) -> (usize, usize) { - debug_assert!(self.end <= slice.len()); - (0, self.end) - } -} -impl Range for std::ops::RangeFull { - #[inline(always)] - fn range(&self, slice: &RingSlice) -> (usize, usize) { - (0, slice.len()) - } -} diff --git a/ds/src/mem/guarded.rs b/ds/src/mem/guarded.rs index 832a77eed..90049106b 100644 --- a/ds/src/mem/guarded.rs +++ b/ds/src/mem/guarded.rs @@ -95,6 +95,17 @@ impl DerefMut for MemGuard { &mut self.mem } } +use crate::{Merge, Range, Slicer}; +impl Slicer for MemGuard { + #[inline] + fn len(&self) -> usize { + self.mem.len() + } + #[inline] + fn with_seg(&self, r: R, v: impl FnMut(&[u8], usize, bool) -> O) -> O { + self.mem.with_seg(r, v) + } +} impl MemGuard { #[inline] diff --git a/ds/src/mem/mod.rs b/ds/src/mem/mod.rs index 911e60203..e6416c783 100644 --- a/ds/src/mem/mod.rs +++ b/ds/src/mem/mod.rs @@ -4,6 +4,9 @@ pub use buffer::*; mod resized; pub use resized::*; +mod slice; +pub use slice::*; + mod ring_slice; pub use ring_slice::*; @@ -18,9 +21,6 @@ pub use malloc::*; pub mod arena; -mod bytes; -pub use self::bytes::*; - use std::sync::atomic::{AtomicI64, Ordering::Relaxed}; pub static BUF_TX: Buffers = Buffers::new(); pub static BUF_RX: Buffers = Buffers::new(); diff --git a/ds/src/mem/ring_slice.rs b/ds/src/mem/ring_slice.rs index cba2a673d..2c2c5b28c 100644 --- a/ds/src/mem/ring_slice.rs +++ b/ds/src/mem/ring_slice.rs @@ -1,5 +1,6 @@ use std::{ fmt::{Debug, Display, Formatter}, + ptr::copy_nonoverlapping, slice::from_raw_parts, }; @@ -15,22 +16,6 @@ pub struct RingSlice { mask: u32, } -macro_rules! with_segment { - ($self:ident, $range:expr, $noseg:expr, $seg:expr) => {{ - let (oft, end) = $range.range($self); - debug_assert!(oft <= end && end <= $self.len()); - let len = end - oft; - let oft_start = $self.mask($self.start() + oft); - if oft_start + len <= $self.cap() { - unsafe { $noseg($self.ptr().add(oft_start), len) } - } else { - let seg1 = $self.cap() - oft_start; - let seg2 = len - seg1; - unsafe { $seg($self.ptr().add(oft_start), seg1, $self.ptr(), seg2) } - } - }}; -} - impl RingSlice { #[inline] pub fn empty() -> Self { @@ -75,24 +60,22 @@ impl RingSlice { } #[inline] pub fn str_num(&self, r: impl Range) -> usize { - let (start, end) = r.range(self); - let mut num = 0usize; - for i in start..end { - num = num.wrapping_mul(10) + (self[i] - b'0') as usize; - } - num + self.fold(r, 0, |num, b| { + *num = num.wrapping_mul(10) + (b - b'0') as usize + }) } + // 如果有一个非数字的字符,则返回None #[inline] pub fn try_str_num(&self, r: impl Range) -> Option { - let (start, end) = r.range(self); - let mut num = 0usize; - for i in start..end { - if !self[i].is_ascii_digit() { - return None; + let mut digit = true; + let num = self.fold_r(r, 0usize, |num, b| { + digit &= b.is_ascii_digit(); + if digit { + *num = num.wrapping_mul(10) + (b - b'0') as usize; } - num = num.wrapping_mul(10) + (self[i] - b'0') as usize; - } - Some(num) + !digit + }); + digit.then_some(num) } #[inline] @@ -106,39 +89,65 @@ impl RingSlice { mask: self.mask, } } + // 这个方法是最核心的方法,所有的方法都是基于这个方法实现的 + // F:ptr, len, oft, segment + #[inline] + fn _visit(&self, r: R, mut f: F) -> O + where + O: Merge, + R: Range, + F: FnMut(*const u8, usize, usize, bool) -> O, + { + let (oft, end) = r.range(self); + debug_assert!(oft <= end && end <= self.len()); + let len = end - oft; + let oft_start = self.mask as usize & (self.start() + oft); + let seg1 = self.cap() - oft_start; + if len <= seg1 { + unsafe { f(self.ptr().add(oft_start), len, oft, false) } + } else { + let o0 = unsafe { f(self.ptr().add(oft_start), seg1, oft, true) }; + let seg2 = len - seg1; + o0.merge(|| f(self.ptr(), seg2, self.cap() - self.start(), true)) + } + } + #[inline(always)] + fn _visit_data(&self, r: R, mut f: F) -> O + where + O: Merge, + R: Range, + F: FnMut(&[u8], usize, bool) -> O, + { + self._visit(r, |p, len, oft, seg| unsafe { + f(from_raw_parts(p, len), oft, seg) + }) + } #[inline(always)] pub fn visit(&self, mut f: impl FnMut(u8)) { - self.visit_seg(0, |p, l| { - for i in 0..l { + self._visit(0usize, |p, len, _oft, _seg| { + for i in 0..len { unsafe { f(*p.add(i)) }; } - }); + }) } #[inline(always)] pub fn visit_seg(&self, r: R, mut f: impl FnMut(*const u8, usize)) { - with_segment!(self, r, |p, l| f(p, l), |p0, l0, p1, l1| { - f(p0, l0); - f(p1, l1) - }) + self._visit(r, |p, len, _, _| f(p, len)) } #[inline(always)] pub fn visit_data(&self, r: impl Range, mut f: impl FnMut(&[u8])) { - with_segment!(self, r, |p, l| f(from_raw_parts(p, l)), |p0, l0, p1, l1| { - { - f(from_raw_parts(p0, l0)); - f(from_raw_parts(p1, l1)); - } - }) + self._visit(r, |p, len, _, _| f(unsafe { from_raw_parts(p, len) })) } #[inline(always)] pub fn data_r(&self, r: impl Range) -> (&[u8], &[u8]) { static EMPTY: &[u8] = &[]; - with_segment!( - self, - r, - |ptr, len| (from_raw_parts(ptr, len), EMPTY), - |p0, l0, p1, l1| (from_raw_parts(p0, l0), from_raw_parts(p1, l1)) - ) + let mut seg = [EMPTY, EMPTY]; + let mut idx = 0; + self._visit(r, |p, l, _idx, _seg| { + seg[idx] = unsafe { from_raw_parts(p, l) }; + idx += 1; + }); + (&seg[0], &seg[1]) } #[inline(always)] pub fn data(&self) -> (&[u8], &[u8]) { @@ -149,46 +158,45 @@ impl RingSlice { pub unsafe fn data_dump(&self) -> &[u8] { from_raw_parts(self.ptr(), self.cap()) } + // 遍历,直到v返回true时. #[inline(always)] - pub fn fold_r bool>( - &self, - r: R, - mut init: I, - mut v: V, - ) -> I { - macro_rules! visit { - ($p:expr, $l:expr) => {{ - for i in 0..$l { - if !v(&mut init, *$p.add(i)) { - return false; - } + pub fn fold_r bool>(&self, r: R, init: I, mut v: V) -> I { + let mut init = init; + self._visit(r, |p, l, _oft, _seg| { + for i in 0..l { + if !v(&mut init, unsafe { *p.add(i) }) { + continue; } - true - }}; - } - with_segment!( - self, - r, - |p: *mut u8, l| visit!(p, l), - |p0: *mut u8, l0, p1: *mut u8, l1| { visit!(p0, l0) && visit!(p1, l1) } - ); + // 说明v返回true,则终止遍历 + return true; + } + // 尝试下一个seg + false + }); init } + // 遍历所有的数据。 #[inline(always)] pub fn fold(&self, r: R, init: I, mut v: impl FnMut(&mut I, u8)) -> I { self.fold_r(r, init, |i, b| { v(i, b); - true + false }) } #[inline] pub fn copy_to(&self, r: R, w: &mut W) -> std::io::Result<()> { - with_segment!( - self, - r, - |p, l| w.write_all(from_raw_parts(p, l)), - |p0, l0, p1, l1| { w.write_seg_all(from_raw_parts(p0, l0), from_raw_parts(p1, l1)) } - ) + self._visit(r, |p, len, _, seg| { + w.write_all_hint(unsafe { from_raw_parts(p, len) }, seg) + }) + } + #[inline] + fn _copy_to>(&self, r: R, mut o: T) { + let out = o.as_mut(); + let mut idx = 0; + self._visit(r, |p, len, _, _| { + unsafe { copy_nonoverlapping(p, out.as_mut_ptr().add(idx), len) }; + idx += len; + }); } #[inline] pub fn copy_to_w(&self, r: R, w: &mut W) { @@ -197,36 +205,42 @@ impl RingSlice { } #[inline] pub fn copy_to_vec(&self, v: &mut Vec) { - self.copy_to_w(.., v); + self.copy_to_v(.., v); } #[inline] pub fn copy_to_vec_with_len(&self, v: &mut Vec, len: usize) { - self.copy_to_w(..len, v); + self.copy_to_v(..len, v); } #[inline] - pub fn copy_to_vec_r(&self, v: &mut Vec, r: R) { - self.copy_to_w(r, v); + pub fn copy_to_v(&self, r: R, v: &mut Vec) { + let rlen = r.r_len(self); + v.reserve(rlen); + let len = v.len(); + unsafe { v.set_len(len + rlen) }; + let out = &mut v[len..]; + self._copy_to(r, out); } /// copy 数据到切片/数组中,目前暂时不需要oft,有需求后再加 #[inline] pub fn copy_to_slice(&self, s: &mut [u8]) { - self.copy_to_w(.., s); + self._copy_to(0, s) } #[inline] - pub fn copy_to_r(&self, s: &mut [u8], r: R) { - self.copy_to_w(r, s); + pub fn copy_to_r(&self, r: R, s: &mut [u8]) { + self._copy_to(r, s); } #[inline(always)] - pub(super) fn cap(&self) -> usize { + pub(crate) fn cap(&self) -> usize { self.cap as usize } #[inline(always)] - pub(super) fn start(&self) -> usize { + pub(crate) fn start(&self) -> usize { + debug_assert!(self.start < self.cap || self.start == 0, "{self}"); self.start as usize } #[inline(always)] - pub(super) fn mask(&self, oft: usize) -> usize { + pub(crate) fn mask(&self, oft: usize) -> usize { (self.mask & oft as u32) as usize } @@ -235,6 +249,12 @@ impl RingSlice { self.len as usize } #[inline(always)] + pub fn first(&self) -> u8 { + debug_assert!(self.len > 0); + debug_assert!(self.start < self.cap); + unsafe { *self.ptr() } + } + #[inline(always)] pub fn at(&self, idx: usize) -> u8 { self[idx] } @@ -243,7 +263,7 @@ impl RingSlice { self[idx] = b; } #[inline(always)] - pub(super) fn ptr(&self) -> *mut u8 { + pub(crate) fn ptr(&self) -> *mut u8 { self.ptr as *mut u8 } @@ -251,15 +271,18 @@ impl RingSlice { pub fn find(&self, offset: usize, b: u8) -> Option { self.find_r(offset, b) } + // 找到满足f.check(b, idx)的第一个字节位置,返回idx #[inline] pub fn find_r(&self, r: impl Range, mut f: impl Visit) -> Option { - let (start, end) = r.range(self); - for i in start..end { - if f.check(self[i], i) { - return Some(i); + self._visit(r, |p, len, oft, _seg| { + for i in 0..len { + let idx = oft + i; + if f.check(unsafe { *p.add(i) }, idx) { + return Some(idx); + } } - } - None + None + }) } // 跳过num个'\r\n',返回下一个字节地址 #[inline] @@ -269,73 +292,47 @@ impl RingSlice { } Some(oft) } - // 查找是否存在 '\r\n' ,返回匹配的第一个字节地址 + // 查找是否存在 '\r\n' ,返回'\r' 的位置 #[inline] pub fn find_lf_cr(&self, offset: usize) -> Option { self.find_r(offset..self.len() - 1, |b, idx| { + debug_assert!(idx < self.len() - 1, "{offset} => {idx}, {self} "); b == b'\r' && self[idx + 1] == b'\n' }) } #[inline] pub fn equal(&self, other: &[u8]) -> bool { - if self.len() != other.len() { - return false; - } - for i in 0..self.len() { - if self[i] != other[i] { - return false; - } - } - return true; + self.eq(other) } /// 判断是否相同,忽略大小写 #[inline] pub fn equal_ignore_case(&self, other: &[u8]) -> bool { - if self.len() != other.len() { - return false; - } - for i in 0..self.len() { - // 指令一般是大写 - if self[i].to_ascii_uppercase() != other[i].to_ascii_uppercase() { - return false; - } - } - return true; + self.len() == other.len() && self.compare(0, other, |a, b| a.eq_ignore_ascii_case(b)) + } + #[inline] + fn compare(&self, r: impl Range, s: &[u8], eq: impl Fn(&[u8], &[u8]) -> bool) -> bool { + debug_assert!(r.r_len(self) == s.len()); + let mut cmp = s; + let mut matched = true; + self._visit_data(r, |data, _, _| { + let l = data.len().min(cmp.len()); + let d = &data[..l]; + matched &= eq(d, &cmp[..l]); + cmp = &cmp[l..]; + // 还有未比对完成的数据,且当前数据不是最后一个数据,则继续比对 + cmp.len() == 0 || !matched + }); + matched } #[inline] pub fn start_with(&self, oft: usize, s: &[u8]) -> bool { - if oft + s.len() <= self.len() { - with_segment!( - self, - oft..oft + s.len(), - |p, _l| { from_raw_parts(p, s.len()) == s }, - |p0, l0, p1, _l1| from_raw_parts(p0, l0) == &s[..l0] - && from_raw_parts(p1, s.len() - l0) == &s[l0..] - ) - } else { - false - } + oft + s.len() <= self.len() && self.compare(oft..oft + s.len(), s, |a, b| a == b) } #[inline] pub fn start_ignore_case(&self, oft: usize, s: &[u8]) -> bool { - if oft + s.len() <= self.len() { - with_segment!( - self, - oft, - |p, _l| { from_raw_parts(p, s.len()).eq_ignore_ascii_case(s) }, - |p0, l0, p1, _l1| { - if l0 < s.len() { - from_raw_parts(p0, l0).eq_ignore_ascii_case(&s[..l0]) - && from_raw_parts(p1, s.len() - l0).eq_ignore_ascii_case(&s[l0..]) - } else { - from_raw_parts(p0, s.len()).eq_ignore_ascii_case(s) - } - } - ) - } else { - false - } + oft + s.len() <= self.len() + && self.compare(oft..oft + s.len(), s, |a, b| a.eq_ignore_ascii_case(b)) } // 读取一个u16的数字,大端 @@ -348,11 +345,8 @@ impl RingSlice { /// 展示所有内容,仅用于长度比较小的场景 fishermen #[inline] pub fn as_string_lossy(&self) -> String { - if self.len() >= 512 { - log::warn!("as_string_lossy: data too long: {:?}", self); - } let mut vec = Vec::with_capacity(self.len()); - self.copy_to_w(0, &mut vec); + self.copy_to_v(0, &mut vec); String::from_utf8(vec).unwrap_or_default() } } @@ -408,7 +402,7 @@ impl std::ops::IndexMut for RingSlice { impl PartialEq<[u8]> for super::RingSlice { #[inline] fn eq(&self, other: &[u8]) -> bool { - self.len() == other.len() && self.start_with(0, other) + self.len() == other.len() && self.compare(0, other, |a, b| a == b) } } // 内容相等 @@ -419,3 +413,15 @@ impl PartialEq for super::RingSlice { self.len() == other.len() && self.start_with(0, f) && self.start_with(f.len(), s) } } + +use crate::{Merge, Slicer}; +impl Slicer for RingSlice { + #[inline(always)] + fn len(&self) -> usize { + self.len as usize + } + #[inline] + fn with_seg(&self, r: R, v: impl FnMut(&[u8], usize, bool) -> O) -> O { + self._visit_data(r, v) + } +} diff --git a/ds/src/mem/slice/mod.rs b/ds/src/mem/slice/mod.rs new file mode 100644 index 000000000..9310d0312 --- /dev/null +++ b/ds/src/mem/slice/mod.rs @@ -0,0 +1,125 @@ +impl> Slicer for T { + #[inline(always)] + fn len(&self) -> usize { + self.as_ref().len() + } + #[inline(always)] + fn with_seg(&self, r: R, mut v: impl FnMut(&[u8], usize, bool) -> O) -> O { + let (start, end) = r.range(self); + debug_assert!(start <= end && end <= self.len() as usize); + v(self.as_ref(), start, false) + } +} +impl Slicer for str { + #[inline(always)] + fn len(&self) -> usize { + self.len() + } + #[inline(always)] + fn with_seg(&self, r: R, v: impl FnMut(&[u8], usize, bool) -> O) -> O { + self.as_bytes().with_seg(r, v) + } +} + +impl Merge for Option { + #[inline(always)] + fn merge(self, other: impl FnMut() -> Self) -> Self { + self.or_else(other) + } +} +impl Merge for bool { + #[inline(always)] + fn merge(self, mut other: impl FnMut() -> Self) -> Self { + self || other() + } +} +impl Merge for () { + #[inline(always)] + fn merge(self, mut other: impl FnMut() -> Self) -> Self { + other() + } +} +impl Merge for Result<(), E> { + #[inline(always)] + fn merge(self, mut other: impl FnMut() -> Self) -> Self { + self.and_then(|_| other()) + } +} +pub trait Slicer { + fn len(&self) -> usize; + fn with_seg(&self, r: R, v: impl FnMut(&[u8], usize, bool) -> O) -> O; +} +pub trait Merge { + fn merge(self, other: impl FnMut() -> Self) -> Self; +} + +pub trait Range { + #[inline(always)] + fn len(&self, s: &S) -> usize { + self.r_len(s) + } + #[inline(always)] + fn r_len(&self, s: &S) -> usize { + let r = self.range(s); + r.1 - r.0 + } + fn range(&self, s: &S) -> (usize, usize); + #[inline] + fn start(&self, s: &S) -> usize { + self.range(s).0 + } +} + +pub trait Visit { + fn check(&mut self, b: u8, idx: usize) -> bool; +} +impl Visit for u8 { + #[inline(always)] + fn check(&mut self, b: u8, _idx: usize) -> bool { + *self == b + } +} +impl bool> Visit for T { + #[inline(always)] + fn check(&mut self, b: u8, idx: usize) -> bool { + self(b, idx) + } +} + +type Offset = usize; +impl Range for Offset { + #[inline(always)] + fn range(&self, s: &S) -> (usize, usize) { + debug_assert!(*self <= s.len()); + (*self, s.len()) + } +} + +impl Range for std::ops::Range { + #[inline(always)] + fn range(&self, s: &S) -> (usize, usize) { + debug_assert!(self.start <= s.len()); + debug_assert!(self.end <= s.len()); + (self.start, self.end) + } +} +impl Range for std::ops::RangeFrom { + #[inline(always)] + fn range(&self, s: &S) -> (usize, usize) { + debug_assert!(self.start <= s.len()); + (self.start, s.len()) + } +} +impl Range for std::ops::RangeTo { + #[inline(always)] + fn range(&self, s: &S) -> (usize, usize) { + debug_assert!(self.end <= s.len()); + (0, self.end) + } +} +impl Range for std::ops::RangeFull { + #[inline(always)] + fn range(&self, s: &S) -> (usize, usize) { + (0, s.len()) + } +} diff --git a/ds/src/vec.rs b/ds/src/vec.rs deleted file mode 100644 index e8a279442..000000000 --- a/ds/src/vec.rs +++ /dev/null @@ -1,62 +0,0 @@ -macro_rules! define_read_number { - ($($fn_name:ident, $type_name:tt);+) => { - pub trait Buffer { - fn write>(&mut self, data: D); - $( - fn $fn_name(&mut self, num:$type_name); - )+ - fn write_slice(&mut self, slice:&crate::RingSlice); - } - - impl Buffer for Vec { - #[inline] - fn write>(&mut self, data: D) { - let b = data.as_ref(); - use std::ptr::copy_nonoverlapping as copy; - self.reserve(b.len()); - unsafe { - copy( - b.as_ptr() as *const u8, - self.as_mut_ptr().offset(self.len() as isize), - b.len(), - ); - self.set_len(self.len() + b.len()); - } - } - #[inline] - fn write_slice(&mut self, data:&crate::RingSlice) { - data.copy_to_vec(self); - } - $( - #[inline] - fn $fn_name(&mut self, num: $type_name) { - self.write(num.to_be_bytes()); - } - )+ - } - }; -} - -// big endian -define_read_number!( - // 备注:write_u8 可以直接用push代替 - write_u16, u16; - write_u32, u32; - write_u64, u64 -); - -pub trait Add { - fn add(&mut self, t: T); -} - -impl Add for Vec -where - T: std::cmp::PartialEq, -{ - #[inline] - fn add(&mut self, e: T) { - if !self.contains(&e) { - self.push(e); - } - } -} diff --git a/ds/src/vec/ephemera.rs b/ds/src/vec/ephemera.rs new file mode 100644 index 000000000..9e5f1f2a1 --- /dev/null +++ b/ds/src/vec/ephemera.rs @@ -0,0 +1,95 @@ +use std::ptr::copy_nonoverlapping; + +use crate::{arena::CacheArena, Writer}; + +// 用来分配快速释放的Vec +// 1. cap大小固定, 不支持动态扩展; +// 2. 内存会优先从cache中分配,如果cache中没有足够的内存,会从堆中分配; +pub struct EphemeralVec { + ptr: *mut u8, + len: u32, + cap: u32, +} +impl From> for EphemeralVec { + #[inline] + fn from(vec: Vec) -> Self { + assert!(vec.capacity() < u32::MAX as usize); + let mut v = Self::fix_cap(vec.len()); + v.write_all(vec.as_slice()).expect("err"); + v + } +} + +impl EphemeralVec { + pub fn fix_cap(cap: usize) -> Self { + let (ptr, cap) = VEC_CACHE_ARENA.alloc(cap); + assert!(cap < u32::MAX as usize); + Self { + ptr, + len: 0, + cap: cap as u32, + } + } + #[inline(always)] + pub fn into_raw_parts(self) -> (*mut u8, usize, usize) { + let me = std::mem::ManuallyDrop::new(self); + (me.ptr, me.len as usize, me.cap as usize) + } + #[inline(always)] + pub fn from_raw_parts(ptr: *mut u8, len: usize, cap: usize) -> Self { + assert!(len <= cap && cap < u32::MAX as usize); + Self { + ptr, + len: len as u32, + cap: cap as u32, + } + } + #[inline(always)] + pub fn push(&mut self, val: u8) { + assert!(self.len < self.cap); + unsafe { *self.ptr.add(self.len as usize) = val }; + self.len += 1; + } + #[inline(always)] + pub fn len(&self) -> usize { + self.len as usize + } + #[inline(always)] + pub fn cap(&self) -> usize { + self.cap as usize + } +} +// 实现Deref<[u8]> +use std::ops::{Deref, DerefMut}; +impl Deref for EphemeralVec { + type Target = [u8]; + #[inline(always)] + fn deref(&self) -> &Self::Target { + unsafe { std::slice::from_raw_parts(self.ptr, self.len as usize) } + } +} +impl DerefMut for EphemeralVec { + #[inline(always)] + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len as usize) } + } +} +impl crate::Writer for EphemeralVec { + #[inline(always)] + fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> { + assert!(self.len() + data.len() <= self.cap()); + unsafe { copy_nonoverlapping(data.as_ptr(), self.ptr.add(self.len()), data.len()) }; + self.len += data.len() as u32; + Ok(()) + } +} +// 实现Drop +// 当EphemeralVec被drop时,会自动释放内存 +impl Drop for EphemeralVec { + #[inline(always)] + fn drop(&mut self) { + VEC_CACHE_ARENA.dealloc(self.ptr, self.cap as usize); + } +} +#[ctor::ctor] +static VEC_CACHE_ARENA: CacheArena = CacheArena::new(); diff --git a/ds/src/vec/mod.rs b/ds/src/vec/mod.rs new file mode 100644 index 000000000..5c35ddcaf --- /dev/null +++ b/ds/src/vec/mod.rs @@ -0,0 +1,18 @@ +mod ephemera; +pub use ephemera::*; + +pub trait Add { + fn add(&mut self, t: T); +} + +impl Add for Vec +where + T: std::cmp::PartialEq, +{ + #[inline] + fn add(&mut self, e: T) { + if !self.contains(&e) { + self.push(e); + } + } +} diff --git a/protocol/src/kv/mod.rs b/protocol/src/kv/mod.rs index 9bcfb4b97..e0570be6d 100644 --- a/protocol/src/kv/mod.rs +++ b/protocol/src/kv/mod.rs @@ -53,6 +53,10 @@ pub mod prelude { // 如果没有找到value,且无异常,则返回该响应内容 lazy_static! { static ref NOT_FOUND: Vec = b"not found".to_vec(); + static ref TOP_INVALID_RSP: RingSlice = + RingSlice::from_slice(b"invalid request: year out of index"); + static ref REQ_INVALID_RSP: RingSlice = RingSlice::from_slice(b"invalid request"); + static ref UNKNOWN_RSP: RingSlice = RingSlice::from_slice(b"server may unavailable"); } #[derive(Clone, Default)] @@ -172,18 +176,6 @@ impl Protocol for Kv { let old_op_code = ctx.request().op_code() as u8; - // 如果原始请求是quite_get请求,并且not found,则不回写。 - // if let Some(rsp) = response { - // log::debug!( - // "+++ sent to client for req:{:?}, rsp:{:?}", - // ctx.request(), - // rsp - // ); - // // mysql 请求到正确的数据,才会转换并write - // self.write_mc_packet(ctx.request(), Some(rsp), w)?; - // return Ok(()); - // } - // 先进行metrics统计 //self.metrics(ctx.request(), None, ctx); log::debug!("+++ send to client rsp, req:{:?}", ctx.request(),); @@ -436,19 +428,23 @@ impl Kv { OP_GET | OP_GETQ => (None, Some(MARKER_BYTE_ARR)), _ => (None, None), }; - let err_response; - let response = match ctx.ctx().error { + + let response: Option<&RingSlice> = match ctx.ctx().error { ContextStatus::TopInvalid => { assert!(response.is_none()); - err_response = Some(RingSlice::from_slice(b"invalid request: year out of index")); - err_response.as_ref() + Some(&TOP_INVALID_RSP) } ContextStatus::ReqInvalid => { assert!(response.is_none()); - err_response = Some(RingSlice::from_slice(b"invalid request")); - err_response.as_ref() + Some(&REQ_INVALID_RSP) + } + ContextStatus::Ok => { + if status == RespStatus::NoError || response.is_some() { + response.map(|r| r.deref().deref()) + } else { + Some(&UNKNOWN_RSP) + } } - ContextStatus::Ok => response.map(|r| r.deref().deref()), }; if status != RespStatus::NoError && status != RespStatus::NotFound { log::error!( diff --git a/protocol/src/redis/command.rs b/protocol/src/redis/command.rs index b07603b17..b3ea56e3c 100644 --- a/protocol/src/redis/command.rs +++ b/protocol/src/redis/command.rs @@ -164,13 +164,13 @@ impl CommandProperties { cmd.push(b'*'); // 1个cmd, 1个key,1个value。一共3个bulk cmd.push((2 + self.has_val as u8) + b'0'); - cmd.write("\r\n"); + cmd.write(b"\r\n"); cmd.push(b'$'); cmd.write(&self.mname_len); - cmd.write("\r\n"); - cmd.write(self.mname); - cmd.write("\r\n"); - cmd.write_slice(data); + cmd.write(b"\r\n"); + cmd.write(&self.mname); + cmd.write(b"\r\n"); + cmd.write(data); //data.copy_to_vec(&mut cmd); use super::flag::RedisFlager; if first { diff --git a/protocol/src/stream.rs b/protocol/src/stream.rs index 3a842527e..25722f039 100644 --- a/protocol/src/stream.rs +++ b/protocol/src/stream.rs @@ -22,34 +22,29 @@ pub trait BufRead { fn reserve(&mut self, r: usize); } use super::Result; -pub trait Writer: ds::BufWriter + Sized { +pub trait Writer: ds::Writer + Sized { fn cap(&self) -> usize; fn pending(&self) -> usize; // 写数据,一次写完 - fn write(&mut self, data: &[u8]) -> Result<()>; + #[inline(always)] + fn write(&mut self, data: &[u8]) -> Result<()> { + self.write_r(0, &data)?; + Ok(()) + } #[inline] fn write_u8(&mut self, v: u8) -> Result<()> { self.write(&[v]) } #[inline] fn write_u16(&mut self, v: u16) -> Result<()> { - // let mut data = Vec::with_capacity(2); - // data.write_u16::(v)?; - // self.write(&data[0..]) self.write(&v.to_be_bytes()) } #[inline] fn write_u32(&mut self, v: u32) -> Result<()> { - // let mut data = Vec::with_capacity(4); - // data.write_u32::(v)?; - // self.write(&data[0..]) self.write(&v.to_be_bytes()) } #[inline] fn write_u64(&mut self, v: u64) -> Result<()> { - // let mut data = Vec::with_capacity(8); - // data.write_u64::(v)?; - // self.write(&data[0..]) self.write(&v.to_be_bytes()) } #[inline(always)] @@ -63,14 +58,14 @@ pub trait Writer: ds::BufWriter + Sized { #[inline] fn write_slice>(&mut self, data: &S, oft: usize) -> Result<()> { - (&*data).copy_to(oft, self)?; + self.write_r(oft, &**data)?; Ok(()) } // 暂时没发现更好的实现方式,先用这个实现 #[inline] fn write_ringslice(&mut self, data: &RingSlice, oft: usize) -> Result<()> { - data.copy_to(oft, self)?; + self.write_r(oft, data)?; Ok(()) } diff --git a/rt/src/stream/mod.rs b/rt/src/stream/mod.rs index d10f1932c..8280c0b19 100644 --- a/rt/src/stream/mod.rs +++ b/rt/src/stream/mod.rs @@ -121,16 +121,6 @@ impl protocol::Writer for Stream { fn pending(&self) -> usize { self.buf.len() } - #[inline] - fn write(&mut self, data: &[u8]) -> protocol::Result<()> { - if data.len() <= 4 { - self.buf.write(data); - } else { - let mut ctx = Context::from_waker(&NOOP); - let _ = Pin::new(self).poll_write(&mut ctx, data); - } - Ok(()) - } // hint: 提示可能优先写入到cache #[inline] fn cache(&mut self, hint: bool) { @@ -147,22 +137,24 @@ impl protocol::Writer for Stream { self.rx_buf.pending() == 0 } } -impl ds::BufWriter for Stream { +use ds::{Range, Slicer}; +impl ds::Writer for Stream { #[inline] fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> { if data.len() <= 4 { self.buf.write(data); } else { let mut ctx = Context::from_waker(&NOOP); - let _ = Pin::new(self).poll_write(&mut ctx, data); + let _ = Pin::new(self).poll_write(&mut ctx, data)?; } Ok(()) } - #[inline] - fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> { - self.buf.enable = true; - self.write_all(buf0)?; - self.write_all(buf1) + #[inline(always)] + fn write_r(&mut self, r: R, slicer: &D) -> std::io::Result<()> { + slicer.with_seg(r, |data, _oft, seg| { + self.buf.enable |= seg; + self.write_all(data) + }) } } impl protocol::BufRead for Stream { diff --git a/stream/src/handler.rs b/stream/src/handler.rs index 91b113f92..4c6df3e75 100644 --- a/stream/src/handler.rs +++ b/stream/src/handler.rs @@ -175,6 +175,7 @@ where Poll::Pending => Ok(()), } } + #[inline(always)] fn poll_response(&mut self, cx: &mut Context) -> Poll> { while self.pending.len() > 0 { diff --git a/tests/src/all.rs b/tests/src/all.rs index fd47f77c2..70f9d0e3f 100644 --- a/tests/src/all.rs +++ b/tests/src/all.rs @@ -3,7 +3,7 @@ mod distribute; // mod hash_test; mod shard_test; //mod memcached_text; -//mod mem; +mod mem; mod protocols; //mod queue; // mod redis; diff --git a/tests/src/benches/ring_slice.rs b/tests/src/benches/ring_slice.rs index 19b67a630..0f5d8b67b 100644 --- a/tests/src/benches/ring_slice.rs +++ b/tests/src/benches/ring_slice.rs @@ -11,6 +11,7 @@ pub(super) fn bench_iter(c: &mut Criterion) { let len = slice.len(); let mut group = c.benchmark_group("ring_slice_iter"); let rs = RingSlice::from(slice.as_ptr(), slice.len(), 0, len); + let seg = ds::RingSlicer::from(slice.as_ptr(), slice.len(), 0, len); group.bench_function("vec", |b| { b.iter(|| { black_box({ @@ -83,6 +84,15 @@ pub(super) fn bench_iter(c: &mut Criterion) { }); }); }); + group.bench_function("fold_seg", |b| { + b.iter(|| { + black_box({ + seg.fold(0, 0u64, |t, v| { + *t += v as u64; + }) + }); + }); + }); group.bench_function("fold", |b| { b.iter(|| { black_box({ @@ -133,7 +143,7 @@ pub(super) fn bench_read_num(c: &mut Criterion) { let mut t = 0u64; for i in 0..runs { let mut buf = [0u8; 8]; - rs.copy_to_r(&mut buf[..], i..i + 8); + rs.copy_to_r(i..i + 8, &mut buf[..]); t = t.wrapping_add(u64::from_le_bytes(buf)); } t @@ -157,7 +167,7 @@ pub(super) fn bench_read_num(c: &mut Criterion) { let mut t = 0u64; for i in 0..runs { let mut buf = [0u8; 8]; - rs.copy_to_r(&mut buf[..], i..i + 8); + rs.copy_to_r(i..i + 8, &mut buf[..]); t = t.wrapping_add(u64::from_be_bytes(buf)); } t @@ -226,7 +236,7 @@ pub(super) fn bench_copy(c: &mut Criterion) { b.iter(|| { black_box({ for i in 0..runs { - rs.copy_to_r(&mut dst[..], i..i + 64); + rs.copy_to_r(i..i + 64, &mut dst[..]); } }); }); diff --git a/tests/src/mem.rs b/tests/src/mem.rs index 959725256..dc14713fc 100644 --- a/tests/src/mem.rs +++ b/tests/src/mem.rs @@ -1,156 +1,68 @@ -use ds::{RingBuffer, RingSlice}; - -fn rnd_bytes(size: usize) -> Vec { - let data: Vec = (0..size).map(|_| rand::random::()).collect(); - data -} +use ds::EphemeralVec; #[test] -fn ring_buffer() { - let cap = 32; - let data = rnd_bytes(cap); - let rs = RingSlice::from(data.as_ptr(), cap, 0, 17); - let mut buf = RingBuffer::with_capacity(cap); - buf.write(&rs); - assert_eq!(buf.len(), 17); - assert!(&(buf.data()) == &data[0..17]); - let rs = RingSlice::from(data.as_ptr(), cap, 32, 32 + 32); - buf.advance_read(buf.len()); - assert_eq!(buf.len(), 0); - let n = buf.write(&rs); - assert_eq!(buf.len(), n); - assert_eq!(&buf.data(), &data[..]); - buf.advance_read(rs.len()); - assert_eq!(buf.len(), 0); - // 有折返的 - let rs = RingSlice::from(data.as_ptr(), cap, 27, 27 + 19); - assert_eq!(rs.len(), 19); - let n = buf.write(&rs); - assert_eq!(n, rs.len()); - assert_eq!(buf.len(), rs.len()); - assert_eq!(buf.len(), 19); - assert_eq!(buf.data(), rs); - let rs = RingSlice::from(data.as_ptr(), cap, 32, 32 + 32); - let n = buf.write(&rs); - assert_eq!(n, 32 - 19); - - let mut rrb = ds::ResizedRingBuffer::from(256, 4 * 1024, 1024); - assert_eq!(1024, rrb.cap()); - assert_eq!(0, rrb.len()); - - // 一次写满 - let buf = rrb.as_mut_bytes(); - assert_eq!(buf.len(), 1024); - assert_eq!(rrb.len(), 0); - assert_eq!(rrb.cap(), 1024); - rrb.advance_write(1024); - assert_eq!(rrb.len(), 1024); - assert_eq!(rrb.cap(), 1024); - - // 没有了,触发扩容 - let buf = rrb.as_mut_bytes(); - assert_eq!(buf.len(), 1024); - assert_eq!(rrb.cap(), 1024 * 2); - assert_eq!(rrb.len(), 1024); - - rrb.advance_read(1024); - - rrb.advance_write(1024); - let buf = rrb.as_mut_bytes(); - assert_eq!(buf.len(), 1024); - rrb.advance_write(1024); +fn ephemera_vec() { + use ds::Buffer; + let mut v = EphemeralVec::fix_cap(1024); + v.write(b"abcdefg"); + assert_eq!(v.len(), 7); + assert_eq!(&*v, b"abcdefg"); + v.write(b"hijklmnop"); + assert_eq!(v.len(), 16); + assert_eq!(&*v, b"abcdefghijklmnop"); - // 等待10ms。(默认是4ms) - std::thread::sleep(ds::time::Duration::from_millis(10)); - let buf = rrb.as_mut_bytes(); - assert_eq!(buf.len(), 1024); - rrb.advance_write(1024); - let buf = rrb.as_mut_bytes(); - assert_eq!(buf.len(), 1024); + drop(v); - // 缩容 - assert_eq!(rrb.cap(), 4 * 1024); - rrb.advance_read(2 * 1024); - //rrb.resize(2 * 1024); - //assert_eq!(rrb.cap(), 2 * 1024); -} - -// 随机生成器,生成的内存从a-z, A-Z, 0-9 循环。 -struct Reader { - num: usize, - source: Vec, - offset: usize, -} -impl ds::BuffRead for Reader { - type Out = usize; - fn read(&mut self, b: &mut [u8]) -> (usize, Self::Out) { - assert!(self.num > 0); - let mut w = 0; - while w < self.num { - let oft = self.offset % self.source.len(); - let l = b.len().min(self.num - w); - use std::ptr::copy_nonoverlapping as copy; - unsafe { copy(self.source.as_ptr().offset(oft as isize), b.as_mut_ptr(), l) }; - w += l; - self.offset += l; - } - (w, w) + // 单线程顺序分配,立即释放 + let mut n = 0; + // 20M数据。Cache大小是8M,可以循环分配 + use rand::{Rng, RngCore}; + let rnd = &mut rand::thread_rng(); + while n < 16 << 20 { + let len = rnd.gen_range(1..16 * 1024); + let mut v = EphemeralVec::fix_cap(len); + // 随机填充 + let mut buf = vec![0; len]; + rnd.fill_bytes(&mut buf[..]); + v.write(&buf); + assert_eq!(v.len(), len); + assert_eq!(&*v, &buf[..]); + n += len; } } #[test] -fn guarded_buffer() { - // 测试MemGuard - //let data: Vec = "abcdefg".into(); - //let s: &[u8] = &data; - //let slice: RingSlice = s.into(); - //let g0: MemGuard = slice.into(); - //assert_eq!(g0.read(0), &data); - //// 指向同一块内存 - //assert_eq!(g0.read(0).as_ptr() as usize, data.as_ptr() as usize); - //g0.recall(); - //// 内存回收,共享内存被释放。数据被复制,指向不同的内存。 - //assert_ne!(g0.read(0).as_ptr() as usize, data.as_ptr() as usize); - //// 但是数据一致 - //assert_eq!(g0.read(0), &data); - ////assert_eq!(g0.len(), data.len()); - //drop(data); +#[ignore] +fn test_ephemera_vec_thread() { + let secs = 3600; + let threads = 4; + let ths: Vec<_> = (0..threads) + .map(|_| { + std::thread::spawn(move || { + let start = std::time::Instant::now(); + use ds::Buffer; + use rand::{Rng, RngCore}; + let rnd = &mut rand::thread_rng(); + // 创建1..10个随机大小的vec + while start.elapsed().as_secs() < secs { + let num = rnd.gen_range(1..10); + let mut vecs = Vec::with_capacity(num); + for _i in 0..num { + let len = rnd.gen_range(1..64 * 1024); + let mut v = EphemeralVec::fix_cap(len); + let mut buf = vec![0; len]; + rnd.fill_bytes(&mut buf); + v.write(&buf); + assert_eq!(v.len(), len); + assert_eq!(&*v, &buf); + vecs.push(v); + } + } + }) + }) + .collect(); - let mut reader = Reader { - offset: 0, - source: Vec::from("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"), - num: 0, - }; - use ds::GuardedBuffer; - let mut guard = GuardedBuffer::new(128, 1024, 128); - let empty = guard.read(); - assert_eq!(empty.len(), 0); - reader.num = 24; - let n = guard.write(&mut reader); - assert_eq!(n, guard.len()); - assert_eq!(n, reader.num); - - let data = guard.read(); - assert_eq!(n, data.len()); - assert_eq!(data, reader.source[0..n]); - let len_g0 = 24; - let g0 = guard.take(len_g0); - assert_eq!(g0.len(), len_g0); - assert_eq!(guard.len(), n - len_g0); - let data = guard.read(); - assert_eq!(n - len_g0, data.len()); - //g0.read(0); - reader.num = 17; - guard.write(&mut reader); - let g1 = guard.take(10); - let g2 = guard.take(3); - let g3 = guard.take(3); - drop(g2); - drop(g1); - reader.num = 1; - guard.write(&mut reader); - drop(g3); - - drop(g0); - guard.gc(); + for th in ths { + th.join().expect("should not panic"); + } } diff --git a/tests/src/ring_buffer.rs b/tests/src/ring_buffer.rs index 813b4fdab..f15c3091e 100644 --- a/tests/src/ring_buffer.rs +++ b/tests/src/ring_buffer.rs @@ -38,6 +38,7 @@ fn ring_buffer_basic() { let n = buf.write(&rs); assert_eq!(buf.len(), rs.len()); assert_eq!(buf.len(), n); + println!("=============\n\n"); assert_eq!(&buf.data(), &data[..]); buf.consume(rs.len()); assert_eq!(buf.len(), 0); @@ -81,18 +82,18 @@ fn ring_buffer_basic() { } // 随机从reader读取数据 - let runs = 1000; - let mut reader = RandomReader(rnd_bytes(cap * 32)); - for i in 0..runs { - let writtened = buf.copy_from(&mut reader); - let len = writtened.len(); - let src = buf.data().slice(buf.len() - len, len); - assert_eq!(writtened, src, "{}-th", i); - // 随机消费n条数据 - let n = rand::random::() as usize % buf.len(); - buf.consume(n); - } - buf.consume(buf.len()); + //let runs = 1000; + //let mut reader = RandomReader(rnd_bytes(cap * 32)); + //for i in 0..runs { + // let writtened = buf.copy_from(&mut reader); + // let len = writtened.len(); + // let src = buf.data().slice(buf.len() - len, len); + // assert_eq!(writtened, src, "{}-th", i); + // // 随机消费n条数据 + // let n = rand::random::() as usize % buf.len(); + // buf.consume(n); + //} + //buf.consume(buf.len()); } #[test] diff --git a/tests/src/ring_slice.rs b/tests/src/ring_slice.rs index f984552a1..7e990f373 100644 --- a/tests/src/ring_slice.rs +++ b/tests/src/ring_slice.rs @@ -76,6 +76,17 @@ fn test_ring_slice() { let _ = unsafe { Vec::from_raw_parts(ptr, 0, cap) }; } +#[test] +fn find_lr_cr() { + const CAP: usize = 32; + let mut data = [0u8; CAP]; + use std::ptr::copy_nonoverlapping as copy; + let b = [43u8, 79, 75, 13, 10, 43, 79, 75, 13, 10]; + unsafe { copy(b.as_ptr(), data.as_mut_ptr(), b.len()) }; + let rs = RingSlice::from(data.as_ptr(), CAP, 5, 10); + let o = rs.find_lf_cr(0); + assert_eq!(o, Some(3)); +} #[test] fn test_read_number() { @@ -137,6 +148,18 @@ fn copy_to_vec() { slice.copy_to_vec(&mut data); assert_eq!(data, vec![0, 1, 2, 0, 1, 2]); } +#[test] +fn copy_to_vec_seg() { + let data = "0123"; + let slice = RingSlice::from(data.as_ptr(), 4, 2, 6); + + let mut out: Vec = Vec::new(); + slice.copy_to_vec(&mut out); + assert_eq!(&*out, "2301".as_bytes()); + + assert_eq!(&slice, &out[..]); + assert_ne!(&slice, "2300".as_bytes()); +} #[test] fn copy_to_slice() { @@ -144,7 +167,7 @@ fn copy_to_slice() { let slice = RingSlice::from_vec(&data); let mut slice_short = [0_u8; 2]; - slice.copy_to_w(0..2, &mut slice_short[..]); + slice.copy_to_r(0..2, &mut slice_short[..]); assert_eq!(slice_short, [0, 1]); let mut slice_long = [0_u8; 6]; @@ -157,7 +180,6 @@ fn copy_to_slice() { let ptr = raw.as_ptr(); let mut rng = rand::thread_rng(); let mut dst = Vec::with_capacity(cap); - unsafe { dst.set_len(cap) }; for _i in 0..100 { let (start, end) = match rng.gen_range(0..10) { 0 => (0, cap), @@ -194,8 +216,9 @@ fn copy_to_slice() { (r_start, r_len) } }; - rs.copy_to_r(&mut dst, r_start..r_start + r_len); + rs.copy_to_v(r_start..r_start + r_len, &mut dst); assert_eq!(&dst[0..r_len], &slice[r_start..r_start + r_len]); + dst.clear(); } } } @@ -329,7 +352,7 @@ fn fold() { if ascii { *acc = acc.wrapping_mul(10).wrapping_add((v - b'0') as u64); } - ascii + !ascii }); assert_eq!(num, 12345678); let start = data.len() - 1; // '9' @@ -340,7 +363,7 @@ fn fold() { if ascii { *acc = acc.wrapping_mul(10).wrapping_add((v - b'0') as u64); } - ascii + !ascii }); assert_eq!(num, 912345678); }