Skip to content

Commit 392f5e6

Browse files
authored
Unlock the LOCK files on drop (#379)
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
1 parent 7e12385 commit 392f5e6

File tree

15 files changed

+60
-38
lines changed

15 files changed

+60
-38
lines changed

.github/workflows/rust.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
uses: actions-rs/toolchain@v1
3131
with:
3232
profile: minimal
33-
toolchain: nightly-2023-12-31
33+
toolchain: nightly-2025-04-03
3434
override: true
3535
components: rustfmt, clippy, rust-src
3636
- uses: Swatinem/rust-cache@v1
@@ -65,7 +65,7 @@ jobs:
6565
uses: actions-rs/toolchain@v1
6666
with:
6767
profile: minimal
68-
toolchain: 1.75.0
68+
toolchain: 1.85.0
6969
override: true
7070
components: rustfmt, clippy, rust-src
7171
- uses: Swatinem/rust-cache@v1
@@ -92,7 +92,7 @@ jobs:
9292
uses: actions-rs/toolchain@v1
9393
with:
9494
profile: minimal
95-
toolchain: nightly-2023-12-31
95+
toolchain: nightly-2025-04-03
9696
override: true
9797
components: llvm-tools-preview
9898
- uses: Swatinem/rust-cache@v1

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "raft-engine"
33
version = "0.4.2"
44
authors = ["The TiKV Project Developers"]
55
edition = "2018"
6-
rust-version = "1.75.0"
6+
rust-version = "1.85.0"
77
description = "A persistent storage engine for Multi-Raft logs"
88
readme = "README.md"
99
repository = "https://github.com/tikv/raft-engine"

src/codec.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,6 @@ mod tests {
360360
use super::*;
361361
use protobuf::CodedOutputStream;
362362
use std::io::ErrorKind;
363-
use std::{f32, f64, i16, i32, i64, u16, u32, u64};
364363

365364
const U16_TESTS: &[u16] = &[
366365
i16::MIN as u16,

src/env/default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl Read for LogFile {
6262
impl Seek for LogFile {
6363
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
6464
fail_point!("log_file::seek::err", |_| {
65-
Err(std::io::Error::new(std::io::ErrorKind::Other, "fp"))
65+
Err(std::io::Error::other("fp"))
6666
});
6767
match pos {
6868
SeekFrom::Start(offset) => self.offset = offset as usize,

src/file_pipe_log/log_file.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,12 @@ impl<F: FileSystem> LogFileWriter<F> {
107107
}
108108
self.capacity += alloc;
109109
}
110-
self.writer.write_all(buf).map_err(|e| {
110+
self.writer.write_all(buf).inspect_err(|_| {
111111
self.writer
112112
.seek(SeekFrom::Start(self.written as u64))
113113
.unwrap_or_else(|e| {
114114
panic!("failed to reseek after write failure: {}", e);
115115
});
116-
e
117116
})?;
118117
self.written = new_written;
119118
Ok(())

src/file_pipe_log/pipe.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77

88
use crossbeam::utils::CachePadded;
99
use fail::fail_point;
10+
use fs2::FileExt;
1011
use log::error;
1112
use parking_lot::{Mutex, MutexGuard, RwLock};
1213

@@ -464,7 +465,17 @@ impl<F: FileSystem> SinglePipe<F> {
464465
pub struct DualPipes<F: FileSystem> {
465466
pipes: [SinglePipe<F>; 2],
466467

467-
_dir_locks: Vec<StdFile>,
468+
dir_locks: Vec<StdFile>,
469+
}
470+
471+
impl<F: FileSystem> Drop for DualPipes<F> {
472+
fn drop(&mut self) {
473+
for lock in &self.dir_locks {
474+
if let Err(e) = FileExt::unlock(lock) {
475+
error!("error while unlocking directory: {e}");
476+
}
477+
}
478+
}
468479
}
469480

470481
impl<F: FileSystem> DualPipes<F> {
@@ -481,7 +492,7 @@ impl<F: FileSystem> DualPipes<F> {
481492

482493
Ok(Self {
483494
pipes: [appender, rewriter],
484-
_dir_locks: dir_locks,
495+
dir_locks,
485496
})
486497
}
487498

@@ -727,4 +738,21 @@ mod tests {
727738
assert_eq!(pipe_log.read_bytes(handle).unwrap(), content(i + 1));
728739
}
729740
}
741+
742+
#[test]
743+
fn test_release_on_drop() {
744+
let dir = Builder::new()
745+
.prefix("test_release_on_drop")
746+
.tempdir()
747+
.unwrap();
748+
let path = dir.path().to_str().unwrap();
749+
let cfg = Config {
750+
dir: path.to_owned(),
751+
target_file_size: ReadableSize(1),
752+
..Default::default()
753+
};
754+
let pipe_log = new_test_pipes(&cfg).unwrap();
755+
drop(pipe_log);
756+
assert!(new_test_pipes(&cfg).is_ok());
757+
}
730758
}

src/file_pipe_log/pipe_builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
379379
fn recover_queue_imp<M: ReplayMachine, FA: Factory<M>>(
380380
file_system: Arc<F>,
381381
recovery_cfg: RecoveryConfig,
382-
files: &mut Vec<File<F>>,
382+
files: &mut [File<F>],
383383
machine_factory: &FA,
384384
) -> Result<M> {
385385
if recovery_cfg.concurrency == 0 || files.is_empty() {
@@ -390,7 +390,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
390390
let recovery_mode = recovery_cfg.mode;
391391
let recovery_read_block_size = recovery_cfg.read_block_size as usize;
392392

393-
let max_chunk_size = std::cmp::max((files.len() + concurrency - 1) / concurrency, 1);
393+
let max_chunk_size = files.len().div_ceil(concurrency);
394394
let chunks = files.par_chunks_mut(max_chunk_size);
395395
let chunk_count = chunks.len();
396396
debug_assert!(chunk_count <= concurrency);

src/log_batch.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
use std::fmt::Debug;
44
use std::io::BufRead;
5+
use std::mem;
56
use std::sync::atomic::{AtomicU64, Ordering};
67
use std::sync::Arc;
7-
use std::{mem, u64};
88

99
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
1010
use log::error;
@@ -53,7 +53,7 @@ pub enum CompressionType {
5353
impl CompressionType {
5454
pub fn from_u8(t: u8) -> Result<Self> {
5555
if t <= CompressionType::Lz4 as u8 {
56-
Ok(unsafe { mem::transmute(t) })
56+
Ok(unsafe { mem::transmute::<u8, Self>(t) })
5757
} else {
5858
Err(Error::Corruption(format!(
5959
"Unrecognized compression type: {t}"
@@ -168,7 +168,7 @@ pub enum OpType {
168168
impl OpType {
169169
pub fn from_u8(t: u8) -> Result<Self> {
170170
if t <= OpType::Del as u8 {
171-
Ok(unsafe { mem::transmute(t) })
171+
Ok(unsafe { mem::transmute::<u8, Self>(t) })
172172
} else {
173173
Err(Error::Corruption(format!("Unrecognized op type: {t}")))
174174
}

src/memtable.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ impl<A: AllocatorTrait> MemTable<A> {
228228
}
229229

230230
if let Some(g) = rhs.atomic_group.take() {
231-
assert!(self.atomic_group.map_or(true, |(_, end)| end <= g.0));
231+
assert!(self.atomic_group.is_none_or(|(_, end)| end <= g.0));
232232
self.atomic_group = Some(g);
233233
}
234234

@@ -545,7 +545,7 @@ impl<A: AllocatorTrait> MemTable<A> {
545545
}
546546

547547
pub fn apply_rewrite_atomic_group(&mut self, start: FileSeq, end: FileSeq) {
548-
assert!(self.atomic_group.map_or(true, |(_, b)| b <= start));
548+
assert!(self.atomic_group.is_none_or(|(_, b)| b <= start));
549549
self.atomic_group = Some((start, end));
550550
}
551551

@@ -763,7 +763,7 @@ impl<A: AllocatorTrait> MemTable<A> {
763763
debug_assert!(count > 0);
764764
self.entry_indexes
765765
.get(count - 1)
766-
.map_or(false, |ei| ei.entries.unwrap().id.seq <= gate.seq)
766+
.is_some_and(|ei| ei.entries.unwrap().id.seq <= gate.seq)
767767
}
768768

769769
/// Returns the region ID.

src/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ pub trait TimeMetric {
122122
}
123123
}
124124

125-
impl<'a> TimeMetric for &'a Histogram {
125+
impl TimeMetric for &Histogram {
126126
fn observe(&self, duration: Duration) {
127127
Histogram::observe(self, duration.as_secs_f64());
128128
}

0 commit comments

Comments
 (0)