Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1211,13 +1211,14 @@ handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)},
"link handle value (~b) exceeds maximum link handle value (~b)",
[Handle, MaxHandle]);
handle_frame(#'v1_0.attach'{name = {utf8, NameBin} = Name,
handle = Handle,
handle = ?UINT(HandleInt) = Handle,
role = Role,
source = Source,
target = Target,
snd_settle_mode = SndSettleMode,
rcv_settle_mode = RcvSettleMode} = Attach,
State) ->
ok = validate_handle_not_in_use(HandleInt, State),
try
ok = validate_attach(Attach),
handle_attach(Attach, State)
Expand Down Expand Up @@ -1506,10 +1507,6 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
Flow = #'v1_0.flow'{handle = Handle,
delivery_count = DeliveryCount,
link_credit = ?UINT(MaxLinkCredit)},
%%TODO check that handle is not in use for any other open links.
%%"The handle MUST NOT be used for other open links. An attempt to attach
%% using a handle which is already associated with a link MUST be responded to
%% with an immediate close carrying a handle-in-use session-error."
IncomingLinks = IncomingLinks0#{HandleInt => IncomingLink},
State = State0#state{incoming_links = IncomingLinks,
permission_cache = PermCache},
Expand Down Expand Up @@ -3424,14 +3421,30 @@ validate_attach(#'v1_0.attach'{unsettled = {map, [_|_]}}) ->
exit_not_implemented("Link recovery not supported");
validate_attach(#'v1_0.attach'{incomplete_unsettled = true}) ->
exit_not_implemented("Link recovery not supported");
validate_attach(
#'v1_0.attach'{snd_settle_mode = SndSettleMode,
rcv_settle_mode = ?V_1_0_RECEIVER_SETTLE_MODE_SECOND})
validate_attach(#'v1_0.attach'{snd_settle_mode = SndSettleMode,
rcv_settle_mode = ?V_1_0_RECEIVER_SETTLE_MODE_SECOND})
when SndSettleMode =/= ?V_1_0_SENDER_SETTLE_MODE_SETTLED ->
exit_not_implemented("rcv-settle-mode second not supported");
validate_attach(#'v1_0.attach'{}) ->
ok.

%% "The handle MUST NOT be used for other open links. An attempt to attach
%% using a handle which is already associated with a link MUST be responded
%% to with an immediate close carrying a handle-in-use session-error." [2.7.3]
validate_handle_not_in_use(Handle, #state{incoming_links = IL,
outgoing_links = OL,
incoming_management_links = IML,
outgoing_management_links = OML})
when is_map_key(Handle, IL) orelse
is_map_key(Handle, OL) orelse
is_map_key(Handle, IML) orelse
is_map_key(Handle, OML) ->
protocol_error(?V_1_0_SESSION_ERROR_HANDLE_IN_USE,
"handle ~b is already associated with a link",
[Handle]);
validate_handle_not_in_use(_, _) ->
ok.

validate_multi_transfer_delivery_id(?UINT(Id), Id) ->
ok;
validate_multi_transfer_delivery_id(undefined, _FirstDeliveryId) ->
Expand Down
35 changes: 34 additions & 1 deletion deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ groups() ->
bad_x_cc_annotation_exchange,
decimal_types,
consumer_timeout_quorum_queue_policy,
consumer_timeout_quorum_queue_consumer_arg
consumer_timeout_quorum_queue_consumer_arg,
handle_in_use
]},

{cluster_size_3, [shuffle],
Expand Down Expand Up @@ -5343,6 +5344,38 @@ attach_to_exclusive_queue(Config) ->
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

%% "The handle MUST NOT be used for other open links. An attempt to attach
%% using a handle which is already associated with a link MUST be responded
%% to with an immediate close carrying a handle-in-use session-error." [2.7.3]
handle_in_use(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"my key">>),

Handle = 99,
SenderArgs1 = #{name => <<"sender-1">>,
role => {sender, #{address => Address}},
snd_settle_mode => mixed,
rcv_settle_mode => first,
handle => Handle},
{ok, Sender1} = amqp10_client:attach_link(Session, SenderArgs1),
ok = wait_for_credit(Sender1),

%% Attaching another link with the same handle should close the session.
SenderArgs2 = SenderArgs1#{name := <<"sender-2">>},
{ok, _Sender2} = amqp10_client:attach_link(Session, SenderArgs2),
receive {amqp10_event,
{session, Session,
{ended,
#'v1_0.error'{
condition = ?V_1_0_SESSION_ERROR_HANDLE_IN_USE,
description = {utf8, <<"handle 99 is already associated with a link">>}
}}}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).

dynamic_target_short_link_name(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{container_id := <<"my-container">>,
Expand Down
Loading