Skip to content

Commit ab75e63

Browse files
michaelklishinmergify[bot]
authored andcommitted
Make sure STOMP correctly resolves virtual host-level DQT
via `rabbit_vhost:default_queue_type/1`. (cherry picked from commit a27c5ae)
1 parent a5888bf commit ab75e63

1 file changed

Lines changed: 12 additions & 10 deletions

File tree

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
%% see rabbitmq/rabbitmq-stomp#39
3232
trailing_lf, auth_mechanism, auth_login,
3333
default_topic_exchange, default_nack_requeue,
34-
default_queue_type}).
34+
virtual_host}).
3535

3636
-record(subscription, {dest_hdr, ack_mode, multi_ack, description}).
3737

@@ -607,11 +607,11 @@ do_login(Username, Passwd, VirtualHost, Heartbeat, AdapterInfo, Version,
607607
false -> [{?HEADER_SERVER, server_header()} | Headers]
608608
end,
609609
"",
610-
State#proc_state{session_id = SessionId,
611-
channel = Channel,
612-
connection = Connection,
613-
version = Version,
614-
default_queue_type = rabbit_vhost:default_queue_type(VirtualHost)});
610+
State#proc_state{session_id = SessionId,
611+
channel = Channel,
612+
connection = Connection,
613+
version = Version,
614+
virtual_host = VirtualHost});
615615
{error, {auth_failure, _}} ->
616616
?LOG_WARNING("STOMP login failed for user '~ts': authentication failed", [Username]),
617617
error("Bad CONNECT", "Access refused for user '" ++
@@ -1131,7 +1131,7 @@ ensure_endpoint(_Direction, {queue, []}, _Frame, _Channel, _State) ->
11311131
{error, {invalid_destination, "Destination cannot be blank"}};
11321132

11331133
ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel,
1134-
State = #proc_state{default_queue_type = DQT}) ->
1134+
State = #proc_state{virtual_host = VHost}) ->
11351135
Params =
11361136
[{subscription_queue_name_gen,
11371137
fun () ->
@@ -1142,14 +1142,16 @@ ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel,
11421142
list_to_binary(rabbit_stomp_util:subscription_queue_name(Name, Id, Frame))
11431143
end
11441144
},
1145-
{default_queue_type, DQT}] ++ rabbit_stomp_util:build_params(EndPoint, Headers),
1145+
{default_queue_type, rabbit_vhost:default_queue_type(VHost)}]
1146+
++ rabbit_stomp_util:build_params(EndPoint, Headers),
11461147
Arguments = rabbit_stomp_util:build_arguments(Headers),
11471148
rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint,
11481149
[Arguments | Params], State);
11491150

11501151
ensure_endpoint(Direction, EndPoint, {_, _, Headers, _}, Channel,
1151-
State = #proc_state{default_queue_type = DQT}) ->
1152-
Params = [{default_queue_type, DQT} | rabbit_stomp_util:build_params(EndPoint, Headers)],
1152+
State = #proc_state{virtual_host = VHost}) ->
1153+
Params = [{default_queue_type, rabbit_vhost:default_queue_type(VHost)}
1154+
| rabbit_stomp_util:build_params(EndPoint, Headers)],
11531155
Arguments = rabbit_stomp_util:build_arguments(Headers),
11541156
rabbit_routing_util:ensure_endpoint(Direction, Channel, EndPoint,
11551157
[Arguments | Params], State).

0 commit comments

Comments
 (0)