@@ -1507,39 +1507,70 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
15071507 {ok , pos_integer ()} | {error , pos_integer (), term ()}}].
15081508shrink_all (Node ) ->
15091509 ? LOG_INFO (" Asked to remove all quorum queue replicas from node ~ts " , [Node ]),
1510- [begin
1511- QName = amqqueue :get_name (Q ),
1512- ? LOG_INFO (" ~ts : removing member (replica) on node ~w " ,
1513- [rabbit_misc :rs (QName ), Node ]),
1514- Size = length (get_nodes (Q )),
1515- case delete_member (Q , Node ) of
1516- ok ->
1517- {QName , {ok , Size - 1 }};
1518- {error , cluster_change_not_permitted } ->
1519- % % this could be timing related and due to a new leader just being
1520- % % elected but it's noop command not been committed yet.
1521- % % lets sleep and retry once
1522- ? LOG_INFO (" ~ts : failed to remove member (replica) on node ~w "
1523- " as cluster change is not permitted. "
1524- " retrying once in 500ms" ,
1525- [rabbit_misc :rs (QName ), Node ]),
1526- timer :sleep (500 ),
1527- case delete_member (Q , Node ) of
1528- ok ->
1529- {QName , {ok , Size - 1 }};
1530- {error , Err } ->
1531- ? LOG_WARNING (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1532- [rabbit_misc :rs (QName ), Node , Err ]),
1533- {QName , {error , Size , Err }}
1534- end ;
1535- {error , Err } ->
1536- ? LOG_WARNING (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1537- [rabbit_misc :rs (QName ), Node , Err ]),
1538- {QName , {error , Size , Err }}
1539- end
1540- end || Q <- rabbit_amqqueue :list (),
1541- amqqueue :get_type (Q ) == ? MODULE ,
1542- lists :member (Node , get_nodes (Q ))].
1510+ % % This operation is bound by I/O so this default is set high.
1511+ Size = application :get_env (rabbit , quorum_queue_shrink_batch_size , 64 ),
1512+ Chunks = ra_lib :lists_chunk (Size , [Q || Q <- rabbit_amqqueue :list (),
1513+ amqqueue :get_type (Q ) == ? MODULE ,
1514+ lists :member (Node , get_nodes (Q ))]),
1515+ Parent = self (),
1516+ lists :flatten ([begin
1517+ Tasks = [{spawn_monitor (fun () ->
1518+ Res = shrink (Node , Q ),
1519+ Parent ! {self (), Res }
1520+ end ), amqqueue :get_name (Q )}
1521+ || Q <- Chunk ],
1522+ [receive
1523+ {Pid , Res } ->
1524+ erlang :demonitor (MRef , [flush ]),
1525+ {QName , Res };
1526+ {'DOWN' , MRef , process , Pid , Reason } ->
1527+ ? LOG_WARNING (" ~ts : failed to remove member "
1528+ " (replica) on node ~w , error: ~w " ,
1529+ [rabbit_misc :rs (QName ), Node ,
1530+ Reason ]),
1531+ []
1532+ after
1533+ 15_000 ->
1534+ ? LOG_WARNING (" ~ts : failed to remove member "
1535+ " (replica) on node ~w within 15 "
1536+ " seconds" ,
1537+ [rabbit_misc :rs (QName ), Node ]),
1538+ []
1539+ end || {{Pid , MRef }, QName } <- Tasks ]
1540+ end || Chunk <- Chunks ]).
1541+
1542+ - spec shrink (node (), amqqueue :amqqueue ()) ->
1543+ {ok , Size } | {error , Size , term ()} when Size :: pos_integer ().
1544+ shrink (Node , Q ) ->
1545+ QName = amqqueue :get_name (Q ),
1546+ ? LOG_INFO (" ~ts : removing member (replica) on node ~w " ,
1547+ [rabbit_misc :rs (QName ), Node ]),
1548+ Size = length (get_nodes (Q )),
1549+ case delete_member (Q , Node ) of
1550+ ok ->
1551+ {ok , Size - 1 };
1552+ {error , cluster_change_not_permitted } ->
1553+ % % this could be timing related and due to a new leader just being
1554+ % % elected but it's noop command not been committed yet.
1555+ % % lets sleep and retry once
1556+ ? LOG_INFO (" ~ts : failed to remove member (replica) on node ~w "
1557+ " as cluster change is not permitted. "
1558+ " retrying once in 500ms" ,
1559+ [rabbit_misc :rs (QName ), Node ]),
1560+ timer :sleep (500 ),
1561+ case delete_member (Q , Node ) of
1562+ ok ->
1563+ {ok , Size - 1 };
1564+ {error , Err } ->
1565+ ? LOG_WARNING (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1566+ [rabbit_misc :rs (QName ), Node , Err ]),
1567+ {error , Size , Err }
1568+ end ;
1569+ {error , Err } ->
1570+ ? LOG_WARNING (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1571+ [rabbit_misc :rs (QName ), Node , Err ]),
1572+ {error , Size , Err }
1573+ end .
15431574
15441575
15451576grow (Node , VhostSpec , QueueSpec , Strategy ) ->
0 commit comments