@@ -1658,39 +1658,70 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
16581658 {ok , pos_integer ()} | {error , pos_integer (), term ()}}].
16591659shrink_all (Node ) ->
16601660 ? LOG_INFO (" Asked to remove all quorum queue replicas from node ~ts " , [Node ]),
1661- [begin
1662- QName = amqqueue :get_name (Q ),
1663- ? LOG_INFO (" ~ts : removing member (replica) on node ~w " ,
1664- [rabbit_misc :rs (QName ), Node ]),
1665- Size = length (get_nodes (Q )),
1666- case delete_member (Q , Node ) of
1667- ok ->
1668- {QName , {ok , Size - 1 }};
1669- {error , cluster_change_not_permitted } ->
1670- % % this could be timing related and due to a new leader just being
1671- % % elected but it's noop command not been committed yet.
1672- % % lets sleep and retry once
1673- ? LOG_INFO (" ~ts : failed to remove member (replica) on node ~w "
1674- " as cluster change is not permitted. "
1675- " retrying once in 500ms" ,
1676- [rabbit_misc :rs (QName ), Node ]),
1677- timer :sleep (500 ),
1678- case delete_member (Q , Node ) of
1679- ok ->
1680- {QName , {ok , Size - 1 }};
1681- {error , Err } ->
1682- ? LOG_WARNING (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1683- [rabbit_misc :rs (QName ), Node , Err ]),
1684- {QName , {error , Size , Err }}
1685- end ;
1686- {error , Err } ->
1687- ? LOG_WARNING (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1688- [rabbit_misc :rs (QName ), Node , Err ]),
1689- {QName , {error , Size , Err }}
1690- end
1691- end || Q <- rabbit_amqqueue :list (),
1692- amqqueue :get_type (Q ) == ? MODULE ,
1693- lists :member (Node , get_nodes (Q ))].
1661+ % % This operation is bound by I/O so this default is set high.
1662+ Size = application :get_env (rabbit , quorum_queue_shrink_batch_size , 64 ),
1663+ Chunks = ra_lib :lists_chunk (Size , [Q || Q <- rabbit_amqqueue :list (),
1664+ amqqueue :get_type (Q ) == ? MODULE ,
1665+ lists :member (Node , get_nodes (Q ))]),
1666+ Parent = self (),
1667+ lists :flatten ([begin
1668+ Tasks = [{spawn_monitor (fun () ->
1669+ Res = shrink (Node , Q ),
1670+ Parent ! {self (), Res }
1671+ end ), amqqueue :get_name (Q )}
1672+ || Q <- Chunk ],
1673+ [receive
1674+ {Pid , Res } ->
1675+ erlang :demonitor (MRef , [flush ]),
1676+ {QName , Res };
1677+ {'DOWN' , MRef , process , Pid , Reason } ->
1678+ ? LOG_WARNING (" ~ts : failed to remove member "
1679+ " (replica) on node ~w , error: ~w " ,
1680+ [rabbit_misc :rs (QName ), Node ,
1681+ Reason ]),
1682+ []
1683+ after
1684+ 15_000 ->
1685+ ? LOG_WARNING (" ~ts : failed to remove member "
1686+ " (replica) on node ~w within 15 "
1687+ " seconds" ,
1688+ [rabbit_misc :rs (QName ), Node ]),
1689+ []
1690+ end || {{Pid , MRef }, QName } <- Tasks ]
1691+ end || Chunk <- Chunks ]).
1692+
1693+ - spec shrink (node (), amqqueue :amqqueue ()) ->
1694+ {ok , Size } | {error , Size , term ()} when Size :: pos_integer ().
1695+ shrink (Node , Q ) ->
1696+ QName = amqqueue :get_name (Q ),
1697+ ? LOG_INFO (" ~ts : removing member (replica) on node ~w " ,
1698+ [rabbit_misc :rs (QName ), Node ]),
1699+ Size = length (get_nodes (Q )),
1700+ case delete_member (Q , Node ) of
1701+ ok ->
1702+ {ok , Size - 1 };
1703+ {error , cluster_change_not_permitted } ->
1704+ % % this could be timing related and due to a new leader just being
1705+ % % elected but it's noop command not been committed yet.
1706+ % % lets sleep and retry once
1707+ ? LOG_INFO (" ~ts : failed to remove member (replica) on node ~w "
1708+ " as cluster change is not permitted. "
1709+ " retrying once in 500ms" ,
1710+ [rabbit_misc :rs (QName ), Node ]),
1711+ timer :sleep (500 ),
1712+ case delete_member (Q , Node ) of
1713+ ok ->
1714+ {ok , Size - 1 };
1715+ {error , Err } ->
1716+ ? LOG_WARNING (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1717+ [rabbit_misc :rs (QName ), Node , Err ]),
1718+ {error , Size , Err }
1719+ end ;
1720+ {error , Err } ->
1721+ ? LOG_WARNING (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1722+ [rabbit_misc :rs (QName ), Node , Err ]),
1723+ {error , Size , Err }
1724+ end .
16941725
16951726- spec grow (node () | integer (), binary (), binary (), all | even ) ->
16961727 [{rabbit_amqqueue :name (),
0 commit comments