33use std:: cell:: { Cell , RefCell } ;
44use std:: marker:: PhantomData ;
55use std:: path:: Path ;
6- use std:: sync:: { mpsc , Arc , Mutex } ;
6+ use std:: sync:: { Arc , Mutex , mpsc } ;
77use std:: thread:: { Builder as ThreadBuilder , JoinHandle } ;
88use std:: time:: { Duration , Instant } ;
99
1010use log:: { error, info} ;
11- use protobuf:: { parse_from_bytes , Message } ;
11+ use protobuf:: { Message , parse_from_bytes } ;
1212
1313use crate :: config:: { Config , RecoveryMode } ;
1414use crate :: consistency:: ConsistencyChecker ;
@@ -22,7 +22,7 @@ use crate::metrics::*;
2222use crate :: pipe_log:: { FileBlockHandle , FileId , LogQueue , PipeLog } ;
2323use crate :: purge:: { PurgeHook , PurgeManager } ;
2424use crate :: write_barrier:: { WriteBarrier , Writer } ;
25- use crate :: { perf_context , Error , GlobalStats , Result } ;
25+ use crate :: { Error , GlobalStats , Result , perf_context } ;
2626
2727const METRICS_FLUSH_INTERVAL : Duration = Duration :: from_secs ( 30 ) ;
2828/// Max times for `write`.
@@ -106,11 +106,13 @@ where
106106 let memtables_clone = memtables. clone ( ) ;
107107 let metrics_flusher = ThreadBuilder :: new ( )
108108 . name ( "re-metrics" . into ( ) )
109- . spawn ( move || loop {
110- stats_clone. flush_metrics ( ) ;
111- memtables_clone. flush_metrics ( ) ;
112- if rx. recv_timeout ( METRICS_FLUSH_INTERVAL ) . is_ok ( ) {
113- break ;
109+ . spawn ( move || {
110+ loop {
111+ stats_clone. flush_metrics ( ) ;
112+ memtables_clone. flush_metrics ( ) ;
113+ if rx. recv_timeout ( METRICS_FLUSH_INTERVAL ) . is_ok ( ) {
114+ break ;
115+ }
114116 }
115117 } ) ?;
116118
@@ -648,14 +650,14 @@ where
648650pub ( crate ) mod tests {
649651 use super :: * ;
650652 use crate :: env:: { ObfuscatedFileSystem , Permission } ;
651- use crate :: file_pipe_log:: { parse_reserved_file_name , FileNameExt } ;
653+ use crate :: file_pipe_log:: { FileNameExt , parse_reserved_file_name } ;
652654 use crate :: log_batch:: AtomicGroupBuilder ;
653655 use crate :: pipe_log:: Version ;
654- use crate :: test_util:: { generate_entries , PanicGuard } ;
656+ use crate :: test_util:: { PanicGuard , generate_entries } ;
655657 use crate :: util:: ReadableSize ;
656658 use kvproto:: raft_serverpb:: RaftLocalState ;
657659 use raft:: eraftpb:: Entry ;
658- use rand:: { thread_rng , Rng } ;
660+ use rand:: { Rng , thread_rng } ;
659661 use std:: collections:: { BTreeSet , HashSet } ;
660662 use std:: fs:: OpenOptions ;
661663 use std:: path:: PathBuf ;
@@ -1231,9 +1233,11 @@ pub(crate) mod tests {
12311233 // GC all log entries. Won't trigger purge because total size is not enough.
12321234 let count = engine. compact_to ( 1 , 100 ) ;
12331235 assert_eq ! ( count, 100 ) ;
1234- assert ! ( !engine
1235- . purge_manager
1236- . needs_rewrite_log_files( LogQueue :: Append ) ) ;
1236+ assert ! (
1237+ !engine
1238+ . purge_manager
1239+ . needs_rewrite_log_files( LogQueue :: Append )
1240+ ) ;
12371241
12381242 // Append more logs to make total size greater than `purge_threshold`.
12391243 for index in 100 ..250 {
@@ -1243,9 +1247,11 @@ pub(crate) mod tests {
12431247 // GC first 101 log entries.
12441248 assert_eq ! ( engine. compact_to( 1 , 101 ) , 1 ) ;
12451249 // Needs to purge because the total size is greater than `purge_threshold`.
1246- assert ! ( engine
1247- . purge_manager
1248- . needs_rewrite_log_files( LogQueue :: Append ) ) ;
1250+ assert ! (
1251+ engine
1252+ . purge_manager
1253+ . needs_rewrite_log_files( LogQueue :: Append )
1254+ ) ;
12491255
12501256 let old_min_file_seq = engine. file_span ( LogQueue :: Append ) . 0 ;
12511257 let will_force_compact = engine. purge_expired_files ( ) . unwrap ( ) ;
@@ -1259,9 +1265,11 @@ pub(crate) mod tests {
12591265
12601266 assert_eq ! ( engine. compact_to( 1 , 102 ) , 1 ) ;
12611267 // Needs to purge because the total size is greater than `purge_threshold`.
1262- assert ! ( engine
1263- . purge_manager
1264- . needs_rewrite_log_files( LogQueue :: Append ) ) ;
1268+ assert ! (
1269+ engine
1270+ . purge_manager
1271+ . needs_rewrite_log_files( LogQueue :: Append )
1272+ ) ;
12651273 let will_force_compact = engine. purge_expired_files ( ) . unwrap ( ) ;
12661274 // The region needs to be force compacted because the threshold is reached.
12671275 assert ! ( !will_force_compact. is_empty( ) ) ;
@@ -1350,9 +1358,11 @@ pub(crate) mod tests {
13501358 engine. append ( 11 , 1 , 11 , Some ( & data) ) ;
13511359
13521360 // The engine needs purge, and all old entries should be rewritten.
1353- assert ! ( engine
1354- . purge_manager
1355- . needs_rewrite_log_files( LogQueue :: Append ) ) ;
1361+ assert ! (
1362+ engine
1363+ . purge_manager
1364+ . needs_rewrite_log_files( LogQueue :: Append )
1365+ ) ;
13561366 assert ! ( engine. purge_expired_files( ) . unwrap( ) . is_empty( ) ) ;
13571367 assert ! ( engine. file_span( LogQueue :: Append ) . 0 > 1 ) ;
13581368
@@ -1386,9 +1396,11 @@ pub(crate) mod tests {
13861396 }
13871397 }
13881398
1389- assert ! ( engine
1390- . purge_manager
1391- . needs_rewrite_log_files( LogQueue :: Append ) ) ;
1399+ assert ! (
1400+ engine
1401+ . purge_manager
1402+ . needs_rewrite_log_files( LogQueue :: Append )
1403+ ) ;
13921404 assert ! ( engine. purge_expired_files( ) . unwrap( ) . is_empty( ) ) ;
13931405 }
13941406
@@ -1410,7 +1422,7 @@ pub(crate) mod tests {
14101422 let empty_entry = Entry :: new ( ) ;
14111423 assert_eq ! ( empty_entry. compute_size( ) , 0 ) ;
14121424 log_batch
1413- . add_entries :: < Entry > ( 0 , & [ empty_entry. clone ( ) ] )
1425+ . add_entries :: < Entry > ( 0 , std :: slice :: from_ref ( & empty_entry) )
14141426 . unwrap ( ) ;
14151427 engine. write ( & mut log_batch, false ) . unwrap ( ) ;
14161428 let empty_state = RaftLocalState :: new ( ) ;
@@ -1420,7 +1432,7 @@ pub(crate) mod tests {
14201432 . unwrap ( ) ;
14211433 engine. write ( & mut log_batch, false ) . unwrap ( ) ;
14221434 log_batch
1423- . add_entries :: < Entry > ( 2 , & [ empty_entry. clone ( ) ] )
1435+ . add_entries :: < Entry > ( 2 , std :: slice :: from_ref ( & empty_entry) )
14241436 . unwrap ( ) ;
14251437 log_batch
14261438 . put_message ( 2 , b"key" . to_vec ( ) , & empty_state)
0 commit comments