@@ -24,7 +24,10 @@ groups() ->
2424 register_interceptor ,
2525 register_interceptor_failing_with_amqp_error ,
2626 register_interceptor_crashing_with_amqp_error_exception ,
27- register_failing_interceptors
27+ register_failing_interceptors ,
28+ multiple_interceptors_ordered_by_priority ,
29+ reject_interceptors_with_same_priority_for_same_operation ,
30+ priority_overridden_by_config
2831 ]}
2932 ].
3033
@@ -213,6 +216,98 @@ register_failing_interceptors(Config) ->
213216 passed = rabbit_ct_broker_helpers :rpc (Config , 0 ,
214217 ? MODULE , register_interceptor1 , [Config , failing_dummy_interceptor ]).
215218
219+ multiple_interceptors_ordered_by_priority (Config ) ->
220+ passed = rabbit_ct_broker_helpers :rpc (Config , 0 ,
221+ ? MODULE , multiple_interceptors_ordered_by_priority1 , [Config ]).
222+
223+ multiple_interceptors_ordered_by_priority1 (Config ) ->
224+ Ch = rabbit_ct_client_helpers :open_channel (Config , 0 ),
225+ QName = <<" multiple-interceptors-q" >>,
226+ # 'queue.declare_ok' {} = amqp_channel :call (Ch , # 'queue.declare' {queue = QName ,
227+ durable = true }),
228+
229+ ok = application :set_env (rabbit , channel_interceptor_priorities ,
230+ [{dummy_interceptor_priority_1 , 1 },
231+ {dummy_interceptor_priority_2 , 2 },
232+ {dummy_interceptor_priority_3 , 3 }]),
233+
234+ ok = rabbit_registry :register (channel_interceptor ,
235+ <<" dummy interceptor priority 3" >>, dummy_interceptor_priority_3 ),
236+ ok = rabbit_registry :register (channel_interceptor ,
237+ <<" dummy interceptor priority 2" >>, dummy_interceptor_priority_2 ),
238+ ok = rabbit_registry :register (channel_interceptor ,
239+ <<" dummy interceptor priority 1" >>, dummy_interceptor_priority_1 ),
240+
241+ % % Interceptors run in ascending priority order regardless of registration order,
242+ % % so the payload becomes <<"foo1">>, then <<"foo12">>, then <<"foo123">>.
243+ check_send_receive (Ch , QName , <<" foo" >>, <<" foo123" >>),
244+
245+ ok = rabbit_registry :unregister (channel_interceptor , <<" dummy interceptor priority 1" >>),
246+ ok = rabbit_registry :unregister (channel_interceptor , <<" dummy interceptor priority 2" >>),
247+ ok = rabbit_registry :unregister (channel_interceptor , <<" dummy interceptor priority 3" >>),
248+ ok = application :unset_env (rabbit , channel_interceptor_priorities ),
249+
250+ # 'queue.delete_ok' {} = amqp_channel :call (Ch , # 'queue.delete' {queue = QName }),
251+ passed .
252+
253+ reject_interceptors_with_same_priority_for_same_operation (Config ) ->
254+ passed = rabbit_ct_broker_helpers :rpc (Config , 0 ,
255+ ? MODULE , reject_interceptors_with_same_priority_for_same_operation1 , [Config ]).
256+
257+ reject_interceptors_with_same_priority_for_same_operation1 (_Config ) ->
258+ ok = rabbit_registry :register (channel_interceptor ,
259+ <<" dummy interceptor priority 1" >>,
260+ dummy_interceptor_priority_1 ),
261+ ok = rabbit_registry :register (channel_interceptor ,
262+ <<" dummy interceptor priority 1 conflict" >>,
263+ dummy_interceptor_priority_1_conflict ),
264+ try
265+ % % Initialising interceptors must fail: two interceptors with the same
266+ % % priority handle the same AMQP operation.
267+ rabbit_channel_interceptor :init (self ())
268+ catch
269+ exit :{amqp_error , internal_error , _ , _ } -> ok
270+ after
271+ rabbit_registry :unregister (channel_interceptor , <<" dummy interceptor priority 1" >>),
272+ rabbit_registry :unregister (channel_interceptor , <<" dummy interceptor priority 1 conflict" >>)
273+ end ,
274+ passed .
275+
276+ priority_overridden_by_config (Config ) ->
277+ passed = rabbit_ct_broker_helpers :rpc (Config , 0 ,
278+ ? MODULE , priority_overridden_by_config1 , [Config ]).
279+
280+ priority_overridden_by_config1 (Config ) ->
281+ Ch = rabbit_ct_client_helpers :open_channel (Config , 0 ),
282+ QName = <<" priority-override-q" >>,
283+ # 'queue.declare_ok' {} = amqp_channel :call (Ch , # 'queue.declare' {queue = QName ,
284+ durable = true }),
285+
286+ % % priority_1 (config priority=1) runs before priority_3 (config priority=3),
287+ % % so the result is <<"foo13">>.
288+ ok = application :set_env (rabbit , channel_interceptor_priorities ,
289+ [{dummy_interceptor_priority_1 , 1 },
290+ {dummy_interceptor_priority_3 , 3 }]),
291+ ok = rabbit_registry :register (channel_interceptor ,
292+ <<" dummy interceptor priority 1" >>, dummy_interceptor_priority_1 ),
293+ ok = rabbit_registry :register (channel_interceptor ,
294+ <<" dummy interceptor priority 3" >>, dummy_interceptor_priority_3 ),
295+ check_send_receive (Ch , QName , <<" foo" >>, <<" foo13" >>),
296+
297+ % % Reconfigure priority_3 to run first (priority=0). Now the result is <<"foo31">>.
298+ ok = application :set_env (rabbit , channel_interceptor_priorities ,
299+ [{dummy_interceptor_priority_1 , 1 },
300+ {dummy_interceptor_priority_3 , 0 }]),
301+ rabbit_channel :refresh_interceptors (),
302+ check_send_receive (Ch , QName , <<" foo" >>, <<" foo31" >>),
303+
304+ ok = application :unset_env (rabbit , channel_interceptor_priorities ),
305+ ok = rabbit_registry :unregister (channel_interceptor , <<" dummy interceptor priority 1" >>),
306+ ok = rabbit_registry :unregister (channel_interceptor , <<" dummy interceptor priority 3" >>),
307+
308+ # 'queue.delete_ok' {} = amqp_channel :call (Ch , # 'queue.delete' {queue = QName }),
309+ passed .
310+
216311check_send_receive (Ch1 , QName , Send , Receive ) ->
217312 amqp_channel :call (Ch1 ,
218313 # 'basic.publish' {routing_key = QName },
0 commit comments