Skip to content

Commit ee8ece2

Browse files
authored
Merge pull request #15498 from amazon-mq/fix/msg-store-gc-stop-timeout
`rabbit_msg_store`: terminate GC with exit signal during shutdown
2 parents 12a0f29 + 91ec75f commit ee8ece2

2 files changed

Lines changed: 177 additions & 3 deletions

File tree

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-behaviour(gen_server2).
1111

1212
-export([start_link/5, successfully_recovered_state/1,
13+
gc_pid/1,
1314
client_init/3, client_terminate/1, client_delete_and_terminate/1,
1415
client_pre_hibernate/1, client_ref/1,
1516
write/4, write_flow/4, read/2, read_many/2, contains/2, remove/2]).
@@ -399,6 +400,11 @@ start_link(VHost, Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
399400
%% Synchronously asks the message store process `Server` for its
%% `successfully_recovered` flag, waiting without a timeout.
successfully_recovered_state(Server) ->
    Request = successfully_recovered_state,
    gen_server2:call(Server, Request, infinity).
401402

403+
%% Synchronously asks the message store process `Server` for the pid it
%% holds in the `gc_pid` field of its state, waiting without a timeout.
-spec gc_pid(server()) -> pid().

gc_pid(Server) ->
    Request = gc_pid,
    gen_server2:call(Server, Request, infinity).
407+
402408
-spec client_init(server(), client_ref(), maybe_msg_id_fun()) -> client_msstate().
403409

404410
client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
@@ -806,6 +812,7 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
806812
%% Priority of an incoming call: the state queries and new-client
%% registration get priority 7, every other call gets priority 0.
prioritise_call(successfully_recovered_state, _From, _Len, _State) ->
    7;
prioritise_call(gc_pid, _From, _Len, _State) ->
    7;
prioritise_call({new_client_state, _Ref, _Pid, _MODC}, _From, _Len, _State) ->
    7;
prioritise_call(_Msg, _From, _Len, _State) ->
    0.
@@ -826,6 +833,9 @@ prioritise_info(Msg, _Len, _State) ->
826833
handle_call(successfully_recovered_state, _From, State) ->
827834
reply(State #msstate.successfully_recovered, State);
828835

836+
handle_call(gc_pid, _From, State) ->
837+
reply(State #msstate.gc_pid, State);
838+
829839
handle_call({new_client_state, CRef, CPid, MsgOnDiskFun}, _From,
830840
State = #msstate { dir = Dir,
831841
index_ets = IndexEts,
@@ -971,9 +981,13 @@ terminate(Reason, State = #msstate { index_ets = IndexEts,
971981
_ -> {" with reason ~0p", [Reason]}
972982
end,
973983
?LOG_INFO("Stopping message store for directory '~ts'" ++ ExtraLog, [Dir|ExtraLogArgs]),
974-
%% stop the gc first, otherwise it could be working and we pull
975-
%% out the ets tables from under it.
976-
ok = rabbit_msg_store_gc:stop(GCPid),
984+
%% Terminate the GC first, otherwise it could still be running and we
985+
%% pull the ETS tables out from under it. The GC does not trap exits,
986+
%% so an exit signal terminates it immediately even if it is stuck
987+
%% mid-callback on disk I/O. Bound the wait so terminate stays within
988+
%% the msg_store child's own supervisor shutdown timeout, and fall
989+
%% back to kill if the shutdown signal does not take effect in time.
990+
stop_gc(GCPid, Dir),
977991
State3 = case CurHdl of
978992
undefined -> State;
979993
_ -> State2 = internal_sync(State),
@@ -1008,6 +1022,24 @@ code_change(_OldVsn, State, _Extra) ->
10081022

10091023
%% Delegates message queue formatting to rabbit_misc unchanged.
format_message_queue(Opt, MQ) ->
    rabbit_misc:format_message_queue(Opt, MQ).
10101024

1025+
%% Terminates the GC process `GCPid` and blocks until it is gone.
%%
%% A `shutdown` exit signal is sent first and the monitor's 'DOWN'
%% message is awaited for a bounded time (see gc_stop_timeout/1). If the
%% process is still alive after that, a warning mentioning `Dir` is
%% logged and the process is killed; `kill` cannot be ignored, so the
%% second wait needs no timeout. Always returns `ok`.
stop_gc(GCPid, Dir) ->
    Timeout = gc_stop_timeout(
                rabbit_misc:get_env(
                  rabbit, msg_store_shutdown_timeout, 600_000)),
    MRef = erlang:monitor(process, GCPid),
    exit(GCPid, shutdown),
    receive
        {'DOWN', MRef, process, GCPid, _} -> ok
    after Timeout ->
        ?LOG_WARNING("Message store GC for directory '~ts' did not exit "
                     "within ~bms of shutdown signal, killing it",
                     [Dir, Timeout]),
        exit(GCPid, kill),
        receive
            {'DOWN', MRef, process, GCPid, _} -> ok
        end
    end.

%% Derives how long to wait for the GC to exit from the configured
%% message store shutdown timeout: 60 seconds less than that timeout,
%% but never below 5 seconds.
%%
%% The configured value is only used arithmetically when it is an
%% integer. Anything else (for example `infinity`, which a supervisor
%% accepts as a shutdown value) would raise badarith in the middle of
%% terminate/2, so it falls back to the bound derived from the default
%% of 600_000 ms. The result is always an integer because it is also
%% printed with ~b in the warning above.
-spec gc_stop_timeout(term()) -> pos_integer().
gc_stop_timeout(ShutdownTimeout) when is_integer(ShutdownTimeout) ->
    max(ShutdownTimeout - 60_000, 5_000);
gc_stop_timeout(_NotAnInteger) ->
    600_000 - 60_000.
1042+
10111043
%%----------------------------------------------------------------------------
10121044
%% general helper functions
10131045
%%----------------------------------------------------------------------------

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ groups() ->
6464
msg_store,
6565
msg_store_read_many_fanout,
6666
msg_store_file_scan,
67+
msg_store_gc_stuck_suspended,
68+
msg_store_gc_stuck_mid_callback,
6769
{backing_queue_v2, [], Common ++ V2Only}
6870
]}
6971
].
@@ -718,6 +720,146 @@ msg_store_file_scan1(Config) ->
718720
%% Returns a fresh 16-byte random binary.
gen_id() ->
    IdBytes = 16,
    rand:bytes(IdBytes).
720722

723+
%% Checks that the msg_store still recovers cleanly when its GC process
%% is unresponsive at shutdown: terminate sends the GC an exit signal and
%% then goes on to write the recovery files. The actual scenario runs on
%% broker node 0 in msg_store_gc_stuck_suspended1/1.
msg_store_gc_stuck_suspended(Config) ->
    Outcome = rabbit_ct_broker_helpers:rpc(
                Config, 0,
                ?MODULE, msg_store_gc_stuck_suspended1, [Config]),
    passed = Outcome.
729+
730+
%% Broker-side body of msg_store_gc_stuck_suspended/1. Returns `passed`;
%% any failed match crashes the test.
msg_store_gc_stuck_suspended1(_Config) ->
    restart_msg_store_empty(),

    %% Seed the persistent store with 50 messages so that there is
    %% something to recover afterwards.
    ClientRef = rabbit_guid:gen(),
    Writer = msg_store_client_init(?PERSISTENT_MSG_STORE, ClientRef),
    Written = [{make_ref(), msg_id_bin(N)} || N <- lists:seq(1, 50)],
    ok = msg_store_write(Written, Writer),
    ok = rabbit_msg_store:client_terminate(Writer),

    %% Look up the GC process belonging to the persistent store.
    GCPid = rabbit_msg_store:gc_pid(
              rabbit_vhost_msg_store:vhost_store_pid(
                ?VHOST, ?PERSISTENT_MSG_STORE)),
    true = is_process_alive(GCPid),

    %% Suspend the GC so that it no longer handles any messages.
    ok = sys:suspend(GCPid),

    %% The transient store is stopped the regular way first.
    rabbit_vhost_msg_store:stop(?VHOST, ?TRANSIENT_MSG_STORE),

    %% Shut the persistent store down through its supervisor. Its
    %% terminate callback sends the GC an exit signal; the GC does not
    %% trap exits, so it goes away at once and terminate continues on to
    %% write the recovery files.
    {ok, Sup} = rabbit_vhost_sup_sup:get_vhost_sup(?VHOST),
    ok = supervisor:terminate_child(Sup, ?PERSISTENT_MSG_STORE),

    %% Drop the child spec so the store can be started again.
    ok = supervisor:delete_child(Sup, ?PERSISTENT_MSG_STORE),

    %% Start the store again and inspect how it recovered.
    StartupFunState = {fun ([]) -> finished end, []},
    ok = rabbit_variable_queue:start_msg_store(
           ?VHOST, [ClientRef], StartupFunState),

    %% A clean recovery is expected: the unresponsive GC was terminated
    %% and the recovery files were still written.
    true = rabbit_vhost_msg_store:successfully_recovered_state(
             ?VHOST, ?PERSISTENT_MSG_STORE),

    %% Every message written above must still be in the store.
    Reader = msg_store_client_init(?PERSISTENT_MSG_STORE, ClientRef),
    true = msg_store_contains(true, Written, Reader),
    ok = rabbit_msg_store:client_terminate(Reader),

    %% Leave an empty store behind for the next test case.
    restart_msg_store_empty(),
    passed.
780+
781+
%% Checks that the msg_store still recovers cleanly when its GC process
%% is blocked in the middle of a callback (standing in for stuck disk
%% I/O): terminate sends the GC an exit signal and then goes on to write
%% the recovery files. meck is set up through the broker helpers first;
%% the actual scenario runs on broker node 0 in
%% msg_store_gc_stuck_mid_callback1/1.
msg_store_gc_stuck_mid_callback(Config) ->
    rabbit_ct_broker_helpers:setup_meck(Config),
    Outcome = rabbit_ct_broker_helpers:rpc(
                Config, 0,
                ?MODULE, msg_store_gc_stuck_mid_callback1, [Config]),
    passed = Outcome.
788+
789+
%% Broker-side body of msg_store_gc_stuck_mid_callback/1. Returns
%% `passed`; any failed match or the 5 second wait for the GC crashes the
%% test. The rabbit_msg_store mock is always unloaded, also on failure,
%% so that a broken run cannot leave the module mocked for later tests.
msg_store_gc_stuck_mid_callback1(_Config) ->
    GenRef = fun() -> make_ref() end,
    restart_msg_store_empty(),

    %% Write some messages so the store has data to recover.
    Ref = rabbit_guid:gen(),
    MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
    MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 50)],
    ok = msg_store_write(MsgIds, MSCState),
    ok = rabbit_msg_store:client_terminate(MSCState),

    %% Get the msg_store pid and its GC pid.
    StorePid = rabbit_vhost_msg_store:vhost_store_pid(
                 ?VHOST, ?PERSISTENT_MSG_STORE),
    GCPid = rabbit_msg_store:gc_pid(StorePid),
    true = is_process_alive(GCPid),

    TestPid = self(),
    %% The mock is installed outside the try: if meck:new/2 itself fails
    %% there is nothing to unload, and unloading would mask that error.
    ok = meck:new(rabbit_msg_store, [no_link, passthrough]),
    VHostSup =
        try
            %% Mock compact_file to signal the test process on entry,
            %% then block indefinitely, simulating a GC process stuck on
            %% disk I/O mid-callback.
            ok = meck:expect(rabbit_msg_store, compact_file,
                             fun(_, _) ->
                                     TestPid ! gc_in_callback,
                                     %% Block forever with no CPU usage,
                                     %% simulating a process stuck
                                     %% waiting on disk I/O that never
                                     %% completes. The GC will be
                                     %% terminated by stop_gc/2.
                                     receive after infinity -> ok end
                             end),

            %% Send a compact cast directly to the GC. It will enter the
            %% mocked compact_file, signal us, then block inside the
            %% handle_cast callback.
            rabbit_msg_store_gc:compact(GCPid, 0),

            %% Wait for the GC to confirm it has entered the blocking
            %% callback.
            receive
                gc_in_callback -> ok
            after 5000 ->
                error(gc_did_not_enter_callback)
            end,

            %% Stop the transient store cleanly first.
            rabbit_vhost_msg_store:stop(?VHOST, ?TRANSIENT_MSG_STORE),

            %% Terminate the persistent store via the supervisor. The GC
            %% is blocked mid-callback but the exit signal sent by
            %% terminate preempts the callback because the GC does not
            %% trap exits, so terminate proceeds to write recovery files.
            {ok, Sup} = rabbit_vhost_sup_sup:get_vhost_sup(?VHOST),
            ok = supervisor:terminate_child(Sup, ?PERSISTENT_MSG_STORE),
            Sup
        after
            %% Runs on success and on failure alike.
            ok = meck:unload(rabbit_msg_store)
        end,

    %% Delete the child spec so we can restart.
    ok = supervisor:delete_child(VHostSup, ?PERSISTENT_MSG_STORE),

    %% Restart the msg_store and check recovery state.
    ok = rabbit_variable_queue:start_msg_store(
           ?VHOST, [Ref], {fun ([]) -> finished end, []}),

    %% The store should report a clean recovery because the fix
    %% terminates the unresponsive GC and proceeds to write recovery
    %% files.
    true = rabbit_vhost_msg_store:successfully_recovered_state(
             ?VHOST, ?PERSISTENT_MSG_STORE),

    %% Verify all messages survived the unclean GC shutdown.
    MSCState2 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
    true = msg_store_contains(true, MsgIds, MSCState2),
    ok = rabbit_msg_store:client_terminate(MSCState2),

    %% Clean up.
    restart_msg_store_empty(),
    passed.
862+
721863
%% Zero-argument form: delegates to gen_msg/1 with 1024 * 1024.
gen_msg() ->
    Mebi = 1024 * 1024,
    gen_msg(Mebi).
723865

0 commit comments

Comments
 (0)