Skip to content

Commit dfcf96f

Browse files
michaelklishinmergify[bot]
authored andcommitted
Fix more test flakes (QQs, MQTT, STOMP, Stream Protocol)
Now every updated test passes 10 times out of 10 locally. (cherry picked from commit f364a7f) # Conflicts: # deps/rabbitmq_stomp/test/connections_SUITE.erl # deps/rabbitmq_stomp/test/system_SUITE.erl
1 parent 35164a3 commit dfcf96f

9 files changed

Lines changed: 127 additions & 45 deletions

File tree

deps/rabbit/test/quorum_queue_member_reconciliation_SUITE.erl

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
-include_lib("amqp_client/include/amqp_client.hrl").
1313
-compile([nowarn_export_all, export_all]).
1414

15+
-import(rabbit_ct_helpers, [consistently/3]).
16+
1517
%% The reconciler has two modes of triggering itself
1618
%% - timer based
1719
%% - event based
@@ -136,11 +138,11 @@ auto_grow(Config) ->
136138

137139
add_server_to_cluster(Server0, Server1),
138140
%% With 2 nodes in the cluster, target group size is not reached, so no
139-
%% new members should be available. We sleep a while so the periodic check
140-
%% runs
141-
timer:sleep(4000),
142-
{ok, Members, _} = ra:members({queue_utils:ra_name(QQ), Server1}),
143-
?assertEqual(1, length(Members)),
141+
%% new members should be available. Verify this holds over multiple
142+
%% reconciliation cycles.
143+
consistently(
144+
?_assertEqual(1, length(element(2, ra:members({queue_utils:ra_name(QQ), Server1})))),
145+
1000, 4),
144146

145147
add_server_to_cluster(Server2, Server1),
146148
%% With 3 nodes in the cluster, target size is met so eventually it should
@@ -175,10 +177,11 @@ auto_grow_drained_node(Config) ->
175177
fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, Server0) end,
176178
10000),
177179
add_server_to_cluster(Server2, Server1),
178-
timer:sleep(5000),
179-
%% We have 3 nodes, but one is drained, so it will not be concidered.
180-
{ok, Members1, _} = ra:members({queue_utils:ra_name(QQ), Server1}),
181-
?assertEqual(1, length(Members1)),
180+
%% We have 3 nodes, but one is drained, so it will not be considered.
181+
%% Verify this holds over multiple reconciliation cycles.
182+
consistently(
183+
?_assertEqual(1, length(element(2, ra:members({queue_utils:ra_name(QQ), Server1})))),
184+
1000, 5),
182185

183186
rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server0),
184187
rabbit_ct_helpers:await_condition(

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -534,9 +534,6 @@ handles_reject_notification(Config) ->
534534
% reverse order - should try the first node in the list first
535535
F0 = rabbit_fifo_client:init([ServerId2, ServerId1]),
536536
{ok, F1, []} = rabbit_fifo_client:enqueue(ClusterName, one, F0),
537-
538-
timer:sleep(500),
539-
540537
% the applied notification
541538
_F2 = process_ra_events(receive_ra_events(1, 0), ClusterName, F1),
542539
rabbit_quorum_queue:stop_server(ServerId1),
@@ -646,10 +643,10 @@ cancel_checkout_with_pending(Config, Reason) ->
646643
end, F3, Msgs),
647644

648645
{ok, _F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, Reason, F4),
649-
timer:sleep(100),
650-
{ok, Overview, _} = ra:member_overview(ServerId),
651-
?assertMatch(#{machine := #{num_messages := 0,
652-
num_consumers := 0}}, Overview),
646+
rabbit_ct_helpers:eventually(
647+
?_assertMatch(#{machine := #{num_messages := 0, num_consumers := 0}},
648+
element(2, ra:member_overview(ServerId))),
649+
200, 20),
653650
flush(),
654651
ok.
655652

deps/rabbitmq_mqtt/test/auth_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,8 @@ publish_permission_will_message(Config) ->
10311031
<<"client-with-will">>,
10321032
Opts),
10331033
{ok, _} = emqtt:connect(C),
1034-
timer:sleep(100),
1034+
rabbit_ct_helpers:await_condition(
1035+
fun() -> util:all_connection_pids(Config) =/= [] end, 5_000),
10351036
[ServerPublisherPid] = util:all_connection_pids(Config),
10361037

10371038
Sub = open_mqtt_connection(Config),

deps/rabbitmq_mqtt/test/disconnect_on_unauthorized_SUITE.erl

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,8 @@ publish_unauthorized_no_disconnect(Config) ->
8888

8989
ok = emqtt:publish(C, <<"topic3">>, <<"payload">>, [{qos, 0}]),
9090

91-
timer:sleep(100),
9291
%% Client should be still connected.
93-
?assert(is_process_alive(C)),
92+
rabbit_ct_helpers:consistently(?_assert(is_process_alive(C)), 50, 3),
9493

9594
ok = emqtt:disconnect(C).
9695

@@ -109,9 +108,8 @@ subscribe_unauthorized_no_disconnect(Config) ->
109108
?assertEqual(?RC_FAILURE, ReasonCode)
110109
end,
111110

112-
timer:sleep(100),
113111
%% Client should be still connected.
114-
?assert(is_process_alive(C)),
112+
rabbit_ct_helpers:consistently(?_assert(is_process_alive(C)), 50, 3),
115113

116114
ok = emqtt:disconnect(C).
117115

@@ -134,9 +132,8 @@ unsubscribe_unauthorized_no_disconnect(Config) ->
134132

135133
{ok, _, ReasonCodes0} = emqtt:unsubscribe(C, <<"topic1">>),
136134

137-
timer:sleep(100),
138135
%% Client should be still connected.
139-
?assert(is_process_alive(C)),
136+
rabbit_ct_helpers:consistently(?_assert(is_process_alive(C)), 50, 3),
140137

141138
ok = rabbit_ct_broker_helpers:rpc(
142139
Config, rabbit_auth_backend_internal, set_topic_permissions,

deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -970,10 +970,7 @@ session_expiry(Config) ->
970970
ok = emqtt:disconnect(C),
971971

972972
?assertEqual(2, rpc(Config, rabbit_amqqueue, count, [])),
973-
timer:sleep(timer:seconds(Seconds) + 100),
974-
%% On a slow machine, this test might fail. Let's consider
975-
%% the expiry on a longer time window
976-
?awaitMatch(0, rpc(Config, rabbit_amqqueue, count, []), 15_000, 1000),
973+
?awaitMatch(0, rpc(Config, rabbit_amqqueue, count, []), 15_000, 1000),
977974

978975
ok = rpc(Config, application, set_env, [App, Par, DefaultVal]).
979976

@@ -1235,11 +1232,16 @@ rabbit_mqtt_qos0_queue_kill_node(Config) ->
12351232
ok = emqtt:publish(Pub, Topic1, <<"m0">>, qos0),
12361233
ok = expect_publishes(Sub0, Topic1, [<<"m0">>]),
12371234

1235+
[Node0 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
12381236
process_flag(trap_exit, true),
12391237
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
12401238
ok = await_exit(Sub0),
1241-
%% Wait to run rabbit_amqqueue:on_node_down/1 on both live nodes.
1242-
timer:sleep(500),
1239+
%% Wait for the live nodes to detect that node 0 has gone down,
1240+
%% otherwise the subsequent reconnect on node 1 may briefly stall
1241+
%% on cluster lookups that try to reach node 0.
1242+
rabbit_ct_helpers:await_condition(
1243+
fun() -> not lists:member(Node0, rpc(Config, 1, rabbit_nodes, list_running, [])) end,
1244+
10_000),
12431245
%% Re-connect to a live node with same MQTT client ID.
12441246
Sub1 = connect(SubscriberId, Config, 1, []),
12451247
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic2, qos0),
@@ -1390,12 +1392,15 @@ maintenance(Config) ->
13901392
C1b = connect(<<"client-1b">>, Config, 1, []),
13911393
ClientsNode1 = [C1a, C1b],
13921394

1393-
timer:sleep(500),
1395+
rabbit_ct_helpers:await_condition(
1396+
fun() -> length(all_connection_pids(Config)) =:= 3 end,
1397+
10_000),
13941398

13951399
ok = drain_node(Config, 2),
13961400
ok = revive_node(Config, 2),
1397-
timer:sleep(500),
1398-
[?assert(erlang:is_process_alive(C)) || C <- [C0, C1a, C1b]],
1401+
rabbit_ct_helpers:await_condition(
1402+
fun() -> lists:all(fun erlang:is_process_alive/1, [C0, C1a, C1b]) end,
1403+
10_000),
13991404

14001405
process_flag(trap_exit, true),
14011406
ok = drain_node(Config, 1),
@@ -1544,8 +1549,10 @@ block(Config) ->
15441549
{ok, _} = emqtt:publish(C, Topic, <<"Not blocked yet">>, [{qos, 1}]),
15451550

15461551
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
1547-
%% Let it block
1548-
timer:sleep(100),
1552+
rabbit_ct_helpers:await_condition(fun() ->
1553+
Alarms = rpc(Config, rabbit_alarm, get_alarms, []),
1554+
lists:any(fun({{resource_limit, memory, _}, _}) -> true; (_) -> false end, Alarms)
1555+
end, 10_000),
15491556

15501557
%% Blocked, but still will publish when unblocked
15511558
puback_timeout = publish_qos1_timeout(C, Topic, <<"Now blocked">>, 1000),
@@ -1575,8 +1582,10 @@ block_only_publisher(Config) ->
15751582
ok = expect_publishes(PubSub, Topic, [<<"from Pub">>, <<"from PubSub">>]),
15761583

15771584
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
1578-
%% Let it block
1579-
timer:sleep(100),
1585+
rabbit_ct_helpers:await_condition(fun() ->
1586+
Alarms = rpc(Config, rabbit_alarm, get_alarms, []),
1587+
lists:any(fun({{resource_limit, memory, _}, _}) -> true; (_) -> false end, Alarms)
1588+
end, 10_000),
15801589

15811590
%% We expect that the publishing connections are blocked.
15821591
[?assertEqual({error, ack_timeout}, emqtt:ping(Pid)) || Pid <- [Pub, PubSub]],
@@ -1594,8 +1603,10 @@ block_only_publisher(Config) ->
15941603
?assertEqual(pong, emqtt:ping(Sub)),
15951604

15961605
rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.6]),
1597-
%% Let it unblock
1598-
timer:sleep(100),
1606+
rabbit_ct_helpers:await_condition(fun() ->
1607+
Alarms = rpc(Config, rabbit_alarm, get_alarms, []),
1608+
not lists:any(fun({{resource_limit, memory, _}, _}) -> true; (_) -> false end, Alarms)
1609+
end, 10_000),
15991610

16001611
%% All connections are unblocked.
16011612
[?assertEqual(pong, emqtt:ping(Pid)) || Pid <- [Con, Sub, Pub, PubSub]],

deps/rabbitmq_mqtt/test/reader_SUITE.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,10 @@ block_connack_timeout(Config) ->
102102

103103
DefaultWatermark = rpc(Config, vm_memory_monitor, get_vm_memory_high_watermark, []),
104104
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
105-
%% Let connection block.
106-
timer:sleep(100),
105+
rabbit_ct_helpers:await_condition(fun() ->
106+
Alarms = rpc(Config, rabbit_alarm, get_alarms, []),
107+
lists:any(fun({{resource_limit, memory, _}, _}) -> true; (_) -> false end, Alarms)
108+
end, 10_000),
107109

108110
%% We can still connect via TCP, but CONNECT packet will not be processed on the server.
109111
{ok, Client} = emqtt:start_link([{host, "localhost"},
@@ -162,11 +164,11 @@ login_timeout(Config) ->
162164

163165
stats(Config) ->
164166
C = connect(?FUNCTION_NAME, Config),
165-
%% Wait for stats being emitted (every 100ms)
166-
timer:sleep(300),
167-
%% Retrieve the connection Pid
168167
[Pid] = all_connection_pids(Config),
169168
[{pid, Pid}] = rpc(Config, rabbit_mqtt_reader, info, [Pid, [pid]]),
169+
rabbit_ct_helpers:await_condition(fun() ->
170+
rpc(Config, ets, lookup, [connection_metrics, Pid]) =/= []
171+
end, 10_000),
170172
%% Verify the content of the metrics, garbage_collection must be present
171173
[{Pid, Props}] = rpc(Config, ets, lookup, [connection_metrics, Pid]),
172174
true = proplists:is_defined(garbage_collection, Props),

deps/rabbitmq_stomp/test/connections_SUITE.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ direct_client_connections_are_not_leaked(Config) ->
9191
Client, "LOL", [{"", ""}])
9292
end,
9393
lists:seq(1, 100)),
94+
<<<<<<< HEAD
9495
?awaitMatch(N, count_connections(Config), 30_000),
96+
=======
97+
rabbit_ct_helpers:await_condition(fun() -> count_connections(Config) =:= N end, 30_000),
98+
>>>>>>> f364a7fce (Fix more test flakes (QQs, MQTT, STOMP, Stream Protocol))
9599
ok.
96100

97101
messages_not_dropped_on_disconnect(Config) ->

deps/rabbitmq_stomp/test/system_SUITE.erl

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,70 @@ end_per_testcase0(publish_unauthorized_error, Config) ->
119119
end_per_testcase0(_, Config) ->
120120
Config.
121121

122+
<<<<<<< HEAD
123+
=======
124+
transaction_limit(Config) ->
125+
Client = ?config(stomp_client, Config),
126+
%% Open 16 transactions (the limit)
127+
lists:foreach(fun(I) ->
128+
TxId = integer_to_binary(I),
129+
rabbit_stomp_client:send(Client, 'BEGIN',
130+
[{<<"transaction">>, TxId}])
131+
end, lists:seq(1, 16)),
132+
133+
%% The 17th should fail
134+
rabbit_stomp_client:send(Client, 'BEGIN',
135+
[{<<"transaction">>, <<"17">>}]),
136+
{ok, _Client1, Hdrs, _} = stomp_receive(Client, 'ERROR'),
137+
<<"Transaction limit exceeded">> = maps:get(<<"message">>, Hdrs),
138+
ok.
139+
140+
global_counters(Config) ->
141+
Version = ?config(version, Config),
142+
ProtoVer = stomp_proto_ver(Version),
143+
Dest = iolist_to_binary(["/topic/counter-test-", Version]),
144+
145+
C0 = get_global_counters(Config, ProtoVer),
146+
Pubs0 = maps:get(publishers, C0, 0),
147+
Cons0 = maps:get(consumers, C0, 0),
148+
Recv0 = maps:get(messages_received_total, C0, 0),
149+
Routed0 = maps:get(messages_routed_total, C0, 0),
150+
151+
Client = ?config(stomp_client, Config),
152+
rabbit_stomp_client:send(
153+
Client, 'SUBSCRIBE',
154+
[{<<"destination">>, Dest}, {<<"id">>, <<"counter-sub">>}]),
155+
156+
rabbit_stomp_client:send(
157+
Client, 'SEND', [{<<"destination">>, Dest}], ["hello"]),
158+
159+
{ok, Client1, _Hdrs, _Body} = stomp_receive(Client, 'MESSAGE'),
160+
161+
C1 = get_global_counters(Config, ProtoVer),
162+
?assertEqual(Pubs0 + 1, maps:get(publishers, C1)),
163+
?assertEqual(Cons0 + 1, maps:get(consumers, C1)),
164+
?assertEqual(Recv0 + 1, maps:get(messages_received_total, C1)),
165+
?assertEqual(Routed0 + 1, maps:get(messages_routed_total, C1)),
166+
167+
rabbit_stomp_client:send(
168+
Client1, 'UNSUBSCRIBE', [{<<"id">>, <<"counter-sub">>}]),
169+
170+
rabbit_ct_helpers:await_condition(
171+
fun() -> maps:get(consumers, get_global_counters(Config, ProtoVer)) =:= Cons0 end,
172+
5_000),
173+
174+
ok.
175+
176+
get_global_counters(Config, ProtoVer) ->
177+
maps:get(#{protocol => ProtoVer},
178+
rabbit_ct_broker_helpers:rpc(
179+
Config, 0, rabbit_global_counters, overview, [])).
180+
181+
stomp_proto_ver("1.0") -> 'STOMP 1.0';
182+
stomp_proto_ver("1.1") -> 'STOMP 1.1';
183+
stomp_proto_ver("1.2") -> 'STOMP 1.2'.
184+
185+
>>>>>>> f364a7fce (Fix more test flakes (QQs, MQTT, STOMP, Stream Protocol))
122186
publish_no_dest_error(Config) ->
123187
Client = ?config(stomp_client, Config),
124188
rabbit_stomp_client:send(

deps/rabbitmq_stream/test/protocol_interop_SUITE.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,11 @@ init_per_testcase(Testcase, Config) ->
6060
rabbit_ct_helpers:testcase_started(Config, Testcase).
6161

6262
end_per_testcase(Testcase, Config) ->
63-
%% Wait for exclusive or auto-delete queues being deleted.
64-
timer:sleep(800),
63+
rabbit_ct_helpers:await_condition(
64+
fun() ->
65+
Qs = rabbit_ct_broker_helpers:rpc(Config, rabbit_amqqueue, list, []),
66+
not lists:any(fun(Q) -> amqqueue:get_exclusive_owner(Q) =/= none end, Qs)
67+
end, 10_000),
6568
rabbit_ct_broker_helpers:rpc(Config, ?MODULE, delete_queues, []),
6669
rabbit_ct_helpers:testcase_finished(Config, Testcase).
6770

0 commit comments

Comments
 (0)