@@ -23,7 +23,8 @@ groups() ->
2323 routed_to_zero_queue_test ,
2424 routed_to_one_queue_test ,
2525 routed_to_many_queue_test ,
26- stable_routing_across_restarts_test
26+ stable_routing_across_restarts_test ,
27+ weighted_routing_test
2728 ]}
2829 ].
2930
@@ -187,6 +188,76 @@ stable_routing_across_restarts_test(Config) ->
187188 [amqp_channel :call (Chan2 , # 'queue.delete' {queue = Q }) || Q <- Queues ],
188189 ok = rabbit_ct_client_helpers :close_connection_and_channel (Conn2 , Chan2 ).
189190
191+ weighted_routing_test (Config ) ->
192+ {Conn , Chan } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
193+ XNameBin = atom_to_binary (? FUNCTION_NAME ),
194+ Queues = [<<" q1" >>, <<" q2" >>, <<" q3" >>],
195+ NumMsgs = 600 ,
196+
197+ # 'exchange.declare_ok' {} = amqp_channel :call (Chan ,
198+ # 'exchange.declare' {
199+ exchange = XNameBin ,
200+ type = <<" x-modulus-hash" >>,
201+ durable = true }),
202+
203+ [# 'queue.declare_ok' {} = amqp_channel :call (Chan , # 'queue.declare' {queue = Q ,
204+ durable = true })
205+ || Q <- Queues ],
206+
207+ % % Bind q1 once
208+ # 'queue.bind_ok' {} = amqp_channel :call (Chan , # 'queue.bind' {queue = <<" q1" >>,
209+ exchange = XNameBin }),
210+
211+ % % Bind q2 twice
212+ # 'queue.bind_ok' {} = amqp_channel :call (Chan , # 'queue.bind' {queue = <<" q2" >>,
213+ exchange = XNameBin ,
214+ routing_key = <<" a" >>}),
215+ # 'queue.bind_ok' {} = amqp_channel :call (Chan , # 'queue.bind' {queue = <<" q2" >>,
216+ exchange = XNameBin ,
217+ routing_key = <<" b" >>}),
218+
219+ % % Bind q3 three times
220+ # 'queue.bind_ok' {} = amqp_channel :call (Chan , # 'queue.bind' {queue = <<" q3" >>,
221+ exchange = XNameBin ,
222+ routing_key = <<" a" >>}),
223+ # 'queue.bind_ok' {} = amqp_channel :call (Chan , # 'queue.bind' {queue = <<" q3" >>,
224+ exchange = XNameBin ,
225+ routing_key = <<" b" >>}),
226+ # 'queue.bind_ok' {} = amqp_channel :call (Chan , # 'queue.bind' {queue = <<" q3" >>,
227+ exchange = XNameBin ,
228+ routing_key = <<" c" >>}),
229+
230+ amqp_channel :call (Chan , # 'confirm.select' {}),
231+ [amqp_channel :call (Chan ,
232+ # 'basic.publish' {exchange = XNameBin ,
233+ routing_key = integer_to_binary (I )},
234+ # amqp_msg {})
235+ || I <- lists :seq (1 , NumMsgs )],
236+ amqp_channel :wait_for_confirms_or_die (Chan ),
237+
238+ Counts = lists :foldl (
239+ fun (Q , Acc ) ->
240+ # 'queue.declare_ok' {message_count = M } = amqp_channel :call (
241+ Chan ,
242+ # 'queue.declare' {queue = Q ,
243+ durable = true }),
244+ maps :put (Q , M , Acc )
245+ end , #{}, Queues ),
246+
247+ C1 = maps :get (<<" q1" >>, Counts ),
248+ C2 = maps :get (<<" q2" >>, Counts ),
249+ C3 = maps :get (<<" q3" >>, Counts ),
250+ ct :pal (" q1: ~b , q2: ~b , q3: ~b " , [C1 , C2 , C3 ]),
251+
252+ ? assertEqual (NumMsgs , C1 + C2 + C3 ),
253+ % % Assert weighted distribution
254+ ? assert (C1 < C2 ),
255+ ? assert (C2 < C3 ),
256+
257+ amqp_channel :call (Chan , # 'exchange.delete' {exchange = XNameBin }),
258+ [amqp_channel :call (Chan , # 'queue.delete' {queue = Q }) || Q <- Queues ],
259+ ok = rabbit_ct_client_helpers :close_connection_and_channel (Conn , Chan ).
260+
190261consume_all (Chan , Queues ) ->
191262 lists :foldl (fun (Q , Map ) ->
192263 Msgs = consume_queue (Chan , Q , []),
0 commit comments