Skip to content

Commit 5c809a1

Browse files
committed
Test flakes may05 2026 (#16305) (#16306)
* quorum_queue_SUITE: await condition. Sometimes we can get `{error,4, {error, non_voters_found}}` if we don't wait. * mqtt_shared_SUITE: await basic.get; it could have happened before the message was ready. (cherry picked from commit e56c164) Co-authored-by: Michal Kuratczyk <michal.kuratczyk@broadcom.com> (cherry picked from commit 71edc16) # Conflicts: # deps/rabbit/test/quorum_queue_SUITE.erl
1 parent fabdafc commit 5c809a1

2 files changed

Lines changed: 182 additions & 4 deletions

File tree

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1875,6 +1875,184 @@ dont_leak_file_handles(Config) ->
18751875
rabbit_ct_client_helpers:close_channel(C),
18761876
ok.
18771877

1878+
<<<<<<< HEAD
1879+
=======
1880+
%% Common Test case for rabbit_quorum_queue:grow.
%% Declares two quorum queues with an initial group size of 5, publishes
%% MsgCount messages to each, and then repeatedly shrinks the queues to a
%% single member and grows them again: by node name, by numeric target size
%% (2, 3, 5, 10) and with an invalid negative size. After each step the member
%% count and the number of ready messages are checked with
%% assert_grown_queues/4.
%% The node list pattern below only matches when Config describes exactly
%% five nodes.
grow_queue(Config) ->
1881+
[Server0, Server1, _Server2, _Server3, _Server4] =
1882+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1883+
1884+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
1885+
QQ = ?config(queue_name, Config),
1886+
AQ = ?config(alt_queue_name, Config),
1887+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1888+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
1889+
{<<"x-quorum-initial-group-size">>, long, 5}])),
1890+
?assertEqual({'queue.declare_ok', AQ, 0, 0},
1891+
declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
1892+
{<<"x-quorum-initial-group-size">>, long, 5}])),
1893+
1894+
QQs = [QQ, AQ],
1895+
MsgCount = 3,
1896+
1897+
%% publish to each queue and confirm that it starts out with 5 members
[begin
1898+
RaName = ra_name(Q),
1899+
rabbit_ct_client_helpers:publish(Ch, Q, MsgCount),
1900+
wait_for_messages_ready([Server0], RaName, MsgCount),
1901+
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1902+
Nodes0 = rabbit_queue_type:get_nodes(Q0),
1903+
?assertEqual(5, length(Nodes0))
1904+
end || Q <- QQs],
1905+
1906+
%% shrink all queues down to 1 member (RPC to the node at index 0)
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1907+
force_all_queues_shrink_member_to_current_member, []),
1908+
1909+
TargetClusterSize_1 = 1,
1910+
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
1911+
1912+
%% grow queues to node 'Server1'
1913+
TargetClusterSize_2 = 2,
1914+
Result1 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]),
1915+
%% [{{resource,<<"/">>,queue,<<"grow_queue">>},{ok,2}},
1916+
%% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{ok,2}},...]
1917+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result1)),
1918+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
1919+
1920+
%% grow queues to quorum cluster size '2' has no effect
1921+
Result2 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]),
1922+
?assertEqual([], Result2),
1923+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
1924+
1925+
%% grow queues to quorum cluster size '3'
1926+
TargetClusterSize_3 = 3,
1927+
Result3 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all, voter]),
1928+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result3)),
1929+
assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount),
1930+
1931+
%% grow queues to quorum cluster size '5'
1932+
TargetClusterSize_5 = 5,
1933+
Result4 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all, voter]),
1934+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result4)),
1935+
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),
1936+
1937+
%% shrink all queues again down to 1 member
1938+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1939+
force_all_queues_shrink_member_to_current_member, []),
1940+
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
1941+
1942+
%% grow queues to quorum cluster size > '5' (limit = 5).
1943+
TargetClusterSize_10 = 10,
%% grow is re-issued on every evaluation of the condition (its result is
%% ignored); the condition holds once every queue reports 5 members.
%% 30_000 is presumably the timeout in milliseconds - confirm against
%% rabbit_ct_helpers:await_condition/2.
1944+
rabbit_ct_helpers:await_condition(
1945+
fun() ->
1946+
rpc:call(Server0, rabbit_quorum_queue, grow,
1947+
[TargetClusterSize_10, <<"/">>, <<".*">>, all]),
1948+
lists:all(
1949+
fun(Q) ->
1950+
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1951+
length(rabbit_queue_type:get_nodes(Q0)) =:= TargetClusterSize_5
1952+
end, QQs)
1953+
end, 30_000),
1954+
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),
1955+
1956+
%% shrink all queues again down to 1 member
1957+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1958+
force_all_queues_shrink_member_to_current_member, []),
1959+
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
1960+
1961+
%% attempt to grow queues to quorum cluster size < '0'.
1962+
BadTargetClusterSize = -5,
1963+
?assertEqual({error, bad_quorum_cluster_size},
1964+
rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])),
1965+
1966+
%% shrink all queues again down to 1 member
1967+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1968+
force_all_queues_shrink_member_to_current_member, []),
1969+
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
1970+
1971+
%% grow queues to target quorum cluster size '5': fail, non_voters found
%% grow is re-issued until every entry of its result carries the
%% non_voters_found error for target size 5.
%% NOTE(review): lists:all/2 is also true for an empty result list, so an
%% empty Result7 would satisfy this condition as well - confirm intended.
1972+
rabbit_ct_helpers:await_condition(
1973+
fun () ->
1974+
Result7 = rpc:call(Server0, rabbit_quorum_queue, grow,
1975+
[TargetClusterSize_5, <<"/">>, <<".*">>, all]),
1976+
lists:all(
1977+
fun({_, Err}) ->
1978+
Err =:= {error, TargetClusterSize_5, {error, non_voters_found}}
1979+
end, Result7)
1980+
end),
1981+
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount).
1982+
1983+
%% Test helper. For every queue name in Qs:
%%   1. waits (wait_for_messages_ready) until MsgCount messages are ready
%%      for the queue's Ra name on Node,
%%   2. looks the queue up on Node via rabbit_amqqueue:lookup(Q, <<"/">>),
%%   3. asserts that rabbit_queue_type:get_nodes/1 returns exactly
%%      TargetClusterSize nodes for it.
%% A failed lookup crashes on the {ok, Q0} match. Returns the list of
%% per-queue results of the comprehension, which callers in this view ignore.
assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) ->
1984+
[begin
1985+
RaName = ra_name(Q),
1986+
wait_for_messages_ready([Node], RaName, MsgCount),
1987+
{ok, Q0} = rpc:call(Node, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1988+
Nodes0 = rabbit_queue_type:get_nodes(Q0),
1989+
?assertEqual(TargetClusterSize, length(Nodes0))
1990+
end || Q <- Qs].
1991+
1992+
%% Common Test case. Sequence exercised:
%%   1. declare a quorum queue through Server0 (publisher confirms enabled),
%%   2. stop the queue's Ra server on Server2 and subscribe a consumer
%%      through a channel opened on Server2,
%%   3. publish 5000 messages of 13_000 random bytes each, then purge,
%%   4. if every node reports rabbit_fifo version >= 8, wait until Server0's
%%      member overview shows a defined snapshot_index,
%%   5. restart the Ra server on Server2 and expect a basic.deliver in the
%%      test process within 30 seconds.
%% The node list pattern below only matches when Config describes exactly
%% three nodes.
consumer_message_is_delevered_after_snapshot(Config) ->
1993+
%% a consumer on a node that received a snapshot should have its messages
1994+
%% delivered
1995+
[Server0, _Server1, Server2] = Nodes =
1996+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1997+
1998+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
1999+
[rabbit, quorum_min_checkpoint_interval, 1]),
2000+
2001+
Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0),
2002+
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
2003+
QQ = ?config(queue_name, Config),
2004+
RaName = ra_name(QQ),
2005+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
2006+
declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
2007+
2008+
%% stop server on a follower node
2009+
ok = rpc:call(Server2, ra, stop_server, [quorum_queues, {RaName, Server2}]),
2010+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
2011+
%% create a consumer
2012+
qos(Ch2, 2, false),
2013+
subscribe(Ch2, QQ, false),
2014+
2015+
%% publish some messages and make sure a snapshot has been taken
2016+
Msg = crypto:strong_rand_bytes(13_000),
2017+
2018+
[publish(Ch0, QQ, Msg) || _ <- lists:seq(1, 5000)],
2019+
%% NOTE(review): the result of wait_for_confirms is not matched, so a
%% timeout or nack would go unnoticed here; the second argument is
%% presumably a timeout - confirm its unit against amqp_channel.
amqp_channel:wait_for_confirms(Ch0, 5),
2020+
%% need to sleep here a bit as QQs won't take
2021+
%% snapshots more often than once every second
2022+
timer:sleep(1100),
2023+
2024+
%% then purge
2025+
#'queue.purge_ok'{} = amqp_channel:call(Ch0, #'queue.purge'{queue = QQ}),
2026+
2027+
%% lowest rabbit_fifo version among the nodes that answered with {ok, V};
%% nodes whose call failed are filtered out by the generator pattern
MacVer = lists:min([V || {ok, V} <-
2028+
erpc:multicall(Nodes, rabbit_fifo, version, [])]),
2029+
ct:pal("machine version is ~b", [MacVer]),
2030+
2031+
%% only await snapshot if all members have at least machine version 8
2032+
if MacVer >= 8 ->
2033+
rabbit_ct_helpers:await_condition(
2034+
fun () ->
2035+
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview,
2036+
[{RaName, Server0}]),
2037+
undefined =/= maps:get(snapshot_index, Log)
2038+
end);
2039+
true ->
2040+
ok
2041+
end,
2042+
%% restart stopped member
2043+
ok = rpc:call(Server2, ra, restart_server, [quorum_queues, {RaName, Server2}]),
2044+
2045+
%% messages should be delivered
2046+
receive
2047+
{#'basic.deliver'{delivery_tag = _DeliveryTag}, _} ->
2048+
ok
2049+
after 30000 ->
2050+
flush(1),
2051+
ct:fail("expected messages were not delivered")
2052+
end,
2053+
ok.
2054+
2055+
>>>>>>> 71edc16388 (Test flakes may05 2026 (#16305) (#16306))
18782056
gh_12635(Config) ->
18792057
check_quorum_queues_v4_compat(Config),
18802058

deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1691,10 +1691,10 @@ trace(Config) ->
16911691
<<"routed_queues">> := [<<"mqtt-subscription-trace_subscriberqos0">>]},
16921692
rabbit_misc:amqp_table(PublishHeaders)),
16931693

1694-
{#'basic.get_ok'{routing_key = <<"deliver.mqtt-subscription-trace_subscriberqos0">>},
1695-
#amqp_msg{props = #'P_basic'{headers = DeliverHeaders},
1696-
payload = Payload}} =
1697-
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
1694+
{_, #amqp_msg{props = #'P_basic'{headers = DeliverHeaders}, payload = Payload}} =
1695+
?awaitMatch({#'basic.get_ok'{routing_key = <<"deliver.mqtt-subscription-trace_subscriberqos0">>}, _},
1696+
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
1697+
5_000),
16981698
?assertMatch(#{<<"exchange_name">> := <<"amq.topic">>,
16991699
<<"routing_keys">> := [Topic],
17001700
<<"connection">> := <<"127.0.0.1:", _/binary>>,

0 commit comments

Comments
 (0)