Skip to content

Commit 2a9ba19

Browse files
committed
Remove feature flag compat code for rabbitmq_4.2.0
1 parent c555b5b commit 2a9ba19

9 files changed

Lines changed: 28 additions & 297 deletions

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2822,14 +2822,12 @@ ensure_source(#'v1_0.source'{
28222822
} = Source0,
28232823
SndSettled, LinkName, Vhost, User, ContainerId,
28242824
ConnPid, PermCache0, TopicPermCache) ->
2825-
FFEnabled = rabbit_volatile_queue:ff_enabled(),
28262825
case maps:from_keys(Caps, true) of
28272826
#{{symbol, ?CAP_VOLATILE_QUEUE} := true}
28282827
when (Durable =:= undefined orelse Durable =:= ?V_1_0_TERMINUS_DURABILITY_NONE) andalso
28292828
ExpiryPolicy =:= ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH andalso
28302829
(Timeout =:= undefined orelse Timeout =:= {uint, 0}) andalso
2831-
SndSettled andalso
2832-
FFEnabled ->
2830+
SndSettled ->
28332831
%% create volatile queue
28342832
QNameBin = rabbit_volatile_queue:new_name(),
28352833
Source = #'v1_0.source'{

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
format_state/1, format_message_queue/2]).
6464

6565
%% Internal
66-
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
66+
-export([list_local/0, emit_info_local/3]).
6767
-export([get_vhost/1, get_user/1]).
6868
%% For testing
6969
-export([build_topic_variable_map/3]).
@@ -298,14 +298,6 @@ shutdown(Pid) ->
298298
send_command(Pid, Msg) ->
299299
gen_server2:cast(Pid, {command, Msg}).
300300

301-
%% Delete this function when feature flag rabbitmq_4.2.0 becomes required.
302-
-spec deliver_reply_local(pid(), binary(), mc:state()) -> ok.
303-
deliver_reply_local(Pid, Key, Message) ->
304-
case pg_local:in_group(rabbit_channels, Pid) of
305-
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
306-
false -> ok
307-
end.
308-
309301
-spec list() -> [pid()].
310302

311303
list() ->
@@ -561,20 +553,6 @@ handle_call({has_state, #resource{virtual_host = Vhost,
561553
reply(true, State);
562554
handle_call({has_state, _QName, _QType}, _From, State) ->
563555
reply(false, State);
564-
%% Delete below clause when feature flag rabbitmq_4.2.0 becomes required.
565-
handle_call({declare_fast_reply_to, Key}, _From, State = #ch{direct_reply = Reply}) ->
566-
Result = case Reply of
567-
none ->
568-
not_found;
569-
#direct_reply{queue = QNameBin} ->
570-
case rabbit_volatile_queue:key_from_name(QNameBin) of
571-
{ok, Key} ->
572-
exists;
573-
_ ->
574-
not_found
575-
end
576-
end,
577-
reply(Result, State);
578556

579557
handle_call(refresh_config, _From,
580558
State = #ch{cfg = #conf{virtual_host = VHost} = Cfg}) ->
@@ -647,30 +625,6 @@ handle_cast({command, Msg}, State) ->
647625
ok = send(Msg, State),
648626
noreply(State);
649627

650-
%% Delete below clause when feature flag rabbitmq_4.2.0 becomes required.
651-
handle_cast({deliver_reply, Key, Mc},
652-
#ch{cfg = #conf{state = ChanState,
653-
writer_pid = WriterPid,
654-
msg_interceptor_ctx = MsgIcptCtx},
655-
next_tag = DeliveryTag,
656-
direct_reply = #direct_reply{consumer_tag = Ctag,
657-
queue = QNameBin}} = State)
658-
when ChanState =/= closing ->
659-
case rabbit_volatile_queue:key_from_name(QNameBin) of
660-
{ok, Key} ->
661-
ExchName = mc:exchange(Mc),
662-
[RoutingKey | _] = mc:routing_keys(Mc),
663-
Deliver = #'basic.deliver'{consumer_tag = Ctag,
664-
delivery_tag = DeliveryTag,
665-
redelivered = false,
666-
exchange = ExchName,
667-
routing_key = RoutingKey},
668-
Content = outgoing_content(Mc, MsgIcptCtx),
669-
ok = rabbit_writer:send_command(WriterPid, Deliver, Content);
670-
_ ->
671-
ok
672-
end,
673-
noreply(State);
674628
handle_cast({deliver_reply, _, _}, State) ->
675629
noreply(State);
676630

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 8 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
new_target/1,
2121
new_name/0,
2222
is/1,
23-
key_from_name/1,
2423
pid_from_name/2,
2524
exists/1,
26-
ff_enabled/0,
2725
local_cast/2,
2826
local_call/2]).
2927

@@ -159,25 +157,10 @@ exists(#resource{kind = queue,
159157
name = QNameBin} = QName) ->
160158
case pid_from_name(QNameBin) of
161159
{ok, Pid} when is_pid(Pid) ->
162-
case ff_enabled() of
163-
true ->
164-
Request = {has_state, QName, ?MODULE},
165-
MFA = {?MODULE, local_call, [Request]},
166-
try delegate:invoke(Pid, MFA)
167-
catch _:_ -> false
168-
end;
169-
false ->
170-
case key_from_name(QNameBin) of
171-
{ok, Key} ->
172-
Msg = {declare_fast_reply_to, Key},
173-
try gen_server:call(Pid, Msg, infinity) of
174-
exists -> true;
175-
_ -> false
176-
catch exit:_ -> false
177-
end;
178-
error ->
179-
false
180-
end
160+
Request = {has_state, QName, ?MODULE},
161+
MFA = {?MODULE, local_call, [Request]},
162+
try delegate:invoke(Pid, MFA)
163+
catch _:_ -> false
181164
end;
182165
_ ->
183166
false
@@ -201,22 +184,11 @@ deliver(Qs, Msg, #{}) ->
201184
{[], []}.
202185

203186
deliver0(Q, Msg) ->
204-
QName = amqqueue:get_name(Q),
205187
QPid = amqqueue:get_pid(Q),
206-
case ff_enabled() of
207-
true ->
208-
Request = {queue_event, QName, {deliver, Msg}},
209-
MFA = {?MODULE, local_cast, [Request]},
210-
delegate:invoke_no_result(QPid, MFA);
211-
false ->
212-
case key_from_name(QName#resource.name) of
213-
{ok, Key} ->
214-
MFA = {rabbit_channel, deliver_reply_local, [Key, Msg]},
215-
delegate:invoke_no_result(QPid, MFA);
216-
error ->
217-
ok
218-
end
219-
end.
188+
QName = amqqueue:get_name(Q),
189+
Request = {queue_event, QName, {deliver, Msg}},
190+
MFA = {?MODULE, local_cast, [Request]},
191+
delegate:invoke_no_result(QPid, MFA).
220192

221193
-spec local_cast(pid(), term()) -> ok.
222194
local_cast(Pid, Request) ->
@@ -290,9 +262,6 @@ cancel(_, _, #?STATE{} = State) ->
290262
is_enabled() ->
291263
true.
292264

293-
ff_enabled() ->
294-
rabbit_feature_flags:is_enabled('rabbitmq_4.2.0').
295-
296265
is_compatible(_, _, _) ->
297266
true.
298267

@@ -422,18 +391,5 @@ pid_from_name(<<?PREFIX, Bin/binary>>, CandidateNodes) ->
422391
pid_from_name(_, _) ->
423392
error.
424393

425-
%% Returns the base 64 encoded key.
426-
-spec key_from_name(rabbit_misc:resource_name()) ->
427-
{ok, binary()} | error.
428-
key_from_name(<<?PREFIX, Suffix/binary>>) ->
429-
case binary:split(Suffix, <<".">>) of
430-
[_Pid, Key] ->
431-
{ok, Key};
432-
_ ->
433-
error
434-
end;
435-
key_from_name(_) ->
436-
error.
437-
438394
nodes_with_hashes() ->
439395
#{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}.

deps/rabbit/test/amqp_auth_SUITE.erl

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,8 +482,6 @@ attach_source_queue_dynamic_exclusive(Config) ->
482482
ok = close_connection_sync(Connection).
483483

484484
attach_source_queue_dynamic_volatile(Config) ->
485-
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0'),
486-
487485
OpnConf = connection_config(Config),
488486
{ok, Connection} = amqp10_client:open_connection(OpnConf),
489487
{ok, Session} = amqp10_client:begin_session_sync(Connection),

deps/rabbit/test/clustering_recovery_SUITE.erl

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -139,39 +139,10 @@ init_per_testcase(Testcase, Config) ->
139139
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}},
140140
{keep_pid_file_on_exit, true}
141141
]),
142-
Config2 = rabbit_ct_helpers:run_steps(
143-
Config1,
144-
rabbit_ct_broker_helpers:setup_steps() ++
145-
rabbit_ct_client_helpers:setup_steps()),
146-
case Config2 of
147-
_ when is_list(Config2) andalso
148-
(Testcase =:= autodelete_transient_queue_after_partition_recovery_1 orelse
149-
Testcase =:= autodelete_durable_queue_after_partition_recovery_1 orelse
150-
Testcase =:= autodelete_transient_queue_after_partition_recovery_2 orelse
151-
Testcase =:= autodelete_durable_queue_after_partition_recovery_2 orelse
152-
Testcase =:= exclusive_transient_queue_after_partition_recovery_1 orelse
153-
Testcase =:= exclusive_durable_queue_after_partition_recovery_1 orelse
154-
Testcase =:= exclusive_transient_queue_after_partition_recovery_2 orelse
155-
Testcase =:= exclusive_durable_queue_after_partition_recovery_2) ->
156-
NewEnough = ok =:= rabbit_ct_broker_helpers:enable_feature_flag(
157-
Config2, 'rabbitmq_4.2.0'),
158-
case NewEnough of
159-
true ->
160-
Config2;
161-
false ->
162-
_ = rabbit_ct_helpers:run_steps(
163-
Config2,
164-
rabbit_ct_client_helpers:teardown_steps() ++
165-
rabbit_ct_broker_helpers:teardown_steps()),
166-
rabbit_ct_helpers:testcase_finished(Config2, Testcase),
167-
{skip,
168-
"The old node does not have improvements to "
169-
"rabbit_amqqueue_process and rabbit_node_monitor"}
170-
end;
171-
_ ->
172-
%% Other testcases or failure to setup broker and client.
173-
Config2
174-
end.
142+
rabbit_ct_helpers:run_steps(
143+
Config1,
144+
rabbit_ct_broker_helpers:setup_steps() ++
145+
rabbit_ct_client_helpers:setup_steps()).
175146

176147
end_per_testcase(Testcase, Config) ->
177148
Config1 = rabbit_ct_helpers:run_steps(Config,

deps/rabbit/test/direct_reply_to_amqp_SUITE.erl

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,10 @@ init_per_group(Group, Config) ->
7070
Config1 = rabbit_ct_helpers:set_config(
7171
Config, [{rmq_nodes_count, Nodes},
7272
{rmq_nodename_suffix, Suffix}]),
73-
Config2 = rabbit_ct_helpers:run_setup_steps(
74-
Config1,
75-
rabbit_ct_broker_helpers:setup_steps() ++
76-
rabbit_ct_client_helpers:setup_steps()),
77-
case rabbit_ct_broker_helpers:enable_feature_flag(Config2, 'rabbitmq_4.2.0') of
78-
ok ->
79-
Config2;
80-
{skip, _} = Skip ->
81-
Skip
82-
end.
73+
rabbit_ct_helpers:run_setup_steps(
74+
Config1,
75+
rabbit_ct_broker_helpers:setup_steps() ++
76+
rabbit_ct_client_helpers:setup_steps()).
8377

8478
end_per_group(_Group, Config) ->
8579
rabbit_ct_helpers:run_teardown_steps(

0 commit comments

Comments
 (0)