Skip to content

Commit ce9b1a2

Browse files
committed
make cache local instead of global
Global cache is wrong if there are multiple raft engine or raft engine is reopen multiple times. Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
1 parent 5ef0467 commit ce9b1a2

File tree

3 files changed

+55
-49
lines changed

3 files changed

+55
-49
lines changed

src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ pub struct Config {
108108
///
109109
/// Default: false
110110
pub prefill_for_recycle: bool,
111+
112+
/// Initial cache capacity for entries.
113+
pub cache_capacity: ReadableSize,
111114
}
112115

113116
impl Default for Config {
@@ -129,6 +132,7 @@ impl Default for Config {
129132
memory_limit: None,
130133
enable_log_recycle: false,
131134
prefill_for_recycle: false,
135+
cache_capacity: ReadableSize::mb(256),
132136
};
133137
// Test-specific configurations.
134138
#[cfg(test)]

src/engine.rs

Lines changed: 46 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
use std::marker::PhantomData;
44
use std::path::Path;
5-
use std::sync::{mpsc, Arc, Mutex};
5+
use std::sync::{mpsc, Arc};
66
use std::thread::{Builder as ThreadBuilder, JoinHandle};
77
use std::time::{Duration, Instant};
88

99
use bytes::Bytes;
1010
use log::{error, info};
11+
use parking_lot::Mutex;
1112
use protobuf::{parse_from_bytes, Message};
1213

1314
use crate::cache::LruCache;
@@ -98,6 +99,7 @@ where
9899
cfg.clone(),
99100
memtables.clone(),
100101
pipe_log.clone(),
102+
Mutex::new(LruCache::with_capacity(cfg.cache_capacity.0 as usize)),
101103
stats.clone(),
102104
listeners.clone(),
103105
);
@@ -307,6 +309,7 @@ where
307309
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(1.0);
308310
return Ok(Some(read_entry_from_file::<M, _>(
309311
self.pipe_log.as_ref(),
312+
&self.purge_manager.cache,
310313
&idx,
311314
)?));
312315
}
@@ -336,7 +339,11 @@ where
336339
.read()
337340
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
338341
for i in ents_idx.iter() {
339-
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
342+
vec.push(read_entry_from_file::<M, _>(
343+
self.pipe_log.as_ref(),
344+
&self.purge_manager.cache,
345+
i,
346+
)?);
340347
}
341348
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
342349
return Ok(ents_idx.len());
@@ -415,7 +422,7 @@ where
415422
P: PipeLog,
416423
{
417424
fn drop(&mut self) {
418-
self.tx.lock().unwrap().send(()).unwrap();
425+
self.tx.lock().send(()).unwrap();
419426
if let Some(t) = self.metrics_flusher.take() {
420427
t.join().unwrap();
421428
}
@@ -549,54 +556,55 @@ where
549556
}
550557

551558
pub fn resize_internal_cache(&self, new_capacity: usize) {
552-
CACHE.lock().unwrap().resize(new_capacity);
559+
self.purge_manager.cache.lock().resize(new_capacity);
553560
}
554561
}
555562

556-
lazy_static::lazy_static! {
557-
/// Use 256MiB by default.
558-
static ref CACHE: Mutex<LruCache> = Mutex::new(LruCache::with_capacity(256 * 1024 * 1024));
559-
}
560-
561-
fn load_cache<P>(pipe_log: &P, idx: &EntryIndex) -> Result<Bytes>
563+
#[inline]
564+
fn load_cache<P>(pipe_log: &P, cache: &Mutex<LruCache>, idx: &EntryIndex) -> Result<Bytes>
562565
where
563566
P: PipeLog,
564567
{
565-
let mut cache = CACHE.lock().unwrap();
566-
if let Some(v) = cache.get(&idx.entries.unwrap()) {
568+
let mut guard = cache.lock();
569+
if let Some(v) = guard.get(&idx.entries.unwrap()) {
567570
return Ok(v);
568571
}
569-
drop(cache);
572+
drop(guard);
570573
let v = LogBatch::decode_entries_block(
571574
&pipe_log.read_bytes(idx.entries.unwrap())?,
572575
idx.entries.unwrap(),
573576
idx.compression_type,
574577
)?;
575-
CACHE
576-
.lock()
577-
.unwrap()
578-
.insert(idx.entries.unwrap(), v.clone());
578+
cache.lock().insert(idx.entries.unwrap(), v.clone());
579579
Ok(v)
580580
}
581581

582-
pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
582+
pub(crate) fn read_entry_from_file<M, P>(
583+
pipe_log: &P,
584+
cache: &Mutex<LruCache>,
585+
idx: &EntryIndex,
586+
) -> Result<M::Entry>
583587
where
584588
M: MessageExt,
585589
P: PipeLog,
586590
{
587-
let cache = load_cache(pipe_log, idx)?;
591+
let cache = load_cache(pipe_log, cache, idx)?;
588592
let e = parse_from_bytes(
589593
&cache[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize],
590594
)?;
591595
assert_eq!(M::index(&e), idx.index);
592596
Ok(e)
593597
}
594598

595-
pub(crate) fn read_entry_bytes_from_file<P>(pipe_log: &P, idx: &EntryIndex) -> Result<Bytes>
599+
pub(crate) fn read_entry_bytes_from_file<P>(
600+
pipe_log: &P,
601+
cache: &Mutex<LruCache>,
602+
idx: &EntryIndex,
603+
) -> Result<Bytes>
596604
where
597605
P: PipeLog,
598606
{
599-
let cache = load_cache(pipe_log, idx)?;
607+
let cache = load_cache(pipe_log, cache, idx)?;
600608
Ok(cache.slice(idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize))
601609
}
602610

@@ -650,7 +658,6 @@ pub(crate) mod tests {
650658
}
651659

652660
fn reopen(self) -> Self {
653-
super::CACHE.lock().unwrap().clear();
654661
let cfg: Config = self.cfg.as_ref().clone();
655662
let file_system = self.pipe_log.file_system();
656663
let mut listeners = self.listeners.clone();
@@ -1984,16 +1991,16 @@ pub(crate) mod tests {
19841991
match (parse_append, parse_recycled) {
19851992
(Some(id), None) if id.queue == LogQueue::Append => {
19861993
if delete {
1987-
self.append_metadata.lock().unwrap().remove(&id.seq)
1994+
self.append_metadata.lock().remove(&id.seq)
19881995
} else {
1989-
self.append_metadata.lock().unwrap().insert(id.seq)
1996+
self.append_metadata.lock().insert(id.seq)
19901997
}
19911998
}
19921999
(None, Some(seq)) => {
19932000
if delete {
1994-
self.recycled_metadata.lock().unwrap().remove(&seq)
2001+
self.recycled_metadata.lock().remove(&seq)
19952002
} else {
1996-
self.recycled_metadata.lock().unwrap().insert(seq)
2003+
self.recycled_metadata.lock().insert(seq)
19972004
}
19982005
}
19992006
_ => false,
@@ -2053,9 +2060,9 @@ pub(crate) mod tests {
20532060
let parse_recycled = parse_recycled_file_name(path);
20542061
match (parse_append, parse_recycled) {
20552062
(Some(id), None) if id.queue == LogQueue::Append => {
2056-
self.append_metadata.lock().unwrap().contains(&id.seq)
2063+
self.append_metadata.lock().contains(&id.seq)
20572064
}
2058-
(None, Some(seq)) => self.recycled_metadata.lock().unwrap().contains(&seq),
2065+
(None, Some(seq)) => self.recycled_metadata.lock().contains(&seq),
20592066
_ => false,
20602067
}
20612068
}
@@ -2099,29 +2106,20 @@ pub(crate) mod tests {
20992106
assert_eq!(engine.file_count(None), fs.inner.file_count());
21002107
let start = engine.file_span(LogQueue::Append).0;
21012108
// metadata have been deleted.
2102-
assert_eq!(
2103-
fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2104-
&start
2105-
);
2109+
assert_eq!(fs.append_metadata.lock().iter().next().unwrap(), &start);
21062110

21072111
let engine = engine.reopen();
21082112
assert_eq!(engine.file_count(None), fs.inner.file_count());
21092113
let (start, _) = engine.file_span(LogQueue::Append);
2110-
assert_eq!(
2111-
fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2112-
&start
2113-
);
2114+
assert_eq!(fs.append_metadata.lock().iter().next().unwrap(), &start);
21142115

21152116
// Simulate recycled metadata.
21162117
for i in start / 2..start {
2117-
fs.append_metadata.lock().unwrap().insert(i);
2118+
fs.append_metadata.lock().insert(i);
21182119
}
21192120
let engine = engine.reopen();
21202121
let (start, _) = engine.file_span(LogQueue::Append);
2121-
assert_eq!(
2122-
fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2123-
&start
2124-
);
2122+
assert_eq!(fs.append_metadata.lock().iter().next().unwrap(), &start);
21252123
}
21262124

21272125
#[test]
@@ -2142,7 +2140,7 @@ pub(crate) mod tests {
21422140
};
21432141
let fs = Arc::new(DeleteMonitoredFileSystem::new());
21442142
let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
2145-
let recycled_start = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap();
2143+
let recycled_start = *fs.recycled_metadata.lock().iter().next().unwrap();
21462144
for rid in 1..=10 {
21472145
engine.append(rid, 1, 11, Some(&entry_data));
21482146
}
@@ -2159,28 +2157,28 @@ pub(crate) mod tests {
21592157
assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
21602158
// Recycled files have been reused.
21612159
assert_eq!(
2162-
fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2160+
fs.append_metadata.lock().iter().next().unwrap(),
21632161
&(start + 20)
21642162
);
2165-
let recycled_start_1 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap();
2163+
let recycled_start_1 = *fs.recycled_metadata.lock().iter().next().unwrap();
21662164
assert!(recycled_start < recycled_start_1);
21672165
// Reuse these files.
21682166
for rid in 1..=5 {
21692167
engine.append(rid, 1, 11, Some(&entry_data));
21702168
}
2171-
let start_1 = *fs.append_metadata.lock().unwrap().iter().next().unwrap();
2169+
let start_1 = *fs.append_metadata.lock().iter().next().unwrap();
21722170
assert!(start <= start_1);
2173-
let recycled_start_2 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap();
2171+
let recycled_start_2 = *fs.recycled_metadata.lock().iter().next().unwrap();
21742172
assert!(recycled_start_1 < recycled_start_2);
21752173

21762174
// Reopen the engine and validate the recycled files are reserved
21772175
let file_count = fs.inner.file_count();
21782176
let engine = engine.reopen();
21792177
assert_eq!(file_count, fs.inner.file_count());
21802178
assert!(file_count > engine.file_count(None));
2181-
let start_2 = *fs.append_metadata.lock().unwrap().iter().next().unwrap();
2179+
let start_2 = *fs.append_metadata.lock().iter().next().unwrap();
21822180
assert_eq!(start_1, start_2);
2183-
let recycled_start_3 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap();
2181+
let recycled_start_3 = *fs.recycled_metadata.lock().iter().next().unwrap();
21842182
assert_eq!(recycled_start_2, recycled_start_3);
21852183
}
21862184

src/purge.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use fail::fail_point;
1010
use log::{info, warn};
1111
use parking_lot::{Mutex, RwLock};
1212

13+
use crate::cache::LruCache;
1314
use crate::config::Config;
1415
use crate::engine::read_entry_bytes_from_file;
1516
use crate::event_listener::EventListener;
@@ -42,6 +43,7 @@ where
4243
cfg: Arc<Config>,
4344
memtables: MemTables,
4445
pipe_log: Arc<P>,
46+
pub(crate) cache: Mutex<LruCache>,
4547
global_stats: Arc<GlobalStats>,
4648
listeners: Vec<Arc<dyn EventListener>>,
4749

@@ -60,13 +62,15 @@ where
6062
cfg: Arc<Config>,
6163
memtables: MemTables,
6264
pipe_log: Arc<P>,
65+
cache: Mutex<LruCache>,
6366
global_stats: Arc<GlobalStats>,
6467
listeners: Vec<Arc<dyn EventListener>>,
6568
) -> PurgeManager<P> {
6669
PurgeManager {
6770
cfg,
6871
memtables,
6972
pipe_log,
73+
cache,
7074
global_stats,
7175
listeners,
7276
force_rewrite_candidates: Arc::new(Mutex::new(HashMap::default())),
@@ -342,7 +346,7 @@ where
342346
// compression overhead is not too high.
343347
let mut entry_indexes = entry_indexes.into_iter().peekable();
344348
while let Some(ei) = entry_indexes.next() {
345-
let entry = read_entry_bytes_from_file(self.pipe_log.as_ref(), &ei)?;
349+
let entry = read_entry_bytes_from_file(self.pipe_log.as_ref(), &self.cache, &ei)?;
346350
current_size += entry.len();
347351
current_entries.push(entry);
348352
current_entry_indexes.push(ei);

0 commit comments

Comments
 (0)