Skip to content

Commit e5e2b4e

Browse files
authored
Release dir locks on engine closing (#381)
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
1 parent 7e12385 commit e5e2b4e

File tree

12 files changed

+53
-27
lines changed

12 files changed

+53
-27
lines changed

.github/workflows/rust.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
uses: actions-rs/toolchain@v1
3131
with:
3232
profile: minimal
33-
toolchain: nightly-2023-12-31
33+
toolchain: nightly-2025-04-03
3434
override: true
3535
components: rustfmt, clippy, rust-src
3636
- uses: Swatinem/rust-cache@v1
@@ -65,7 +65,7 @@ jobs:
6565
uses: actions-rs/toolchain@v1
6666
with:
6767
profile: minimal
68-
toolchain: 1.75.0
68+
toolchain: 1.85.0
6969
override: true
7070
components: rustfmt, clippy, rust-src
7171
- uses: Swatinem/rust-cache@v1
@@ -92,7 +92,7 @@ jobs:
9292
uses: actions-rs/toolchain@v1
9393
with:
9494
profile: minimal
95-
toolchain: nightly-2023-12-31
95+
toolchain: nightly-2025-04-03
9696
override: true
9797
components: llvm-tools-preview
9898
- uses: Swatinem/rust-cache@v1

src/codec.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,6 @@ mod tests {
360360
use super::*;
361361
use protobuf::CodedOutputStream;
362362
use std::io::ErrorKind;
363-
use std::{f32, f64, i16, i32, i64, u16, u32, u64};
364363

365364
const U16_TESTS: &[u16] = &[
366365
i16::MIN as u16,

src/env/default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl Read for LogFile {
6262
impl Seek for LogFile {
6363
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
6464
fail_point!("log_file::seek::err", |_| {
65-
Err(std::io::Error::new(std::io::ErrorKind::Other, "fp"))
65+
Err(std::io::Error::other("fp"))
6666
});
6767
match pos {
6868
SeekFrom::Start(offset) => self.offset = offset as usize,

src/file_pipe_log/pipe.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77

88
use crossbeam::utils::CachePadded;
99
use fail::fail_point;
10+
use fs2::FileExt;
1011
use log::error;
1112
use parking_lot::{Mutex, MutexGuard, RwLock};
1213

@@ -464,7 +465,17 @@ impl<F: FileSystem> SinglePipe<F> {
464465
pub struct DualPipes<F: FileSystem> {
465466
pipes: [SinglePipe<F>; 2],
466467

467-
_dir_locks: Vec<StdFile>,
468+
dir_locks: Vec<StdFile>,
469+
}
470+
471+
impl<F: FileSystem> Drop for DualPipes<F> {
472+
fn drop(&mut self) {
473+
for lock in &self.dir_locks {
474+
if let Err(e) = FileExt::unlock(lock) {
475+
error!("error while unlocking directory: {e}");
476+
}
477+
}
478+
}
468479
}
469480

470481
impl<F: FileSystem> DualPipes<F> {
@@ -481,7 +492,7 @@ impl<F: FileSystem> DualPipes<F> {
481492

482493
Ok(Self {
483494
pipes: [appender, rewriter],
484-
_dir_locks: dir_locks,
495+
dir_locks,
485496
})
486497
}
487498

@@ -727,4 +738,21 @@ mod tests {
727738
assert_eq!(pipe_log.read_bytes(handle).unwrap(), content(i + 1));
728739
}
729740
}
741+
742+
#[test]
743+
fn test_release_on_drop() {
744+
let dir = Builder::new()
745+
.prefix("test_release_on_drop")
746+
.tempdir()
747+
.unwrap();
748+
let path = dir.path().to_str().unwrap();
749+
let cfg = Config {
750+
dir: path.to_owned(),
751+
target_file_size: ReadableSize(1),
752+
..Default::default()
753+
};
754+
let pipe_log = new_test_pipes(&cfg).unwrap();
755+
drop(pipe_log);
756+
assert!(new_test_pipes(&cfg).is_ok());
757+
}
730758
}

src/file_pipe_log/pipe_builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
379379
fn recover_queue_imp<M: ReplayMachine, FA: Factory<M>>(
380380
file_system: Arc<F>,
381381
recovery_cfg: RecoveryConfig,
382-
files: &mut Vec<File<F>>,
382+
files: &mut [File<F>],
383383
machine_factory: &FA,
384384
) -> Result<M> {
385385
if recovery_cfg.concurrency == 0 || files.is_empty() {
@@ -390,7 +390,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
390390
let recovery_mode = recovery_cfg.mode;
391391
let recovery_read_block_size = recovery_cfg.read_block_size as usize;
392392

393-
let max_chunk_size = std::cmp::max((files.len() + concurrency - 1) / concurrency, 1);
393+
let max_chunk_size = files.len().div_ceil(concurrency);
394394
let chunks = files.par_chunks_mut(max_chunk_size);
395395
let chunk_count = chunks.len();
396396
debug_assert!(chunk_count <= concurrency);

src/log_batch.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
use std::fmt::Debug;
44
use std::io::BufRead;
5+
use std::mem;
56
use std::sync::atomic::{AtomicU64, Ordering};
67
use std::sync::Arc;
7-
use std::{mem, u64};
88

99
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
1010
use log::error;
@@ -53,7 +53,7 @@ pub enum CompressionType {
5353
impl CompressionType {
5454
pub fn from_u8(t: u8) -> Result<Self> {
5555
if t <= CompressionType::Lz4 as u8 {
56-
Ok(unsafe { mem::transmute(t) })
56+
Ok(unsafe { mem::transmute::<u8, Self>(t) })
5757
} else {
5858
Err(Error::Corruption(format!(
5959
"Unrecognized compression type: {t}"
@@ -168,7 +168,7 @@ pub enum OpType {
168168
impl OpType {
169169
pub fn from_u8(t: u8) -> Result<Self> {
170170
if t <= OpType::Del as u8 {
171-
Ok(unsafe { mem::transmute(t) })
171+
Ok(unsafe { mem::transmute::<u8, Self>(t) })
172172
} else {
173173
Err(Error::Corruption(format!("Unrecognized op type: {t}")))
174174
}

src/memtable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,7 @@ impl<A: AllocatorTrait> MemTable<A> {
763763
debug_assert!(count > 0);
764764
self.entry_indexes
765765
.get(count - 1)
766-
.map_or(false, |ei| ei.entries.unwrap().id.seq <= gate.seq)
766+
.is_some_and(|ei| ei.entries.unwrap().id.seq <= gate.seq)
767767
}
768768

769769
/// Returns the region ID.

src/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ pub trait TimeMetric {
122122
}
123123
}
124124

125-
impl<'a> TimeMetric for &'a Histogram {
125+
impl TimeMetric for &Histogram {
126126
fn observe(&self, duration: Duration) {
127127
Histogram::observe(self, duration.as_secs_f64());
128128
}

src/test_util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ where
6363
}
6464

6565
pub struct PanicGuard {
66-
prev_hook: *mut (dyn Fn(&panic::PanicInfo<'_>) + Sync + Send + 'static),
66+
prev_hook: *mut (dyn Fn(&panic::PanicHookInfo<'_>) + Sync + Send + 'static),
6767
}
6868

6969
struct PointerHolder<T: ?Sized>(*mut T);

src/util.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl<'de> Deserialize<'de> for ReadableSize {
148148
{
149149
struct SizeVisitor;
150150

151-
impl<'de> Visitor<'de> for SizeVisitor {
151+
impl Visitor<'_> for SizeVisitor {
152152
type Value = ReadableSize;
153153

154154
fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -221,7 +221,7 @@ pub fn unhash_u64(mut i: u64) -> u64 {
221221

222222
pub mod lz4 {
223223
use crate::{Error, Result};
224-
use std::{i32, ptr};
224+
use std::ptr;
225225

226226
pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1;
227227

@@ -330,7 +330,7 @@ pub trait Factory<Target>: Send + Sync {
330330
/// ```
331331
#[inline]
332332
pub fn round_up(offset: usize, alignment: usize) -> usize {
333-
(offset + alignment - 1) / alignment * alignment
333+
offset.div_ceil(alignment) * alignment
334334
}
335335

336336
/// Returns an aligned `offset`.

0 commit comments

Comments
 (0)