Skip to content

Commit 7f4795b

Browse files
committed
Backport rabbitmq#15081
patch -p1 < ~/workplace/AmazonMqRabbitMqOpenSource/src/RabbitMq/patches/parallel-qq-shrink.patch
1 parent 52b3843 commit 7f4795b

1 file changed

Lines changed: 66 additions & 17 deletions

File tree

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888

8989
-include_lib("stdlib/include/qlc.hrl").
9090
-include_lib("rabbit_common/include/rabbit.hrl").
91+
-include_lib("kernel/include/logger.hrl").
9192
-include("amqqueue.hrl").
9293

9394
-type msg_id() :: non_neg_integer().
@@ -1325,23 +1326,71 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
13251326
[{rabbit_amqqueue:name(),
13261327
{ok, pos_integer()} | {error, pos_integer(), term()}}].
13271328
shrink_all(Node) ->
1328-
rabbit_log:info("Asked to remove all quorum queue replicas from node ~ts", [Node]),
1329-
[begin
1330-
QName = amqqueue:get_name(Q),
1331-
rabbit_log:info("~ts: removing member (replica) on node ~w",
1332-
[rabbit_misc:rs(QName), Node]),
1333-
Size = length(get_nodes(Q)),
1334-
case delete_member(Q, Node) of
1335-
ok ->
1336-
{QName, {ok, Size-1}};
1337-
{error, Err} ->
1338-
rabbit_log:warning("~ts: failed to remove member (replica) on node ~w, error: ~w",
1339-
[rabbit_misc:rs(QName), Node, Err]),
1340-
{QName, {error, Size, Err}}
1341-
end
1342-
end || Q <- rabbit_amqqueue:list(),
1343-
amqqueue:get_type(Q) == ?MODULE,
1344-
lists:member(Node, get_nodes(Q))].
1329+
?LOG_INFO("Asked to remove all quorum queue replicas from node ~ts", [Node]),
1330+
%% This operation is bound by I/O so this default is set high.
1331+
Size = application:get_env(rabbit, quorum_queue_shrink_batch_size, 64),
1332+
Chunks = ra_lib:lists_chunk(Size, [Q || Q <- rabbit_amqqueue:list(),
1333+
amqqueue:get_type(Q) == ?MODULE,
1334+
lists:member(Node, get_nodes(Q))]),
1335+
Parent = self(),
1336+
lists:flatten([begin
1337+
Tasks = [{spawn_monitor(fun() ->
1338+
Res = shrink(Node, Q),
1339+
Parent ! {self(), Res}
1340+
end), amqqueue:get_name(Q)}
1341+
|| Q <- Chunk],
1342+
[receive
1343+
{Pid, Res} ->
1344+
erlang:demonitor(MRef, [flush]),
1345+
{QName, Res};
1346+
{'DOWN', MRef, process, Pid, Reason} ->
1347+
?LOG_WARNING("~ts: failed to remove member "
1348+
"(replica) on node ~w, error: ~w",
1349+
[rabbit_misc:rs(QName), Node,
1350+
Reason]),
1351+
[]
1352+
after
1353+
15_000 ->
1354+
?LOG_WARNING("~ts: failed to remove member "
1355+
"(replica) on node ~w within 15 "
1356+
"seconds",
1357+
[rabbit_misc:rs(QName), Node]),
1358+
[]
1359+
end || {{Pid, MRef}, QName} <- Tasks]
1360+
end || Chunk <- Chunks]).
1361+
1362+
-spec shrink(node(), amqqueue:amqqueue()) ->
1363+
{ok, Size} | {error, Size, term()} when Size :: pos_integer().
1364+
shrink(Node, Q) ->
1365+
QName = amqqueue:get_name(Q),
1366+
?LOG_INFO("~ts: removing member (replica) on node ~w",
1367+
[rabbit_misc:rs(QName), Node]),
1368+
Size = length(get_nodes(Q)),
1369+
case delete_member(Q, Node) of
1370+
ok ->
1371+
{ok, Size-1};
1372+
{error, cluster_change_not_permitted} ->
1373+
%% this could be timing related and due to a new leader just being
1374+
%% elected but it's noop command not been committed yet.
1375+
%% lets sleep and retry once
1376+
?LOG_INFO("~ts: failed to remove member (replica) on node ~w "
1377+
"as cluster change is not permitted. "
1378+
"retrying once in 500ms",
1379+
[rabbit_misc:rs(QName), Node]),
1380+
timer:sleep(500),
1381+
case delete_member(Q, Node) of
1382+
ok ->
1383+
{ok, Size-1};
1384+
{error, Err} ->
1385+
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1386+
[rabbit_misc:rs(QName), Node, Err]),
1387+
{error, Size, Err}
1388+
end;
1389+
{error, Err} ->
1390+
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1391+
[rabbit_misc:rs(QName), Node, Err]),
1392+
{error, Size, Err}
1393+
end.
13451394

13461395

13471396
grow(Node, VhostSpec, QueueSpec, Strategy) ->

0 commit comments

Comments
 (0)