@@ -333,14 +333,31 @@ where
333333 let _t = StopWatch :: new ( & * ENGINE_READ_ENTRY_DURATION_HISTOGRAM ) ;
334334 if let Some ( memtable) = self . memtables . get ( region_id) {
335335 let mut ents_idx: Vec < EntryIndex > = Vec :: with_capacity ( ( end - begin) as usize ) ;
336- // Ensure that the corresponding memtable is locked with a read lock before
337- // completing the fetching of entries from the raft logs. This
338- // prevents the scenario where the index could become stale while
339- // being concurrently updated by the `rewrite` operation.
340- let immutable = memtable. read ( ) ;
341- immutable. fetch_entries_to ( begin, end, max_size, & mut ents_idx) ?;
336+ memtable
337+ . read ( )
338+ . fetch_entries_to ( begin, end, max_size, & mut ents_idx) ?;
342339 for i in ents_idx. iter ( ) {
343- vec. push ( read_entry_from_file :: < M , _ > ( self . pipe_log . as_ref ( ) , i) ?) ;
340+ vec. push ( {
341+ match read_entry_from_file :: < M , _ > ( self . pipe_log . as_ref ( ) , i) {
342+ Ok ( entry) => entry,
343+ Err ( e) => {
344+ // The index is not found in the file, it means the entry is already
345+ // stale by `compact` or `rewrite`. Retry to read the entry from the
346+ // memtable.
347+ let immutable = memtable. read ( ) ;
348+ // Ensure that the corresponding memtable is locked with a read lock
349+ // before completing the fetching of entries
350+ // from the raft logs. This prevents the
351+ // scenario where the index could become stale while
352+ // being concurrently updated by the `rewrite` operation.
353+ if let Some ( idx) = immutable. get_entry ( i. index ) {
354+ read_entry_from_file :: < M , _ > ( self . pipe_log . as_ref ( ) , & idx) ?
355+ } else {
356+ return Err ( e) ;
357+ }
358+ }
359+ }
360+ } ) ;
344361 }
345362 ENGINE_READ_ENTRY_COUNT_HISTOGRAM . observe ( ents_idx. len ( ) as f64 ) ;
346363 return Ok ( ents_idx. len ( ) ) ;
0 commit comments