@@ -84,7 +84,8 @@ defmodule ListConsumersCommandTest do
8484
8585 with_channel ( @ vhost , fn channel ->
8686 { :ok , _ } = AMQP.Basic . consume ( channel , queue_name , nil , consumer_tag: consumer_tag )
87- :timer . sleep ( 100 )
87+ # Consumer registration is asynchronous; wait for it to be visible.
88+ await_consumer_count ( context [ :opts ] , 1 )
8889 [ [ consumer ] ] = run_command_to_list ( @ command , [ info_keys_s , context [ :opts ] ] )
8990 assert info_keys_a == Keyword . keys ( consumer )
9091 assert consumer [ :consumer_tag ] == consumer_tag
@@ -111,7 +112,8 @@ defmodule ListConsumersCommandTest do
111112 { :ok , tag1 } = AMQP.Basic . consume ( channel , queue_name1 )
112113 { :ok , tag2 } = AMQP.Basic . consume ( channel , queue_name2 )
113114 { :ok , tag3 } = AMQP.Basic . consume ( channel , queue_name2 )
114- :timer . sleep ( 100 )
115+ # Consumer registration is asynchronous; wait for them to be visible.
116+ await_consumer_count ( context [ :opts ] , 3 )
115117
116118 try do
117119 consumers =
@@ -149,7 +151,8 @@ defmodule ListConsumersCommandTest do
149151 { :ok , tag1 } = AMQP.Basic . consume ( channel , queue_name )
150152 { :ok , tag2 } = AMQP.Basic . consume ( channel , queue_name )
151153 { :ok , tag3 } = AMQP.Basic . consume ( channel , queue_name )
152- :timer . sleep ( 100 )
154+ # Consumer registration is asynchronous; wait for them to be visible.
155+ await_consumer_count ( context [ :opts ] , 3 )
153156
154157 try do
155158 consumers =
@@ -201,7 +204,8 @@ defmodule ListConsumersCommandTest do
201204 { :ok , tag1 } = AMQP.Basic . consume ( channel , queue_name )
202205 { :ok , tag2 } = AMQP.Basic . consume ( channel , queue_name )
203206 { :ok , tag3 } = AMQP.Basic . consume ( channel , queue_name )
204- :timer . sleep ( 100 )
207+ # Consumer registration is asynchronous; wait for them to be visible.
208+ await_consumer_count ( context [ :opts ] , 3 )
205209
206210 try do
207211 consumers =
@@ -226,7 +230,8 @@ defmodule ListConsumersCommandTest do
226230 )
227231
228232 AMQP.Basic . cancel ( channel , tag1 )
229- :timer . sleep ( 100 )
233+ # Consumer cancellation is asynchronous; wait for the count to drop.
234+ await_consumer_count ( context [ :opts ] , 2 )
230235
231236 consumers =
232237 List . first (
@@ -300,4 +305,14 @@ defmodule ListConsumersCommandTest do
300305 ]
301306 ] , { 1 , :continue } }
302307 end
308+
309+ defp await_consumer_count ( opts , expected_count ) do
310+ await_condition (
311+ fn ->
312+ consumers = run_command_to_list ( @ command , [ [ "queue_name" ] , opts ] )
313+ Enum . reduce ( consumers , 0 , fn group , acc -> acc + length ( group ) end ) == expected_count
314+ end ,
315+ 10_000
316+ )
317+ end
303318end
0 commit comments