
Commit 2a548bf

rabbit_msg_store: stop GC with exit(shutdown) during shutdown
When `rabbit_msg_store` shuts down, its `terminate/2` callback calls `rabbit_msg_store_gc:stop/1`, which issues a `gen_server2:call(stop, infinity)`. If the GC process is blocked mid-`handle_cast` on disk I/O (for example, during compaction under disk pressure), the call blocks indefinitely and `terminate/2` never reaches the code that writes the recovery files (`file_summary.ets`, `msg_store_index.ets`, `clean.dot`). Eventually the msg_store child's supervisor shutdown timeout expires (`msg_store_shutdown_timeout`, default 600s) and the supervisor kills the msg_store process with reason `killed`. With no recovery files on disk, the next startup rebuilds the indices from scratch by scanning every segment file, which is slow and expensive for large stores.

This was observed in production on a broker under PerfTest load. The persistent message store for the `/` vhost logged "Stopping message store" and then nothing for 10 minutes until the supervisor killed it. The GC process was blocked on disk I/O while free disk space was hovering near the 2 GiB limit. On restart the store logged "rebuilding indices from scratch" even though the shutdown had been initiated gracefully via `rabbitmqctl stop`.

Replace the synchronous `rabbit_msg_store_gc:stop/1` call in `terminate/2` with a new `stop_gc/1` helper that monitors the GC, sends `exit(GCPid, shutdown)`, and waits for the `'DOWN'` message. `rabbit_msg_store_gc` does not trap exits, so an exit signal terminates it immediately even if it is running inside a `handle_cast` callback on disk I/O: exit signals are processed by the scheduler, not the user-level receive loop, so the signal preempts the blocked callback. The GC's `terminate/2` is a no-op, so bypassing it has no side effect.

The wait for `'DOWN'` is bounded by `max(msg_store_shutdown_timeout - 60_000, 5_000)` so that `terminate/2` stays within the msg_store child's own supervisor shutdown timeout and leaves at least 60s for the remaining steps (syncing the current file, writing the file summary, tearing down ETS, writing recovery terms). If the shutdown signal does not produce a `'DOWN'` in time, fall back to `exit(GCPid, kill)`. `kill` cannot be trapped, so the inner `receive` needs no timeout.

Killing the GC mid-operation is safe with respect to message data:

- `compact_file` copies messages before updating the index, and the original data remains on disk until truncation. The code comments confirm: "it's OK if we crash at any point before we update the index because the old data is still there until we truncate."
- `truncate_file` only removes data that has already been compacted to earlier offsets.
- `delete_file` only deletes files with zero valid messages, enforced by assertions before the delete.

The unclean recovery path (`build_index/3`) rebuilds everything from the actual segment files on disk using `scan_file_for_valid_messages`, so any inconsistency between the file summary and the on-disk state is handled. In the common case (the GC is killed before it has modified the file summary ETS), the recovery files are fully consistent and the next startup recovers cleanly without a rebuild.

Add `rabbit_msg_store:gc_pid/1` to expose the GC pid for testing.

Add two test cases to `backing_queue_SUITE`:

- `msg_store_gc_stuck_suspended` suspends the GC with `sys:suspend`, terminates the persistent store via the supervisor, and verifies that the store recovers cleanly (`successfully_recovered_state` returns `true`) with all messages intact. This covers the case where the GC is blocked in its receive loop.
- `msg_store_gc_stuck_mid_callback` mocks `compact_file/2` to block indefinitely inside the `handle_cast` callback, sends a compact cast to the GC to put it in the blocking callback, then terminates the store via the supervisor. This covers the scenario that motivated the PR: a GC stuck on disk I/O inside a callback.

Both tests confirm that `exit(GcPid, shutdown)` terminates the GC process regardless of what Erlang code it is running, that `terminate/2` completes and writes the recovery files, and that the next startup recovers cleanly.
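The call-vs-signal behavior the fix depends on is easy to reproduce outside RabbitMQ. Below is a minimal standalone sketch (not part of this commit; the module name `stuck_gc_demo` is invented) of a gen_server that, like `rabbit_msg_store_gc`, does not trap exits and can get stuck inside `handle_cast`. A synchronous call to it hangs, while `exit(Pid, shutdown)` terminates it immediately:

-module(stuck_gc_demo).
-behaviour(gen_server).
-export([start/0, init/1, handle_call/3, handle_cast/2]).

start() -> gen_server:start(?MODULE, [], []).

%% No process_flag(trap_exit, true): exit signals terminate this
%% process directly, just as they do rabbit_msg_store_gc.
init([]) -> {ok, undefined}.

handle_call(stop, _From, State) -> {stop, normal, ok, State}.

%% Stand-in for a compaction blocked on disk I/O: the callback never
%% returns, so the server never reads its mailbox again.
handle_cast(block, State) ->
    receive after infinity -> ok end,
    {noreply, State}.

%% In a shell (illustrative):
%% 1> {ok, Pid} = stuck_gc_demo:start().
%% 2> gen_server:cast(Pid, block).             %% now stuck mid-callback
%% 3> catch gen_server:call(Pid, stop, 1000).  %% {'EXIT',{timeout,...}}: the old
%%                                             %% synchronous stop would hang here
%% 4> exit(Pid, shutdown).                     %% a signal, not a mailbox message
%% 5> is_process_alive(Pid).                   %% false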
1 parent 6072dbd

2 files changed: 174 additions & 3 deletions

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 32 additions & 3 deletions
@@ -10,6 +10,7 @@
 -behaviour(gen_server2).

 -export([start_link/5, successfully_recovered_state/1,
+         gc_pid/1,
          client_init/3, client_terminate/1, client_delete_and_terminate/1,
          client_pre_hibernate/1, client_ref/1,
          write/4, write_flow/4, read/2, read_many/2, contains/2, remove/2]).
@@ -399,6 +400,11 @@ start_link(VHost, Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
 successfully_recovered_state(Server) ->
     gen_server2:call(Server, successfully_recovered_state, infinity).

+-spec gc_pid(server()) -> pid().
+
+gc_pid(Server) ->
+    gen_server2:call(Server, gc_pid, infinity).
+
 -spec client_init(server(), client_ref(), maybe_msg_id_fun()) -> client_msstate().

 client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
@@ -806,6 +812,7 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
 prioritise_call(Msg, _From, _Len, _State) ->
     case Msg of
         successfully_recovered_state -> 7;
+        gc_pid -> 7;
         {new_client_state, _Ref, _Pid, _MODC} -> 7;
         _ -> 0
     end.
@@ -826,6 +833,9 @@ prioritise_info(Msg, _Len, _State) ->
 handle_call(successfully_recovered_state, _From, State) ->
     reply(State #msstate.successfully_recovered, State);

+handle_call(gc_pid, _From, State) ->
+    reply(State #msstate.gc_pid, State);
+
 handle_call({new_client_state, CRef, CPid, MsgOnDiskFun}, _From,
             State = #msstate { dir = Dir,
                                index_ets = IndexEts,
@@ -971,9 +981,13 @@ terminate(Reason, State = #msstate { index_ets = IndexEts,
         _ -> {" with reason ~0p", [Reason]}
     end,
     ?LOG_INFO("Stopping message store for directory '~ts'" ++ ExtraLog, [Dir|ExtraLogArgs]),
-    %% stop the gc first, otherwise it could be working and we pull
-    %% out the ets tables from under it.
-    ok = rabbit_msg_store_gc:stop(GCPid),
+    %% Terminate the GC first, otherwise it could still be running and we
+    %% pull the ETS tables out from under it. The GC does not trap exits,
+    %% so an exit signal terminates it immediately even if it is stuck
+    %% mid-callback on disk I/O. Bound the wait so terminate stays within
+    %% the msg_store child's own supervisor shutdown timeout, and fall
+    %% back to kill if the shutdown signal does not take effect in time.
+    stop_gc(GCPid),
     State3 = case CurHdl of
         undefined -> State;
         _ -> State2 = internal_sync(State),
@@ -1008,6 +1022,21 @@ code_change(_OldVsn, State, _Extra) ->

 format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

+stop_gc(GCPid) ->
+    ShutdownTimeout = rabbit_misc:get_env(
+        rabbit, msg_store_shutdown_timeout, 600_000),
+    Timeout = max(ShutdownTimeout - 60_000, 5_000),
+    MRef = erlang:monitor(process, GCPid),
+    exit(GCPid, shutdown),
+    receive
+        {'DOWN', MRef, process, GCPid, _} -> ok
+    after Timeout ->
+        exit(GCPid, kill),
+        receive
+            {'DOWN', MRef, process, GCPid, _} -> ok
+        end
+    end.
+
 %%----------------------------------------------------------------------------
 %% general helper functions
 %%----------------------------------------------------------------------------
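For concreteness, the bound in `stop_gc/1` evaluates as follows for two example settings of `msg_store_shutdown_timeout` (worked arithmetic, not from the commit):

1> max(600_000 - 60_000, 5_000).  %% default msg_store_shutdown_timeout (600s)
540000
2> max(30_000 - 60_000, 5_000).   %% a short, operator-configured timeout (30s)
5000

With the default, `terminate/2` waits up to 540s for the `'DOWN'` before escalating to `kill`, keeping at least 60s in reserve for the sync/ETS/recovery-file steps; the 5s floor means a very short configured timeout still gives the GC a brief window to exit via `shutdown`.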

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 142 additions & 0 deletions
@@ -64,6 +64,8 @@ groups() ->
           msg_store,
           msg_store_read_many_fanout,
           msg_store_file_scan,
+          msg_store_gc_stuck_suspended,
+          msg_store_gc_stuck_mid_callback,
           {backing_queue_v2, [], Common ++ V2Only}
         ]}
     ].
@@ -718,6 +720,146 @@ msg_store_file_scan1(Config) ->
 gen_id() ->
     rand:bytes(16).

+%% Test that when the GC process is unresponsive during shutdown,
+%% the msg_store recovers cleanly because terminate sends the GC an
+%% exit signal and proceeds to write recovery files.
+msg_store_gc_stuck_suspended(Config) ->
+    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+        ?MODULE, msg_store_gc_stuck_suspended1, [Config]).
+
+msg_store_gc_stuck_suspended1(_Config) ->
+    GenRef = fun() -> make_ref() end,
+    restart_msg_store_empty(),
+
+    %% Write some messages so the store has data to recover.
+    Ref = rabbit_guid:gen(),
+    MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+    MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 50)],
+    ok = msg_store_write(MsgIds, MSCState),
+    ok = rabbit_msg_store:client_terminate(MSCState),
+
+    %% Get the msg_store pid and its GC pid.
+    StorePid = rabbit_vhost_msg_store:vhost_store_pid(
+                 ?VHOST, ?PERSISTENT_MSG_STORE),
+    GCPid = rabbit_msg_store:gc_pid(StorePid),
+    true = is_process_alive(GCPid),
+
+    %% Suspend the GC process so it cannot process messages.
+    ok = sys:suspend(GCPid),
+
+    %% Stop the transient store cleanly first.
+    rabbit_vhost_msg_store:stop(?VHOST, ?TRANSIENT_MSG_STORE),
+
+    %% Terminate the persistent store via the supervisor. The terminate
+    %% callback sends the GC an exit signal. The GC does not trap exits
+    %% so it terminates immediately, and terminate proceeds to write
+    %% recovery files.
+    {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(?VHOST),
+    ok = supervisor:terminate_child(VHostSup, ?PERSISTENT_MSG_STORE),
+
+    %% Delete the child spec so we can restart.
+    ok = supervisor:delete_child(VHostSup, ?PERSISTENT_MSG_STORE),
+
+    %% Restart the msg_store and check recovery state.
+    ok = rabbit_variable_queue:start_msg_store(
+           ?VHOST, [Ref], {fun ([]) -> finished end, []}),
+
+    %% The store should report a clean recovery because the fix
+    %% terminates the unresponsive GC and proceeds to write recovery files.
+    true = rabbit_vhost_msg_store:successfully_recovered_state(
+             ?VHOST, ?PERSISTENT_MSG_STORE),
+
+    %% Verify all messages survived the unclean GC shutdown.
+    MSCState2 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+    true = msg_store_contains(true, MsgIds, MSCState2),
+    ok = rabbit_msg_store:client_terminate(MSCState2),
+
+    %% Clean up.
+    restart_msg_store_empty(),
+    passed.
+
+%% Test that when the GC process is blocked mid-callback (simulating disk
+%% I/O), the msg_store recovers cleanly because terminate sends the GC an
+%% exit signal and proceeds to write recovery files.
+msg_store_gc_stuck_mid_callback(Config) ->
+    rabbit_ct_broker_helpers:setup_meck(Config),
+    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+        ?MODULE, msg_store_gc_stuck_mid_callback1, [Config]).
+
+msg_store_gc_stuck_mid_callback1(_Config) ->
+    GenRef = fun() -> make_ref() end,
+    restart_msg_store_empty(),
+
+    %% Write some messages so the store has data to recover.
+    Ref = rabbit_guid:gen(),
+    MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+    MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 50)],
+    ok = msg_store_write(MsgIds, MSCState),
+    ok = rabbit_msg_store:client_terminate(MSCState),
+
+    %% Get the msg_store pid and its GC pid.
+    StorePid = rabbit_vhost_msg_store:vhost_store_pid(
+                 ?VHOST, ?PERSISTENT_MSG_STORE),
+    GCPid = rabbit_msg_store:gc_pid(StorePid),
+    true = is_process_alive(GCPid),
+
+    %% Mock compact_file to signal the test process on entry, then block
+    %% indefinitely, simulating a GC process stuck on disk I/O mid-callback.
+    TestPid = self(),
+    ok = meck:new(rabbit_msg_store, [no_link, passthrough]),
+    ok = meck:expect(rabbit_msg_store, compact_file,
+                     fun(_, _) ->
+                             TestPid ! gc_in_callback,
+                             %% Block forever with no CPU usage, simulating a
+                             %% process stuck waiting on disk I/O that never
+                             %% completes. The GC will be terminated by
+                             %% stop_gc/1.
+                             receive after infinity -> ok end
+                     end),
+
+    %% Send a compact cast directly to the GC. It will enter the mocked
+    %% compact_file, signal us, then block inside the handle_cast callback.
+    rabbit_msg_store_gc:compact(GCPid, 0),
+
+    %% Wait for the GC to confirm it has entered the blocking callback.
+    receive
+        gc_in_callback -> ok
+    after 5000 ->
+        error(gc_did_not_enter_callback)
+    end,
+
+    %% Stop the transient store cleanly first.
+    rabbit_vhost_msg_store:stop(?VHOST, ?TRANSIENT_MSG_STORE),
+
+    %% Terminate the persistent store via the supervisor. The GC is blocked
+    %% mid-callback but the exit signal sent by terminate preempts the
+    %% callback because the GC does not trap exits, so terminate proceeds
+    %% to write recovery files.
+    {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(?VHOST),
+    ok = supervisor:terminate_child(VHostSup, ?PERSISTENT_MSG_STORE),
+
+    ok = meck:unload(rabbit_msg_store),
+
+    %% Delete the child spec so we can restart.
+    ok = supervisor:delete_child(VHostSup, ?PERSISTENT_MSG_STORE),
+
+    %% Restart the msg_store and check recovery state.
+    ok = rabbit_variable_queue:start_msg_store(
+           ?VHOST, [Ref], {fun ([]) -> finished end, []}),
+
+    %% The store should report a clean recovery because the fix terminates
+    %% the unresponsive GC and proceeds to write recovery files.
+    true = rabbit_vhost_msg_store:successfully_recovered_state(
+             ?VHOST, ?PERSISTENT_MSG_STORE),
+
+    %% Verify all messages survived the unclean GC shutdown.
+    MSCState2 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+    true = msg_store_contains(true, MsgIds, MSCState2),
+    ok = rabbit_msg_store:client_terminate(MSCState2),
+
+    %% Clean up.
+    restart_msg_store_empty(),
+    passed.
+
 gen_msg() ->
     gen_msg(1024 * 1024).
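A note on why `sys:suspend/1` is a faithful stand-in for a GC stuck in its receive loop: a suspended gen_server handles only system messages, so ordinary calls and casts queue up unanswered, yet an exit signal still terminates it when the process does not trap exits. An illustrative shell sketch, not from the commit (`Pid` stands for any gen_server that does not trap exits):

%% Pid is any gen_server process that does not trap exits.
ok = sys:suspend(Pid),                        %% mailbox no longer serviced
{'EXIT', {timeout, _}} =
    (catch gen_server:call(Pid, ping, 100)),  %% ordinary calls now time out
exit(Pid, shutdown),                          %% but exit signals get through
false = is_process_alive(Pid).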
