|
13 | 13 |
|
14 | 14 | all() -> |
15 | 15 | [ |
16 | | - {group, non_parallel_tests} |
| 16 | + {group, tests} |
17 | 17 | ]. |
18 | 18 |
|
19 | 19 | groups() -> |
20 | 20 | [ |
21 | | - {non_parallel_tests, [], [ |
22 | | - routed_to_zero_queue_test, |
23 | | - routed_to_one_queue_test, |
24 | | - routed_to_many_queue_test |
25 | | - ]} |
| 21 | + {tests, [], |
| 22 | + [ |
| 23 | + routed_to_zero_queue_test, |
| 24 | + routed_to_one_queue_test, |
| 25 | + routed_to_many_queue_test, |
| 26 | + stable_routing_across_restarts_test |
| 27 | + ]} |
26 | 28 | ]. |
27 | 29 |
|
28 | 30 | %% ------------------------------------------------------------------- |
@@ -63,84 +65,142 @@ end_per_testcase(Testcase, Config) -> |
63 | 65 | %% ------------------------------------------------------------------- |
64 | 66 |
|
65 | 67 | routed_to_zero_queue_test(Config) -> |
66 | | - test0(Config, fun () -> |
67 | | - #'basic.publish'{exchange = make_exchange_name(Config, "0"), routing_key = rnd()} |
68 | | - end, |
69 | | - fun() -> |
70 | | - #amqp_msg{props = #'P_basic'{}, payload = <<>>} |
71 | | - end, [], 5, 0), |
72 | | - |
73 | | - passed. |
| 68 | + ok = route(Config, [], 5, 0). |
74 | 69 |
|
75 | 70 | routed_to_one_queue_test(Config) -> |
76 | | - test0(Config, fun () -> |
77 | | - #'basic.publish'{exchange = make_exchange_name(Config, "0"), routing_key = rnd()} |
78 | | - end, |
79 | | - fun() -> |
80 | | - #amqp_msg{props = #'P_basic'{}, payload = <<>>} |
81 | | - end, [<<"q1">>, <<"q2">>, <<"q3">>], 1, 1), |
82 | | - |
83 | | - passed. |
| 71 | + ok = route(Config, [<<"q1">>, <<"q2">>, <<"q3">>], 1, 1). |
84 | 72 |
|
85 | 73 | routed_to_many_queue_test(Config) -> |
86 | | - test0(Config, fun () -> |
87 | | - #'basic.publish'{exchange = make_exchange_name(Config, "0"), routing_key = rnd()} |
88 | | - end, |
89 | | - fun() -> |
90 | | - #amqp_msg{props = #'P_basic'{}, payload = <<>>} |
91 | | - end, [<<"q1">>, <<"q2">>, <<"q3">>], 5, 5), |
92 | | - |
93 | | - passed. |
94 | | - |
95 | | -test0(Config, MakeMethod, MakeMsg, Queues, MsgCount, Count) -> |
96 | | - {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), |
97 | | - E = make_exchange_name(Config, "0"), |
98 | | - |
99 | | - #'exchange.declare_ok'{} = |
100 | | - amqp_channel:call(Chan, |
101 | | - #'exchange.declare' { |
102 | | - exchange = E, |
103 | | - type = <<"x-modulus-hash">>, |
104 | | - auto_delete = true |
105 | | - }), |
106 | | - [#'queue.declare_ok'{} = |
107 | | - amqp_channel:call(Chan, #'queue.declare' { |
108 | | - queue = Q, exclusive = true }) || Q <- Queues], |
109 | | - [#'queue.bind_ok'{} = |
110 | | - amqp_channel:call(Chan, #'queue.bind'{queue = Q, |
111 | | - exchange = E, |
112 | | - routing_key = <<"">>}) |
| 74 | + ok = route(Config, [<<"q1">>, <<"q2">>, <<"q3">>], 5, 5). |
| 75 | + |
| 76 | +route(Config, Queues, PublishCount, ExpectedRoutedCount) -> |
| 77 | + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config), |
| 78 | + B = rabbit_ct_helpers:get_config(Config, test_resource_name), |
| 79 | + XNameBin = erlang:list_to_binary("x-" ++ B), |
| 80 | + |
| 81 | + #'exchange.declare_ok'{} = amqp_channel:call(Chan, |
| 82 | + #'exchange.declare'{ |
| 83 | + exchange = XNameBin, |
| 84 | + type = <<"x-modulus-hash">>, |
| 85 | + durable = true, |
| 86 | + auto_delete = true}), |
| 87 | + [begin |
| 88 | + #'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{ |
| 89 | + queue = Q, |
| 90 | + durable = true, |
| 91 | + exclusive = true}), |
| 92 | + #'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{ |
| 93 | + queue = Q, |
| 94 | + exchange = XNameBin}) |
| 95 | + end |
113 | 96 | || Q <- Queues], |
114 | 97 |
|
115 | 98 | amqp_channel:call(Chan, #'confirm.select'{}), |
116 | | - |
117 | 99 | [amqp_channel:call(Chan, |
118 | | - MakeMethod(), |
119 | | - MakeMsg()) || _ <- lists:duplicate(MsgCount, const)], |
120 | | - |
121 | | - % ensure that the messages have been delivered to the queues before asking |
122 | | - % for the message count |
| 100 | + #'basic.publish'{exchange = XNameBin, |
| 101 | + routing_key = rnd()}, |
| 102 | + #amqp_msg{props = #'P_basic'{}, |
| 103 | + payload = <<>>}) || |
| 104 | + _ <- lists:duplicate(PublishCount, const)], |
123 | 105 | amqp_channel:wait_for_confirms_or_die(Chan), |
124 | 106 |
|
125 | | - Counts = |
126 | | - [begin |
127 | | - #'queue.declare_ok'{message_count = M} = |
128 | | - amqp_channel:call(Chan, #'queue.declare' {queue = Q, |
129 | | - exclusive = true }), |
130 | | - M |
131 | | - end || Q <- Queues], |
132 | | - |
133 | | - ?assertEqual(Count, lists:sum(Counts)), |
134 | | - |
135 | | - amqp_channel:call(Chan, #'exchange.delete' { exchange = E }), |
136 | | - [amqp_channel:call(Chan, #'queue.delete' { queue = Q }) || Q <- Queues], |
| 107 | + Count = lists:foldl( |
| 108 | + fun(Q, Acc) -> |
| 109 | + #'queue.declare_ok'{message_count = M} = amqp_channel:call( |
| 110 | + Chan, |
| 111 | + #'queue.declare'{ |
| 112 | + queue = Q, |
| 113 | + durable = true, |
| 114 | + exclusive = true}), |
| 115 | + Acc + M |
| 116 | + end, 0, Queues), |
| 117 | + ?assertEqual(ExpectedRoutedCount, Count), |
| 118 | + |
| 119 | + amqp_channel:call(Chan, #'exchange.delete'{exchange = XNameBin}), |
| 120 | + [amqp_channel:call(Chan, #'queue.delete'{queue = Q}) || Q <- Queues], |
| 121 | + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan). |
| 122 | + |
| 123 | +stable_routing_across_restarts_test(Config) -> |
| 124 | + {Conn1, Chan1} = rabbit_ct_client_helpers:open_connection_and_channel(Config), |
| 125 | + XNameBin = atom_to_binary(?FUNCTION_NAME), |
| 126 | + NumQs = 40, |
| 127 | + NumMsgs = 500, |
| 128 | + |
| 129 | + #'exchange.declare_ok'{} = amqp_channel:call(Chan1, |
| 130 | + #'exchange.declare'{ |
| 131 | + exchange = XNameBin, |
| 132 | + type = <<"x-modulus-hash">>, |
| 133 | + durable = true}), |
| 134 | + Queues = [erlang:list_to_binary("q-" ++ integer_to_list(I)) || I <- lists:seq(1, NumQs)], |
| 135 | + [begin |
| 136 | + #'queue.declare_ok'{} = amqp_channel:call(Chan1, #'queue.declare'{ |
| 137 | + queue = Q, |
| 138 | + durable = true}), |
| 139 | + #'queue.bind_ok'{} = amqp_channel:call(Chan1, #'queue.bind'{ |
| 140 | + queue = Q, |
| 141 | + exchange = XNameBin, |
| 142 | + %% The routing key shouldn't matter |
| 143 | + routing_key = rnd()}) |
| 144 | + end |
| 145 | + || Q <- Queues], |
137 | 146 |
|
138 | | - rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), |
139 | | - ok. |
| 147 | + RoutingKeys = [rnd() || _ <- lists:seq(1, NumMsgs)], |
| 148 | + |
| 149 | + amqp_channel:call(Chan1, #'confirm.select'{}), |
| 150 | + [amqp_channel:call(Chan1, |
| 151 | + #'basic.publish'{exchange = XNameBin, |
| 152 | + routing_key = RK}, |
| 153 | + #amqp_msg{payload = RK}) |
| 154 | + || RK <- RoutingKeys], |
| 155 | + amqp_channel:wait_for_confirms_or_die(Chan1), |
| 156 | + |
| 157 | + Map1 = consume_all(Chan1, Queues), |
| 158 | + |
| 159 | + %% Assert at least two queues got messages |
| 160 | + NonEmptyQueues1 = maps:filter(fun(_Q, Msgs) -> length(Msgs) > 0 end, Map1), |
| 161 | + ?assert(maps:size(NonEmptyQueues1) >= 2), |
| 162 | + |
| 163 | + %% Assert all messages routed |
| 164 | + ?assertEqual(NumMsgs, lists:sum([length(Msgs) || Msgs <- maps:values(Map1)])), |
| 165 | + |
| 166 | + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Chan1), |
| 167 | + %% Restart node |
| 168 | + ok = rabbit_ct_broker_helpers:restart_node(Config, 0), |
| 169 | + {Conn2, Chan2} = rabbit_ct_client_helpers:open_connection_and_channel(Config), |
| 170 | + |
| 171 | + amqp_channel:call(Chan2, #'confirm.select'{}), |
| 172 | + %% Publish the same messages again. |
| 173 | + [amqp_channel:call(Chan2, |
| 174 | + #'basic.publish'{exchange = XNameBin, |
| 175 | + routing_key = RK}, |
| 176 | + #amqp_msg{payload = RK}) |
| 177 | + || RK <- RoutingKeys], |
| 178 | + amqp_channel:wait_for_confirms_or_die(Chan2), |
| 179 | + |
| 180 | + Map2 = consume_all(Chan2, Queues), |
| 181 | + |
| 182 | + %% Assert the same messages ended up in the same queues, |
| 183 | + %% i.e. that routing was stable. |
| 184 | + ?assertEqual(Map1, Map2), |
| 185 | + |
| 186 | + amqp_channel:call(Chan2, #'exchange.delete'{exchange = XNameBin}), |
| 187 | + [amqp_channel:call(Chan2, #'queue.delete'{queue = Q}) || Q <- Queues], |
| 188 | + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Chan2). |
| 189 | + |
| 190 | +consume_all(Chan, Queues) -> |
| 191 | + lists:foldl(fun(Q, Map) -> |
| 192 | + Msgs = consume_queue(Chan, Q, []), |
| 193 | + maps:put(Q, Msgs, Map) |
| 194 | + end, #{}, Queues). |
| 195 | + |
| 196 | +consume_queue(Chan, Q, L) -> |
| 197 | + case amqp_channel:call(Chan, #'basic.get'{queue = Q, |
| 198 | + no_ack = true}) of |
| 199 | + #'basic.get_empty'{} -> |
| 200 | + L; |
| 201 | + {#'basic.get_ok'{}, #amqp_msg{payload = Payload}} -> |
| 202 | + consume_queue(Chan, Q, L ++ [Payload]) |
| 203 | + end. |
140 | 204 |
|
141 | 205 | rnd() -> |
142 | | - list_to_binary(integer_to_list(rand:uniform(1000000))). |
143 | | - |
144 | | -make_exchange_name(Config, Suffix) -> |
145 | | - B = rabbit_ct_helpers:get_config(Config, test_resource_name), |
146 | | - erlang:list_to_binary("x-" ++ B ++ "-" ++ Suffix). |
| 206 | + integer_to_binary(rand:uniform(10_000_000)). |
0 commit comments