Skip to content

Commit 511692a

Browse files
the-mikedavislukebakken
authored andcommitted
rabbit_quorum_queue: Shrink batches of QQs in parallel
Shrinking a member node off of a QQ can be parallelized. The operation involves * removing the node from the QQ's cluster membership (appending a command to the log and committing it) with `ra:remove_member/3` * updating the metadata store to remove the member from the QQ type state with `rabbit_amqqueue:update/2` * deleting the queue data from the node with `ra:force_delete_server/2` if the node can be reached All of these operations are I/O bound. Updating the cluster membership and metadata store involves appending commands to those logs and replicating them. Writing commands to Ra synchronously in serial is fairly slow - sending many commands in parallel is much more efficient. By parallelizing these steps we can write larger chunks of commands to WAL(s). `ra:force_delete_server/2` benefits from parallelizing if the node being shrunk off is no longer reachable, for example in some hardware failures. The underlying `rpc:call/4` will attempt to auto-connect to the node and this can take some time to time out. By parallelizing this, each `rpc:call/4` reuses the same underlying distribution entry and all calls fail together once the connection fails to establish.
1 parent cf69ee6 commit 511692a

1 file changed

Lines changed: 64 additions & 33 deletions

File tree

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 64 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,39 +1492,70 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
14921492
{ok, pos_integer()} | {error, pos_integer(), term()}}].
14931493
shrink_all(Node) ->
14941494
?LOG_INFO("Asked to remove all quorum queue replicas from node ~ts", [Node]),
1495-
[begin
1496-
QName = amqqueue:get_name(Q),
1497-
?LOG_INFO("~ts: removing member (replica) on node ~w",
1498-
[rabbit_misc:rs(QName), Node]),
1499-
Size = length(get_nodes(Q)),
1500-
case delete_member(Q, Node) of
1501-
ok ->
1502-
{QName, {ok, Size-1}};
1503-
{error, cluster_change_not_permitted} ->
1504-
%% this could be timing related and due to a new leader just being
1505-
%% elected but it's noop command not been committed yet.
1506-
%% lets sleep and retry once
1507-
?LOG_INFO("~ts: failed to remove member (replica) on node ~w "
1508-
"as cluster change is not permitted. "
1509-
"retrying once in 500ms",
1510-
[rabbit_misc:rs(QName), Node]),
1511-
timer:sleep(500),
1512-
case delete_member(Q, Node) of
1513-
ok ->
1514-
{QName, {ok, Size-1}};
1515-
{error, Err} ->
1516-
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1517-
[rabbit_misc:rs(QName), Node, Err]),
1518-
{QName, {error, Size, Err}}
1519-
end;
1520-
{error, Err} ->
1521-
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1522-
[rabbit_misc:rs(QName), Node, Err]),
1523-
{QName, {error, Size, Err}}
1524-
end
1525-
end || Q <- rabbit_amqqueue:list(),
1526-
amqqueue:get_type(Q) == ?MODULE,
1527-
lists:member(Node, get_nodes(Q))].
1495+
%% This operation is bound by I/O so this default is set high.
1496+
Size = application:get_env(rabbit, quorum_queue_shrink_batch_size, 64),
1497+
Chunks = ra_lib:lists_chunk(Size, [Q || Q <- rabbit_amqqueue:list(),
1498+
amqqueue:get_type(Q) == ?MODULE,
1499+
lists:member(Node, get_nodes(Q))]),
1500+
Parent = self(),
1501+
lists:flatten([begin
1502+
Tasks = [{spawn_monitor(fun() ->
1503+
Res = shrink(Node, Q),
1504+
Parent ! {self(), Res}
1505+
end), amqqueue:get_name(Q)}
1506+
|| Q <- Chunk],
1507+
[receive
1508+
{Pid, Res} ->
1509+
erlang:demonitor(MRef, [flush]),
1510+
{QName, Res};
1511+
{'DOWN', MRef, process, Pid, Reason} ->
1512+
?LOG_WARNING("~ts: failed to remove member "
1513+
"(replica) on node ~w, error: ~w",
1514+
[rabbit_misc:rs(QName), Node,
1515+
Reason]),
1516+
[]
1517+
after
1518+
15_000 ->
1519+
?LOG_WARNING("~ts: failed to remove member "
1520+
"(replica) on node ~w within 15 "
1521+
"seconds",
1522+
[rabbit_misc:rs(QName), Node]),
1523+
[]
1524+
end || {{Pid, MRef}, QName} <- Tasks]
1525+
end || Chunk <- Chunks]).
1526+
1527+
-spec shrink(node(), amqqueue:amqqueue()) ->
1528+
{ok, Size} | {error, Size, term()} when Size :: pos_integer().
1529+
shrink(Node, Q) ->
1530+
QName = amqqueue:get_name(Q),
1531+
?LOG_INFO("~ts: removing member (replica) on node ~w",
1532+
[rabbit_misc:rs(QName), Node]),
1533+
Size = length(get_nodes(Q)),
1534+
case delete_member(Q, Node) of
1535+
ok ->
1536+
{ok, Size-1};
1537+
{error, cluster_change_not_permitted} ->
1538+
%% this could be timing related and due to a new leader just being
1539+
%% elected but it's noop command not been committed yet.
1540+
%% lets sleep and retry once
1541+
?LOG_INFO("~ts: failed to remove member (replica) on node ~w "
1542+
"as cluster change is not permitted. "
1543+
"retrying once in 500ms",
1544+
[rabbit_misc:rs(QName), Node]),
1545+
timer:sleep(500),
1546+
case delete_member(Q, Node) of
1547+
ok ->
1548+
{ok, Size-1};
1549+
{error, Err} ->
1550+
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1551+
[rabbit_misc:rs(QName), Node, Err]),
1552+
{error, Size, Err}
1553+
end;
1554+
{error, Err} ->
1555+
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1556+
[rabbit_misc:rs(QName), Node, Err]),
1557+
{error, Size, Err}
1558+
end.
15281559

15291560
-spec grow(node() | integer(), binary(), binary(), all | even) ->
15301561
[{rabbit_amqqueue:name(),

0 commit comments

Comments
 (0)