@@ -334,9 +334,12 @@ where
334334 let _t = StopWatch :: new ( & * ENGINE_READ_ENTRY_DURATION_HISTOGRAM ) ;
335335 if let Some ( memtable) = self . memtables . get ( region_id) {
336336 let mut ents_idx: Vec < EntryIndex > = Vec :: with_capacity ( ( end - begin) as usize ) ;
337- memtable
338- . read ( )
339- . fetch_entries_to ( begin, end, max_size, & mut ents_idx) ?;
337+ // Ensure that the corresponding memtable is locked with a read lock before
338+ // completing the fetching of entries from the raft logs. This
339+ // prevents the scenario where the index could become stale while
340+ // being concurrently updated by the `rewrite` operation.
341+ let immutable = memtable. read ( ) ;
342+ immutable. fetch_entries_to ( begin, end, max_size, & mut ents_idx) ?;
340343 for i in ents_idx. iter ( ) {
341344 vec. push ( read_entry_from_file :: < M , _ > ( self . pipe_log . as_ref ( ) , i) ?) ;
342345 }
@@ -635,9 +638,11 @@ pub(crate) mod tests {
635638 use crate :: util:: ReadableSize ;
636639 use kvproto:: raft_serverpb:: RaftLocalState ;
637640 use raft:: eraftpb:: Entry ;
641+ use rand:: { thread_rng, Rng } ;
638642 use std:: collections:: { BTreeSet , HashSet } ;
639643 use std:: fs:: OpenOptions ;
640644 use std:: path:: PathBuf ;
645+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
641646
642647 pub ( crate ) type RaftLogEngine < F = DefaultFileSystem > = Engine < F > ;
643648 impl < F : FileSystem > RaftLogEngine < F > {
@@ -1929,8 +1934,6 @@ pub(crate) mod tests {
19291934 #[ cfg( feature = "nightly" ) ]
19301935 #[ bench]
19311936 fn bench_engine_fetch_entries ( b : & mut test:: Bencher ) {
1932- use rand:: { thread_rng, Rng } ;
1933-
19341937 let dir = tempfile:: Builder :: new ( )
19351938 . prefix ( "bench_engine_fetch_entries" )
19361939 . tempdir ( )
@@ -2587,6 +2590,53 @@ pub(crate) mod tests {
25872590 assert ! ( data. is_empty( ) , "data loss {:?}" , data) ;
25882591 }
25892592
2593+ #[ test]
2594+ fn test_fetch_with_concurrently_rewrite ( ) {
2595+ let dir = tempfile:: Builder :: new ( )
2596+ . prefix ( "test_fetch_with_concurrently_rewrite" )
2597+ . tempdir ( )
2598+ . unwrap ( ) ;
2599+ let cfg = Config {
2600+ dir : dir. path ( ) . to_str ( ) . unwrap ( ) . to_owned ( ) ,
2601+ target_file_size : ReadableSize ( 2048 ) ,
2602+ ..Default :: default ( )
2603+ } ;
2604+ let fs = Arc :: new ( DeleteMonitoredFileSystem :: new ( ) ) ;
2605+ let engine = Arc :: new ( RaftLogEngine :: open_with_file_system ( cfg, fs. clone ( ) ) . unwrap ( ) ) ;
2606+ let entry_data = vec ! [ b'x' ; 128 ] ;
2607+ // Set up a concurrent write with purge, and fetch.
2608+ let mut vec: Vec < Entry > = Vec :: new ( ) ;
2609+ let fetch_engine = engine. clone ( ) ;
2610+ let flag = Arc :: new ( AtomicBool :: new ( false ) ) ;
2611+ let start_flag = flag. clone ( ) ;
2612+ let th = std:: thread:: spawn ( move || {
2613+ while !start_flag. load ( Ordering :: Acquire ) {
2614+ std:: thread:: sleep ( Duration :: from_millis ( 10 ) ) ;
2615+ }
2616+ for _ in 0 ..10 {
2617+ let region_id = thread_rng ( ) . gen_range ( 1 ..=10 ) ;
2618+ // Should not return file seqno out of range error.
2619+ let _ = fetch_engine
2620+ . fetch_entries_to :: < Entry > ( region_id, 1 , 101 , None , & mut vec)
2621+ . map_err ( |e| {
2622+ assert ! ( !format!( "{e}" ) . contains( "file seqno out of" ) ) ;
2623+ } ) ;
2624+ vec. clear ( ) ;
2625+ }
2626+ } ) ;
2627+ for i in 0 ..10 {
2628+ for rid in 1 ..=10 {
2629+ engine. append ( rid, 1 + i * 10 , 1 + i * 10 + 10 , Some ( & entry_data) ) ;
2630+ }
2631+ flag. store ( true , Ordering :: Release ) ;
2632+ for rid in 1 ..=10 {
2633+ engine. clean ( rid) ;
2634+ }
2635+ engine. purge_expired_files ( ) . unwrap ( ) ;
2636+ }
2637+ th. join ( ) . unwrap ( ) ;
2638+ }
2639+
25902640 #[ test]
25912641 fn test_internal_key_filter ( ) {
25922642 let dir = tempfile:: Builder :: new ( )
0 commit comments