Skip to content

Commit 3bffa63

Browse files
authored
Merge pull request #16142 from amazon-mq/lukebakken/cq-gc
Revert CQ shared store: Delete from index on remove or roll over (#13959)
2 parents 10e8924 + 69fd9ff commit 3bffa63

1 file changed

Lines changed: 18 additions & 50 deletions

File tree

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 18 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@
8080
current_file_handle,
8181
%% current write file offset
8282
current_file_offset,
83-
%% messages that were potentially removed from the current write file
84-
current_file_removes = [],
8583
%% TRef for our interval timer
8684
sync_timer_ref,
8785
%% files that had removes
@@ -1166,11 +1164,7 @@ write_message(MsgId, Msg, CRef,
11661164
end, CRef, State1)
11671165
end.
11681166

1169-
remove_message(MsgId, CRef,
1170-
State = #msstate{
1171-
index_ets = IndexEts,
1172-
current_file = CurrentFile,
1173-
current_file_removes = Removes }) ->
1167+
remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
11741168
case should_mask_action(CRef, MsgId, State) of
11751169
{true, _Location} ->
11761170
State;
@@ -1182,32 +1176,20 @@ remove_message(MsgId, CRef,
11821176
%% ets:lookup(FileSummaryEts, File),
11831177
State;
11841178
{_Mask, #msg_location { ref_count = RefCount, file = File,
1185-
total_size = TotalSize } = Entry}
1179+
total_size = TotalSize }}
11861180
when RefCount > 0 ->
11871181
%% only update field, otherwise bad interaction with
11881182
%% concurrent GC
1183+
ok = index_update_ref_counter(IndexEts, MsgId, -1),
11891184
case RefCount of
11901185
%% Don't remove from cur_file_cache_ets here because
11911186
%% there may be further writes in the mailbox for the
1192-
%% same msg. We will remove 0 ref_counts when rolling
1193-
%% over to the next write file.
1194-
1 when File =:= CurrentFile ->
1195-
index_update_ref_counter(IndexEts, MsgId, -1),
1196-
State1 = State#msstate{current_file_removes =
1197-
[Entry#msg_location{ref_count=0}|Removes]},
1198-
delete_file_if_empty(
1199-
File, gc_candidate(File,
1200-
adjust_valid_total_size(
1201-
File, -TotalSize, State1)));
1202-
1 ->
1203-
index_delete(IndexEts, MsgId),
1204-
delete_file_if_empty(
1205-
File, gc_candidate(File,
1206-
adjust_valid_total_size(
1207-
File, -TotalSize, State)));
1208-
_ ->
1209-
index_update_ref_counter(IndexEts, MsgId, -1),
1210-
gc_candidate(File, State)
1187+
%% same msg.
1188+
1 -> delete_file_if_empty(
1189+
File, gc_candidate(File,
1190+
adjust_valid_total_size(
1191+
File, -TotalSize, State)));
1192+
_ -> gc_candidate(File, State)
12111193
end
12121194
end.
12131195

@@ -1269,9 +1251,7 @@ flush_or_roll_to_new_file(
12691251
cur_file_cache_ets = CurFileCacheEts,
12701252
file_size_limit = FileSizeLimit })
12711253
when Offset >= FileSizeLimit ->
1272-
%% Cleanup the index of messages that were removed before rolling over.
1273-
State0 = cleanup_index_on_roll_over(State),
1274-
State1 = internal_sync(State0),
1254+
State1 = internal_sync(State),
12751255
ok = writer_close(CurHdl),
12761256
NextFile = CurFile + 1,
12771257
{ok, NextHdl} = writer_open(Dir, NextFile),
@@ -1299,8 +1279,6 @@ write_large_message(MsgId, MsgBodyBin,
12991279
index_ets = IndexEts,
13001280
file_summary_ets = FileSummaryEts,
13011281
cur_file_cache_ets = CurFileCacheEts }) ->
1302-
%% Cleanup the index of messages that were removed before rolling over.
1303-
State1 = cleanup_index_on_roll_over(State0),
13041282
{LargeMsgFile, LargeMsgHdl} = case CurOffset of
13051283
%% We haven't written in the file yet. Use it.
13061284
0 ->
@@ -1320,13 +1298,13 @@ write_large_message(MsgId, MsgBodyBin,
13201298
ok = index_insert(IndexEts,
13211299
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
13221300
offset = 0, total_size = TotalSize }),
1323-
State2 = case CurFile of
1301+
State1 = case CurFile of
13241302
%% We didn't open a new file. We must update the existing value.
13251303
LargeMsgFile ->
13261304
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
13271305
[{#file_summary.valid_total_size, TotalSize},
13281306
{#file_summary.file_size, TotalSize}]),
1329-
State1;
1307+
State0;
13301308
%% We opened a new file. We can insert it all at once.
13311309
%% We must also check whether we need to delete the previous
13321310
%% current file, because if there is no valid data this is
@@ -1337,7 +1315,7 @@ write_large_message(MsgId, MsgBodyBin,
13371315
valid_total_size = TotalSize,
13381316
file_size = TotalSize,
13391317
locked = false }),
1340-
delete_file_if_empty(CurFile, State1 #msstate { current_file_handle = LargeMsgHdl,
1318+
delete_file_if_empty(CurFile, State0 #msstate { current_file_handle = LargeMsgHdl,
13411319
current_file = LargeMsgFile,
13421320
current_file_offset = TotalSize })
13431321
end,
@@ -1352,22 +1330,11 @@ write_large_message(MsgId, MsgBodyBin,
13521330
%% Delete messages from the cache that were written to disk.
13531331
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
13541332
%% Process confirms (this won't flush; we already did) and continue.
1355-
State = internal_sync(State2),
1333+
State = internal_sync(State1),
13561334
State #msstate { current_file_handle = NextHdl,
13571335
current_file = NextFile,
13581336
current_file_offset = 0 }.
13591337

1360-
cleanup_index_on_roll_over(State = #msstate{
1361-
index_ets = IndexEts,
1362-
current_file_removes = Removes}) ->
1363-
lists:foreach(fun(Entry) ->
1364-
%% We delete objects that have ref_count=0. If a message
1365-
%% got its ref_count increased, it will not be deleted.
1366-
%% We thus avoid extra index lookups to check for ref_count.
1367-
index_delete_object(IndexEts, Entry)
1368-
end, Removes),
1369-
State#msstate{current_file_removes=[]}.
1370-
13711338
contains_message(MsgId, From, State = #msstate{ index_ets = IndexEts }) ->
13721339
MsgLocation = index_lookup_positive_ref_count(IndexEts, MsgId),
13731340
gen_server2:reply(From, MsgLocation =/= not_found),
@@ -2178,9 +2145,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File
21782145

21792146
-spec delete_file(non_neg_integer(), gc_state()) -> ok | defer.
21802147

2181-
delete_file(File, #gc_state { file_summary_ets = FileSummaryEts,
2182-
file_handles_ets = FileHandlesEts,
2183-
dir = Dir }) ->
2148+
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
2149+
file_handles_ets = FileHandlesEts,
2150+
dir = Dir }) ->
21842151
case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of
21852152
{[_|_], _Cont} ->
21862153
?LOG_DEBUG("Asked to delete file ~p but it has active readers. Deferring.",
@@ -2189,6 +2156,7 @@ delete_file(File, #gc_state { file_summary_ets = FileSummaryEts,
21892156
_ ->
21902157
[#file_summary{ valid_total_size = 0,
21912158
file_size = FileSize }] = ets:lookup(FileSummaryEts, File),
2159+
[] = scan_and_vacuum_message_file(File, State),
21922160
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
21932161
true = ets:delete(FileSummaryEts, File),
21942162
?LOG_DEBUG("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]),

0 commit comments

Comments
 (0)