diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 6fac3e286f88..f71136b667ff 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1211,13 +1211,14 @@ handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)}, "link handle value (~b) exceeds maximum link handle value (~b)", [Handle, MaxHandle]); handle_frame(#'v1_0.attach'{name = {utf8, NameBin} = Name, - handle = Handle, + handle = ?UINT(HandleInt) = Handle, role = Role, source = Source, target = Target, snd_settle_mode = SndSettleMode, rcv_settle_mode = RcvSettleMode} = Attach, State) -> + ok = validate_handle_not_in_use(HandleInt, State), try ok = validate_attach(Attach), handle_attach(Attach, State) @@ -1506,10 +1507,6 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, Flow = #'v1_0.flow'{handle = Handle, delivery_count = DeliveryCount, link_credit = ?UINT(MaxLinkCredit)}, - %%TODO check that handle is not in use for any other open links. - %%"The handle MUST NOT be used for other open links. An attempt to attach - %% using a handle which is already associated with a link MUST be responded to - %% with an immediate close carrying a handle-in-use session-error." IncomingLinks = IncomingLinks0#{HandleInt => IncomingLink}, State = State0#state{incoming_links = IncomingLinks, permission_cache = PermCache}, @@ -3424,14 +3421,30 @@ validate_attach(#'v1_0.attach'{unsettled = {map, [_|_]}}) -> exit_not_implemented("Link recovery not supported"); validate_attach(#'v1_0.attach'{incomplete_unsettled = true}) -> exit_not_implemented("Link recovery not supported"); -validate_attach( - #'v1_0.attach'{snd_settle_mode = SndSettleMode, - rcv_settle_mode = ?V_1_0_RECEIVER_SETTLE_MODE_SECOND}) +validate_attach(#'v1_0.attach'{snd_settle_mode = SndSettleMode, + rcv_settle_mode = ?V_1_0_RECEIVER_SETTLE_MODE_SECOND}) when SndSettleMode =/= ?V_1_0_SENDER_SETTLE_MODE_SETTLED -> exit_not_implemented("rcv-settle-mode second not supported"); validate_attach(#'v1_0.attach'{}) -> ok. +%% "The handle MUST NOT be used for other open links. An attempt to attach +%% using a handle which is already associated with a link MUST be responded +%% to with an immediate close carrying a handle-in-use session-error." [2.7.3] +validate_handle_not_in_use(Handle, #state{incoming_links = IL, + outgoing_links = OL, + incoming_management_links = IML, + outgoing_management_links = OML}) + when is_map_key(Handle, IL) orelse + is_map_key(Handle, OL) orelse + is_map_key(Handle, IML) orelse + is_map_key(Handle, OML) -> + protocol_error(?V_1_0_SESSION_ERROR_HANDLE_IN_USE, + "handle ~b is already associated with a link", + [Handle]); +validate_handle_not_in_use(_, _) -> + ok. + validate_multi_transfer_delivery_id(?UINT(Id), Id) -> ok; validate_multi_transfer_delivery_id(undefined, _FirstDeliveryId) -> diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index fecc21614c7e..029ae849975b 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -184,7 +184,8 @@ groups() -> bad_x_cc_annotation_exchange, decimal_types, consumer_timeout_quorum_queue_policy, - consumer_timeout_quorum_queue_consumer_arg + consumer_timeout_quorum_queue_consumer_arg, + handle_in_use ]}, {cluster_size_3, [shuffle], @@ -5343,6 +5344,38 @@ attach_to_exclusive_queue(Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). +%% "The handle MUST NOT be used for other open links. An attempt to attach +%% using a handle which is already associated with a link MUST be responded +%% to with an immediate close carrying a handle-in-use session-error." [2.7.3] +handle_in_use(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"my key">>), + + Handle = 99, + SenderArgs1 = #{name => <<"sender-1">>, + role => {sender, #{address => Address}}, + snd_settle_mode => mixed, + rcv_settle_mode => first, + handle => Handle}, + {ok, Sender1} = amqp10_client:attach_link(Session, SenderArgs1), + ok = wait_for_credit(Sender1), + + %% Attaching another link with the same handle should close the session. + SenderArgs2 = SenderArgs1#{name := <<"sender-2">>}, + {ok, _Sender2} = amqp10_client:attach_link(Session, SenderArgs2), + receive {amqp10_event, + {session, Session, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_SESSION_ERROR_HANDLE_IN_USE, + description = {utf8, <<"handle 99 is already associated with a link">>} + }}}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + dynamic_target_short_link_name(Config) -> OpnConf0 = connection_config(Config), OpnConf = OpnConf0#{container_id := <<"my-container">>,