@@ -214,9 +214,6 @@ process_connect(
214214 {TraceState , ConnName } = init_trace (VHost , ConnName0 ),
215215 ok = rabbit_mqtt_keepalive :start (KeepaliveSecs , Socket ),
216216 Exchange = rabbit_misc :r (VHost , exchange , persistent_term :get (? PERSISTENT_TERM_EXCHANGE )),
217- % % To simplify logic, we decide at connection establishment time to stick
218- % % with either binding args v1 or v2 for the lifetime of the connection.
219- BindingArgsV2 = rabbit_feature_flags :is_enabled ('rabbitmq_4.1.0' ),
220217 ProtoVerAtom = proto_integer_to_atom (ProtoVer ),
221218 MsgIcptCtx = #{protocol => ProtoVerAtom ,
222219 vhost => VHost ,
@@ -246,7 +243,6 @@ process_connect(
246243 will_msg = WillMsg ,
247244 max_packet_size_outbound = MaxPacketSize ,
248245 topic_alias_maximum_outbound = TopicAliasMaxOutbound ,
249- binding_args_v2 = BindingArgsV2 ,
250246 msg_interceptor_ctx = MsgIcptCtx },
251247 auth_state = # auth_state {
252248 user = User ,
@@ -459,8 +455,7 @@ process_request(?SUBSCRIBE,
459455 packet_id = SubscribePktId ,
460456 subscriptions = Subscriptions },
461457 payload = undefined },
462- State0 = # state {cfg = # cfg {proto_ver = ProtoVer ,
463- binding_args_v2 = BindingArgsV2 }}) ->
458+ State0 = # state {cfg = # cfg {proto_ver = ProtoVer }}) ->
464459 ? LOG_DEBUG (" Received a SUBSCRIBE with subscription(s) ~p " , [Subscriptions ]),
465460 DisconnectOnUnauthorized = rabbit_mqtt_util :env (disconnect_on_unauthorized ),
466461 {ResultRev , RetainedRev , State1 } =
@@ -491,8 +486,7 @@ process_request(?SUBSCRIBE,
491486 maybe
492487 {ok , Q } ?= ensure_queue (QoS , S0 ),
493488 QName = amqqueue :get_name (Q ),
494- BindingArgs = binding_args_for_proto_ver (ProtoVer , TopicFilter ,
495- Opts , BindingArgsV2 ),
489+ BindingArgs = binding_args_for_proto_ver (ProtoVer , TopicFilter , Opts ),
496490 ok ?= add_subscription (TopicFilter , BindingArgs , QName , S0 ),
497491 ok ?= maybe_delete_old_subscription (TopicFilter , Opts , S0 ),
498492 Subs = maps :put (TopicFilter , Opts , S0 # state .subscriptions ),
@@ -541,11 +535,10 @@ process_request(?UNSUBSCRIBE,
541535 {ReasonCodes , State } =
542536 lists :foldl (
543537 fun (TopicFilter , {L , # state {subscriptions = Subs0 ,
544- cfg = # cfg {proto_ver = ProtoVer ,
545- binding_args_v2 = BindingArgsV2 }} = S0 }) ->
538+ cfg = # cfg {proto_ver = ProtoVer }} = S0 }) ->
546539 case maps :take (TopicFilter , Subs0 ) of
547540 {Opts , Subs } ->
548- BindingArgs = binding_args_for_proto_ver (ProtoVer , TopicFilter , Opts , BindingArgsV2 ),
541+ BindingArgs = binding_args_for_proto_ver (ProtoVer , TopicFilter , Opts ),
549542 case delete_subscription (
550543 TopicFilter , BindingArgs , Opts # mqtt_subscription_opts .qos , S0 ) of
551544 ok ->
@@ -913,8 +906,7 @@ init_subscriptions(_, State) ->
913906-spec init_subscriptions0 (qos (), state ()) ->
914907 {ok , subscriptions ()} | {error , reason_code ()}.
915908init_subscriptions0 (QoS , State = # state {cfg = # cfg {proto_ver = ProtoVer ,
916- exchange = Exchange ,
917- binding_args_v2 = BindingArgsV2 }}) ->
909+ exchange = Exchange }}) ->
918910 Bindings =
919911 rabbit_binding :list_for_source_and_destination (
920912 Exchange ,
@@ -934,7 +926,7 @@ init_subscriptions0(QoS, State = #state{cfg = #cfg{proto_ver = ProtoVer,
934926 case ProtoVer of
935927 ? MQTT_PROTO_V5 ->
936928 % % session upgrade
937- NewBindingArgs = binding_args_for_proto_ver (ProtoVer , TopicFilter , Opts , BindingArgsV2 ),
929+ NewBindingArgs = binding_args_for_proto_ver (ProtoVer , TopicFilter , Opts ),
938930 ok = recreate_subscription (TopicFilter , Args , NewBindingArgs , QoS , State );
939931 _ ->
940932 ok
@@ -953,19 +945,13 @@ init_subscriptions0(QoS, State = #state{cfg = #cfg{proto_ver = ProtoVer,
953945 % % binding args v1
954946 Opts0 = # mqtt_subscription_opts {} = lists :keyfind (
955947 mqtt_subscription_opts , 1 , Args ),
956- case BindingArgsV2 of
957- true ->
958- % % Migrate v1 to v2.
959- % % Note that this migration must be in place even for some versions
960- % % (jump upgrade) after feature flag 'rabbitmq_4.1.0' has become
961- % % required since enabling the feature flag doesn't migrate binding
962- % % args for existing connections.
963- NewArgs = binding_args_for_proto_ver (
964- ProtoVer , TopicFilter , Opts0 , BindingArgsV2 ),
965- ok = recreate_subscription (TopicFilter , Args , NewArgs , QoS , State );
966- false ->
967- ok
968- end ,
948+ % % Migrate v1 to v2.
949+ % % Note that this migration must be in place even for some versions
950+ % % (jump upgrade) after feature flag 'rabbitmq_4.1.0' has become
951+ % % required since enabling the feature flag doesn't migrate binding
952+ % % args for existing connections.
953+ NewArgs = binding_args_for_proto_ver (ProtoVer , TopicFilter , Opts0 ),
954+ ok = recreate_subscription (TopicFilter , Args , NewArgs , QoS , State ),
969955 Opts0
970956 end ;
971957 _ ->
@@ -1000,10 +986,11 @@ recreate_subscription(TopicFilter, OldBindingArgs, NewBindingArgs, Qos, State) -
1000986-spec hand_off_to_retainer (pid (), topic (), mqtt_msg ()) -> ok .
1001987hand_off_to_retainer (RetainerPid , Topic0 , Msg = # mqtt_msg {payload = Payload }) ->
1002988 Topic = amqp_to_mqtt (Topic0 ),
1003- if Payload =:= <<>> ->
1004- rabbit_mqtt_retainer :clear (RetainerPid , Topic );
1005- true ->
1006- rabbit_mqtt_retainer :retain (RetainerPid , Topic , Msg )
989+ case Payload of
990+ <<>> ->
991+ rabbit_mqtt_retainer :clear (RetainerPid , Topic );
992+ _ ->
993+ rabbit_mqtt_retainer :retain (RetainerPid , Topic , Msg )
1007994 end .
1008995
1009996-spec send_retained_messages ([{topic_filter (), qos ()}], state ()) -> state ().
@@ -1538,18 +1525,13 @@ consume(Q, QoS, #state{
15381525 Err
15391526 end .
15401527
1541- binding_args_for_proto_ver (? MQTT_PROTO_V3 , _ , _ , _ ) ->
1528+ binding_args_for_proto_ver (? MQTT_PROTO_V3 , _ , _ ) ->
15421529 [];
1543- binding_args_for_proto_ver (? MQTT_PROTO_V4 , _ , _ , _ ) ->
1530+ binding_args_for_proto_ver (? MQTT_PROTO_V4 , _ , _ ) ->
15441531 [];
1545- binding_args_for_proto_ver (? MQTT_PROTO_V5 , TopicFilter , SubOpts0 , V2 ) ->
1546- SubOpts = case V2 of
1547- true ->
1548- Table = subscription_opts_to_table (SubOpts0 ),
1549- {<<" x-mqtt-subscription-opts" >>, table , Table };
1550- false ->
1551- SubOpts0
1552- end ,
1532+ binding_args_for_proto_ver (? MQTT_PROTO_V5 , TopicFilter , SubOpts0 ) ->
1533+ Table = subscription_opts_to_table (SubOpts0 ),
1534+ SubOpts = {<<" x-mqtt-subscription-opts" >>, table , Table },
15531535 BindingKey = mqtt_to_amqp (TopicFilter ),
15541536 [SubOpts , {<<" x-binding-key" >>, longstr , BindingKey }].
15551537
@@ -1600,13 +1582,12 @@ delete_subscription(TopicFilter, BindingArgs, Qos, State) ->
16001582% % Subscription will be identical to that in the previous Subscription, although its
16011583% % Subscription Options could be different." [v5 3.8.4]
16021584maybe_delete_old_subscription (TopicFilter , Opts , State = # state {subscriptions = Subs ,
1603- cfg = # cfg {proto_ver = ProtoVer ,
1604- binding_args_v2 = BindingArgsV2 }}) ->
1585+ cfg = # cfg {proto_ver = ProtoVer }}) ->
16051586 case Subs of
16061587 #{TopicFilter := OldOpts }
16071588 when OldOpts =/= Opts ->
16081589 delete_subscription (TopicFilter ,
1609- binding_args_for_proto_ver (ProtoVer , TopicFilter , OldOpts , BindingArgsV2 ),
1590+ binding_args_for_proto_ver (ProtoVer , TopicFilter , OldOpts ),
16101591 OldOpts # mqtt_subscription_opts .qos ,
16111592 State );
16121593 _ ->
0 commit comments