Skip to content

Commit 7768f99

Browse files
authored
Fix dlx checkout command redirection (#16203)
This commit supersedes #15548. ## What? Fix the following genuine CI flake: ``` make -C deps/rabbit ct-rabbit_fifo_dlx_integration t=cluster_size_3:single_dlx_worker ``` Sometimes this test case failed with the logs showing the following: ```text 2026-02-24 09:06:19.413770+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': vote granted for term 3 votes 2 2026-02-24 09:06:19.414048+00:00 [debug] <0.2377.0> started rabbit_fifo_dlx_worker <0.2600.0> for queue 'single_dlx_worker_source' in vhost '/' 2026-02-24 09:06:19.414096+00:00 [notice] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': candidate -> leader in term: 3 machine version: 7, last applied 5 2026-02-24 09:06:19.414388+00:00 [debug] <0.2602.0> queue 'single_dlx_worker_source' in vhost '/': updating leader record to current node rmq-ct-cluster_size_3-1-28000@localhost 2026-02-24 09:06:19.414279+00:00 [info] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': leader saw request_vote_rpc from {'%2F_single_dlx_worker_source','rmq-ct-cluster_size_3-3-28144@localhost'} for term 4 abdicates term: 3! 2026-02-24 09:06:19.417479+00:00 [notice] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': leader -> follower in term: 4 machine version: 7, last applied 5 2026-02-24 09:06:19.417533+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': is not new, setting election timeout. 2026-02-24 09:06:19.417740+00:00 [info] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': declining vote for {'%2F_single_dlx_worker_source','rmq-ct-cluster_size_3-3-28144@localhost'} for term 4, candidate last log {index, term} was: {5,2} last log entry {index, term} is: {{6,3}} 2026-02-24 09:06:19.417824+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': leader call - leader not known. Command will be forwarded once leader is known. 
2026-02-24 09:06:19.418190+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/' declining pre-vote to {'%2F_single_dlx_worker_source','rmq-ct-cluster_size_3-2-28072@localhost'} for term 3, current term 4 2026-02-24 09:06:19.428043+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': resetting last index to 5 from 6 in term 4 2026-02-24 09:06:19.428157+00:00 [info] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': detected a new leader {'%2F_single_dlx_worker_source','rmq-ct-cluster_size_3-3-28144@localhost'} in term 4 2026-02-24 09:06:19.428280+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': mem table overwriting detected whilst staging entries, opening new mem table 2026-02-24 09:06:19.436299+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': enabling ra cluster changes in 4, index 6 2026-02-24 09:06:19.436411+00:00 [debug] <0.2377.0> Terminating <31028.1894.0> since <31122.2516.0> becomes active rabbit_fifo_dlx_worker 2026-02-24 09:06:19.437003+00:00 [debug] <0.2377.0> Terminating <31122.2516.0> since <0.2600.0> becomes active rabbit_fifo_dlx_worker 2026-02-24 09:06:19.437107+00:00 [warning] <0.2600.0> Failed to process command {dlx,{checkout,<0.2600.0>,2}} on quorum queue leader {'%2F_single_dlx_worker_source', 2026-02-24 09:06:19.437107+00:00 [warning] <0.2600.0> 'rmq-ct-cluster_size_3-1-28000@localhost'} because actual leader is {'%2F_single_dlx_worker_source', 2026-02-24 09:06:19.437107+00:00 [warning] <0.2600.0> 'rmq-ct-cluster_size_3-3-28144@localhost'}. ``` ## How? In this commit, we use `ra:pipeline_command/4` with a selective receive instead of `ra:process_command/3` for the dlx checkout command. This prevents Ra from automatically redirecting the checkout command to a new leader if a failover happens while the command is being processed.
1 parent 38f9639 commit 7768f99

2 files changed

Lines changed: 61 additions & 42 deletions

File tree

deps/rabbit/src/rabbit_fifo_dlx_client.erl

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,38 @@ checkout(QResource, Leader, NumUnsettled) ->
4141
State = #state{queue_resource = QResource,
4242
leader = Leader,
4343
last_msg_id = -1},
44-
process_command(Cmd, State, 5).
44+
checkout0(Cmd, State, 5).
4545

46-
process_command(_Cmd, _State, 0) ->
46+
checkout0(_Cmd, _State, 0) ->
4747
{error, ra_command_failed};
48-
process_command(Cmd, #state{leader = Leader} = State, Tries) ->
49-
case ra:process_command(Leader, Cmd, 60_000) of
50-
{ok, ok, Leader} ->
51-
{ok, State#state{leader = Leader}};
52-
{ok, ok, NonLocalLeader} ->
53-
?LOG_WARNING("Failed to process command ~tp on queue leader ~tp because actual leader is ~tp.",
54-
[Cmd, Leader, NonLocalLeader]),
55-
{error, non_local_leader};
56-
Err ->
57-
?LOG_WARNING("Failed to process command ~tp on queue leader ~tp: ~tp~n"
58-
"Trying ~b more time(s)...",
59-
[Cmd, Leader, Err, Tries]),
60-
process_command(Cmd, State, Tries - 1)
48+
checkout0(Cmd, #state{leader = Leader} = State, Tries) ->
49+
Correlation = make_ref(),
50+
%% We use ra:pipeline_command/4 instead of ra:process_command/3 because the
51+
%% latter internally redirects to the new leader which we don't want.
52+
ra:pipeline_command(Leader, Cmd, Correlation, normal),
53+
receive_applied(Cmd, Correlation, State, Tries).
54+
55+
receive_applied(Cmd, Corr, #state{queue_resource = QName,
56+
leader = Leader} = State, Tries) ->
57+
receive
58+
{'$gen_cast', {queue_event, QName, {Leader, {applied, Results}}}} ->
59+
case lists:member({Corr, ok}, Results) of
60+
true ->
61+
{ok, State};
62+
false ->
63+
receive_applied(Cmd, Corr, State, Tries)
64+
end;
65+
{'$gen_cast', {queue_event, QName,
66+
{_From, {rejected, {not_leader, NonLocalLeader, Corr}}}}} ->
67+
?LOG_WARNING("failed to apply command ~tp on leader ~tp "
68+
"because actual leader is ~tp",
69+
[Cmd, Leader, NonLocalLeader]),
70+
{error, non_local_leader}
71+
after 60_000 ->
72+
?LOG_WARNING("timed out applying command ~tp on leader ~tp; "
73+
"trying ~b more time(s)...",
74+
[Cmd, Leader, Tries - 1]),
75+
checkout0(Cmd, State, Tries - 1)
6176
end.
6277

6378
-spec handle_ra_event(pid(), term(), state()) ->

deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,35 +38,37 @@
3838

3939
all() ->
4040
[
41-
{group, single_node},
41+
{group, cluster_size_1},
4242
{group, cluster_size_3}
4343
].
4444

4545
groups() ->
4646
[
47-
{single_node, [shuffle], [
48-
expired,
49-
rejected,
50-
delivery_limit,
51-
target_queue_not_bound,
52-
target_queue_deleted,
53-
dlx_missing,
54-
cycle,
55-
stats,
56-
drop_head_falls_back_to_at_most_once,
57-
switch_strategy,
58-
reject_publish_source_queue_max_length,
59-
reject_publish_source_queue_max_length_bytes,
60-
reject_publish_target_classic_queue,
61-
reject_publish_max_length_target_quorum_queue,
62-
target_quorum_queue_delete_create
63-
]},
64-
{cluster_size_3, [], [
65-
reject_publish_max_length_target_quorum_queue,
66-
reject_publish_down_target_quorum_queue,
67-
many_target_queues,
68-
single_dlx_worker
69-
]}
47+
{cluster_size_1, [shuffle],
48+
[
49+
expired,
50+
rejected,
51+
delivery_limit,
52+
target_queue_not_bound,
53+
target_queue_deleted,
54+
dlx_missing,
55+
cycle,
56+
stats,
57+
drop_head_falls_back_to_at_most_once,
58+
switch_strategy,
59+
reject_publish_source_queue_max_length,
60+
reject_publish_source_queue_max_length_bytes,
61+
reject_publish_target_classic_queue,
62+
reject_publish_max_length_target_quorum_queue,
63+
target_quorum_queue_delete_create
64+
]},
65+
{cluster_size_3, [],
66+
[
67+
reject_publish_max_length_target_quorum_queue,
68+
reject_publish_down_target_quorum_queue,
69+
many_target_queues,
70+
single_dlx_worker
71+
]}
7072
].
7173

7274
init_per_suite(Config0) ->
@@ -86,7 +88,7 @@ init_per_suite(Config0) ->
8688
end_per_suite(Config) ->
8789
rabbit_ct_helpers:run_teardown_steps(Config).
8890

89-
init_per_group(single_node = Group, Config) ->
91+
init_per_group(cluster_size_1 = Group, Config) ->
9092
init_per_group(Group, Config, 1);
9193
init_per_group(cluster_size_3 = Group, Config) ->
9294
init_per_group(Group, Config, 3).
@@ -954,15 +956,17 @@ single_dlx_worker(Config) ->
954956
ok = rabbit_ct_broker_helpers:kill_node(Config, Leader0),
955957
{ok, _, {_, Leader1}} = ?awaitMatch({ok, _, _},
956958
ra:members({RaName, Follower0}),
957-
30000),
959+
30_000),
958960
?assertNotEqual(Leader0, Leader1),
959961
[Follower1] = [Server1, Follower0] -- [Leader1],
960962
assert_active_dlx_workers(0, Config, Follower1),
961963
assert_active_dlx_workers(1, Config, Leader1),
962964
ok = rabbit_ct_broker_helpers:start_node(Config, Leader0).
963965

964966
assert_active_dlx_workers(N, Config, Server) ->
965-
?awaitMatch(N, length(rpc(Config, Server, supervisor, which_children, [rabbit_fifo_dlx_sup], 2000)), 60000).
967+
?awaitMatch(N,
968+
length(rpc(Config, Server, supervisor, which_children, [rabbit_fifo_dlx_sup], 5000)),
969+
60_000).
966970

967971
declare_queue(Channel, Queue, Args) ->
968972
#'queue.declare_ok'{} = amqp_channel:call(Channel, #'queue.declare'{

0 commit comments

Comments
 (0)