Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_confirms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
-opaque state() :: #?MODULE{}.

-export_type([
state/0
state/0,
mx/0
]).

-spec init() -> state().
Expand Down
17 changes: 10 additions & 7 deletions deps/rabbitmq_cli/test/plugins/disable_plugins_command_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -258,23 +258,26 @@ defmodule DisablePluginsCommandTest do
test "disabling a dependency disables all plugins that depend on it", context do
assert {:stream, test_stream} = @command.run(["amqp_client"], context[:opts])
result = Enum.to_list(test_stream)
expected_list = [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp]
expected_disabled = [:rabbitmq_exchange_federation, :rabbitmq_federation,
:rabbitmq_federation_common, :rabbitmq_queue_federation]
expected = [
[],
[:rabbitmq_stomp],
%{
mode: :online,
started: [],
stopped: expected_list,
disabled: expected_list,
set: []
stopped: expected_disabled,
disabled: expected_disabled,
set: [:rabbitmq_stomp]
}
]
assert normalize_stream_result(expected) == normalize_stream_result(result)

assert {:ok, [[]]} == :file.consult(context[:opts][:enabled_plugins_file])
assert {:ok, [[:rabbitmq_stomp]]} == :file.consult(context[:opts][:enabled_plugins_file])

# Before native STOMP, this would be empty because STOMP depended on
# amqp_client. Native STOMP does not depend on amqp_client.
result = :rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, [])
assert Enum.empty?(result)
assert Enum.sort(result) == [:rabbitmq_stomp]
end

test "formats enabled plugins mismatch errors", context do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ defmodule EnablePluginsCommandTest do
Enum.to_list(test_stream0)

check_plugins_enabled([:rabbitmq_stomp], context)
assert_equal_sets([:amqp_client, :rabbitmq_stomp], currently_active_plugins(context))
# Native STOMP does not depend on amqp_client
assert_equal_sets([:rabbitmq_stomp], currently_active_plugins(context))

{:stream, test_stream1} = @command.run(["rabbitmq_federation"], context[:opts])

Expand Down
3 changes: 2 additions & 1 deletion deps/rabbitmq_cli/test/plugins/set_plugins_command_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ defmodule SetPluginsCommandTest do

assert {:ok, [[:rabbitmq_stomp]]} = :file.consult(context[:opts][:enabled_plugins_file])

assert [:amqp_client, :rabbitmq_stomp] =
# Native STOMP does not depend on amqp_client
assert [:rabbitmq_stomp] =
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))

assert {:stream, test_stream1} = @command.run(["rabbitmq_federation"], context[:opts])
Expand Down
67 changes: 33 additions & 34 deletions deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,12 @@ amqp_mqtt(Qos, Config) ->

mqtt_stomp_mqtt(Config) ->
{ok, StompC0} = stomp_connect(Config),
ok = stomp_send(StompC0, "SUBSCRIBE", [{"destination", "/topic/t.1"},
{"receipt", "my-receipt"},
{"id", "subscription-888"},
{"durable", "true"}]),
{#stomp_frame{command = "RECEIPT",
headers = [{"receipt-id","my-receipt"}]}, StompC1} = stomp_recv(StompC0),
ok = stomp_send(StompC0, 'SUBSCRIBE', [{<<"destination">>, <<"/topic/t.1">>},
{<<"receipt">>, <<"my-receipt">>},
{<<"id">>, <<"subscription-888">>},
{<<"durable">>, <<"true">>}]),
{#stomp_frame{command = 'RECEIPT',
headers = #{<<"receipt-id">> := <<"my-receipt">>}}, StompC1} = stomp_recv(StompC0),

%% MQTT 5.0 to STOMP 1.2
C = connect(<<"my-mqtt-client">>, Config),
Expand All @@ -513,40 +513,39 @@ mqtt_stomp_mqtt(Config) ->
'User-Property' => UserProperty},
RequestPayload, [{qos, 1}]),

{#stomp_frame{command = "MESSAGE",
headers = Headers0,
body_iolist = Body} = Msg1, StompC2} = stomp_recv(StompC1),
{#stomp_frame{command = 'MESSAGE',
headers = Headers,
body_iolist_rev = BodyRev} = Msg1, StompC2} = stomp_recv(StompC1),
Body = lists:reverse(BodyRev),
?assertEqual(RequestPayload, iolist_to_binary(Body)),
Headers1 = maps:from_list(Headers0),
Headers = maps:map(fun(_K, V) -> unicode:characters_to_binary(V) end, Headers1),
ct:pal("Received STOMP 1.2 message:~n~p~n"
"with headers map:~n~p", [Msg1, Headers]),
?assertMatch(
#{"content-type" := ContentType,
"correlation-id" := Correlation,
"destination" := <<"/topic/t.1">>,
#{<<"content-type">> := ContentType,
<<"correlation-id">> := Correlation,
<<"destination">> := <<"/topic/t.1">>,
%% With Native STOMP, this should be translated to
%% reply-to: /topic/response.topic
"x-reply-to-topic" := <<"response.topic">>,
"subscription" := <<"subscription-888">>,
"persistent" := <<"true">>,
<<"x-reply-to-topic">> := <<"response.topic">>,
<<"subscription">> := <<"subscription-888">>,
<<"persistent">> := <<"true">>,
%% The STOMP spec mandates headers to be encoded as UTF-8, but unfortunately the RabbitMQ
%% STOMP implementation (as of 3.13) does not adhere and therefore does not provide UTF-8 support.
% "rabbit🐇" := <<"carrot🥕"/utf8>>,
% "x-rabbit🐇" := <<"carrot🥕"/utf8>>,
"key" := <<"val1">>,
"x-key" := <<"val1">>
% <<"rabbit🐇"/utf8>> := <<"carrot🥕"/utf8>>,
% <<"x-rabbit🐇"/utf8>> := <<"carrot🥕"/utf8>>,
<<"key">> := <<"val1">>,
<<"x-key">> := <<"val1">>
},
Headers),

%% STOMP 1.2 to MQTT 5.0
ok = stomp_send(StompC2, "SEND",
[{"destination", "/topic/response.topic"},
{"persistent", "true"},
{"content-type", "application/json"},
{"correlation-id", binary_to_list(Correlation)},
{"x-key", "val4"}],
["{\"my\" : \"response\"}"]),
ok = stomp_send(StompC2, 'SEND',
[{<<"destination">>, <<"/topic/response.topic">>},
{<<"persistent">>, <<"true">>},
{<<"content-type">>, <<"application/json">>},
{<<"correlation-id">>, Correlation},
{<<"x-key">>, <<"val4">>}],
[<<"{\"my\" : \"response\"}">>]),
ok = stomp_disconnect(StompC2),

receive {publish, MqttMsg} ->
Expand Down Expand Up @@ -713,12 +712,12 @@ stomp_connect(Config) ->
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
{ok, Sock} = gen_tcp:connect(localhost, Port, [{active, false}, binary]),
Client0 = {Sock, []},
stomp_send(Client0, "CONNECT", [{"accept-version", "1.2"}]),
{#stomp_frame{command = "CONNECTED"}, Client1} = stomp_recv(Client0),
stomp_send(Client0, 'CONNECT', [{<<"accept-version">>, <<"1.2">>}]),
{#stomp_frame{command = 'CONNECTED'}, Client1} = stomp_recv(Client0),
{ok, Client1}.

stomp_disconnect(Client = {Sock, _}) ->
stomp_send(Client, "DISCONNECT"),
stomp_send(Client, 'DISCONNECT'),
gen_tcp:close(Sock).

stomp_send(Client, Command) ->
Expand All @@ -729,9 +728,9 @@ stomp_send(Client, Command, Headers) ->

stomp_send({Sock, _}, Command, Headers, Body) ->
Frame = rabbit_stomp_frame:serialize(
#stomp_frame{command = list_to_binary(Command),
headers = Headers,
body_iolist = Body}),
#stomp_frame{command = Command,
headers = maps:from_list(Headers),
body_iolist_rev = Body}),
gen_tcp:send(Sock, Frame).

stomp_recv({_Sock, []} = Client) ->
Expand Down
10 changes: 8 additions & 2 deletions deps/rabbitmq_stomp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ define PROJECT_ENV
{passcode, <<"guest">>}]},
{default_vhost, <<"/">>},
{default_topic_exchange, <<"amq.topic">>},
{default_nack_requeue, true},
{default_nack_requeue, true},
{ssl_cert_login, false},
{implicit_connect, false},
{tcp_listeners, [61613]},
Expand All @@ -30,7 +30,7 @@ define PROJECT_APP_EXTRA_KEYS
{broker_version_requirements, []}
endef

DEPS = ranch rabbit_common rabbit amqp_client
DEPS = ranch rabbit_common rabbit
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management proper

PLT_APPS += rabbitmq_cli elixir ssl
Expand All @@ -40,3 +40,9 @@ DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk

include ../../rabbitmq-components.mk
include ../../erlang.mk

# Regenerate per-function CT test cases from Python test files.
# The generated .hrl is committed; this rule updates it when Python sources change.
# Runs only when python3 is available.
test/python_SUITE_generated.hrl:: $(wildcard test/python_SUITE_data/src/*.py) test/generate_python_tests.py
$(if $(shell which python3 2>/dev/null),python3 test/generate_python_tests.py test/python_SUITE_data/src $@,@echo "python3 not found, using committed $@")
31 changes: 29 additions & 2 deletions deps/rabbitmq_stomp/include/rabbit_stomp.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,27 @@
default_passcode,
force_default_creds = false,
implicit_connect,
ssl_cert_login}).
ssl_cert_login,
max_header_length,
max_headers,
max_body_length}).


-define(SUPPORTED_VERSIONS, ["1.0", "1.1", "1.2"]).

-define(STOMP_PROTO_V1_0, 'STOMP 1.0').
-define(STOMP_PROTO_V1_1, 'STOMP 1.1').
-define(STOMP_PROTO_V1_2, 'STOMP 1.2').



-define(INFO_ITEMS,
[conn_name,
name,
user,
connection,
connection_state,
session_id,
channel,
version,
implicit_connect,
auth_login,
Expand All @@ -29,6 +40,7 @@
peer_host,
peer_port,
protocol,
connected_at,
channels,
channel_max,
frame_max,
Expand All @@ -43,3 +55,18 @@

-define(DEFAULT_MAX_FRAME_SIZE, 4 * 1024 * 1024).
-define(DEFAULT_MAX_FRAME_SIZE_UNAUTHENTICATED, 65536).

-define(SIMPLE_METRICS,
[pid,
recv_oct,
send_oct,
reductions]).
-define(OTHER_METRICS,
[recv_cnt,
send_cnt,
send_pend,
garbage_collection,
state,
timeout]).

-type send_fun() :: fun ((iodata()) -> ok).
7 changes: 6 additions & 1 deletion deps/rabbitmq_stomp/include/rabbit_stomp_frame.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-record(stomp_frame, {command, headers, body_iolist}).
-record(stomp_frame, {command, headers, body_iolist_rev}).

-record(stomp_parser_config, {max_header_length = 1024*100,
max_headers = 100,
max_body_length = 1024*1024*100}).
-define(DEFAULT_STOMP_PARSER_CONFIG, #stomp_parser_config{}).
110 changes: 60 additions & 50 deletions deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,59 @@
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-define(HEADER_ACCEPT_VERSION, "accept-version").
-define(HEADER_ACK, "ack").
-define(HEADER_AMQP_MESSAGE_ID, "amqp-message-id").
-define(HEADER_APP_ID, "app-id").
-define(HEADER_AUTO_DELETE, "auto-delete").
-define(HEADER_CONTENT_ENCODING, "content-encoding").
-define(HEADER_CONTENT_LENGTH, "content-length").
-define(HEADER_CONTENT_TYPE, "content-type").
-define(HEADER_CORRELATION_ID, "correlation-id").
-define(HEADER_DESTINATION, "destination").
-define(HEADER_DURABLE, "durable").
-define(HEADER_EXPIRATION, "expiration").
-define(HEADER_EXCLUSIVE, "exclusive").
-define(HEADER_HEART_BEAT, "heart-beat").
-define(HEADER_HOST, "host").
-define(HEADER_ID, "id").
-define(HEADER_LOGIN, "login").
-define(HEADER_MESSAGE_ID, "message-id").
-define(HEADER_PASSCODE, "passcode").
-define(HEADER_PERSISTENT, "persistent").
-define(HEADER_PREFETCH_COUNT, "prefetch-count").
-define(HEADER_X_STREAM_OFFSET, "x-stream-offset").
-define(HEADER_X_STREAM_FILTER, "x-stream-filter").
-define(HEADER_X_STREAM_MATCH_UNFILTERED, "x-stream-match-unfiltered").
-define(HEADER_PRIORITY, "priority").
-define(HEADER_X_PRIORITY, "x-priority").
-define(HEADER_RECEIPT, "receipt").
-define(HEADER_REDELIVERED, "redelivered").
-define(HEADER_REPLY_TO, "reply-to").
-define(HEADER_SERVER, "server").
-define(HEADER_SESSION, "session").
-define(HEADER_SUBSCRIPTION, "subscription").
-define(HEADER_TIMESTAMP, "timestamp").
-define(HEADER_TRANSACTION, "transaction").
-define(HEADER_TYPE, "type").
-define(HEADER_USER_ID, "user-id").
-define(HEADER_VERSION, "version").
-define(HEADER_X_DEAD_LETTER_EXCHANGE, "x-dead-letter-exchange").
-define(HEADER_X_DEAD_LETTER_ROUTING_KEY, "x-dead-letter-routing-key").
-define(HEADER_X_EXPIRES, "x-expires").
-define(HEADER_X_MAX_LENGTH, "x-max-length").
-define(HEADER_X_MAX_AGE, "x-max-age").
-define(HEADER_X_MAX_LENGTH_BYTES, "x-max-length-bytes").
-define(HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES, "x-stream-max-segment-size-bytes").
-define(HEADER_X_MAX_PRIORITY, "x-max-priority").
-define(HEADER_X_MESSAGE_TTL, "x-message-ttl").
-define(HEADER_X_QUEUE_NAME, "x-queue-name").
-define(HEADER_X_QUEUE_TYPE, "x-queue-type").
-define(HEADER_X_STREAM_FILTER_SIZE_BYTES, "x-stream-filter-size-bytes").
-include("rabbit_stomp_routing_prefixes.hrl").

-define(MESSAGE_ID_SEPARATOR, "@@").
-define(HEADER_ACCEPT_VERSION, <<"accept-version">>).
-define(HEADER_ACK, <<"ack">>).
-define(HEADER_AMQP_MESSAGE_ID, <<"amqp-message-id">>).
-define(HEADER_APP_ID, <<"app-id">>).
-define(HEADER_AUTO_DELETE, <<"auto-delete">>).
-define(HEADER_CONTENT_ENCODING, <<"content-encoding">>).
-define(HEADER_CONTENT_LENGTH, <<"content-length">>).
-define(HEADER_CONTENT_TYPE, <<"content-type">>).
-define(HEADER_CORRELATION_ID, <<"correlation-id">>).
-define(HEADER_DESTINATION, <<"destination">>).
-define(HEADER_DURABLE, <<"durable">>).
-define(HEADER_EXPIRATION, <<"expiration">>).
-define(HEADER_EXCLUSIVE, <<"exclusive">>).
-define(HEADER_HEART_BEAT, <<"heart-beat">>).
-define(HEADER_HOST, <<"host">>).
-define(HEADER_ID, <<"id">>).
-define(HEADER_LOGIN, <<"login">>).
-define(HEADER_MESSAGE_ID, <<"message-id">>).
-define(HEADER_PASSCODE, <<"passcode">>).
-define(HEADER_PERSISTENT, <<"persistent">>).
-define(HEADER_PREFETCH_COUNT, <<"prefetch-count">>).
-define(HEADER_X_STREAM_OFFSET, <<"x-stream-offset">>).
-define(HEADER_X_STREAM_FILTER, <<"x-stream-filter">>).
-define(HEADER_X_STREAM_MATCH_UNFILTERED, <<"x-stream-match-unfiltered">>).
-define(HEADER_PRIORITY, <<"priority">>).
-define(HEADER_X_PRIORITY, <<"x-priority">>).
-define(HEADER_RECEIPT, <<"receipt">>).
-define(HEADER_REDELIVERED, <<"redelivered">>).
-define(HEADER_REPLY_TO, <<"reply-to">>).
-define(HEADER_SERVER, <<"server">>).
-define(HEADER_SESSION, <<"session">>).
-define(HEADER_SUBSCRIPTION, <<"subscription">>).
-define(HEADER_TIMESTAMP, <<"timestamp">>).
-define(HEADER_TRANSACTION, <<"transaction">>).
-define(HEADER_TYPE, <<"type">>).
-define(HEADER_USER_ID, <<"user-id">>).
-define(HEADER_VERSION, <<"version">>).
-define(HEADER_X_DEAD_LETTER_EXCHANGE, <<"x-dead-letter-exchange">>).
-define(HEADER_X_DEAD_LETTER_ROUTING_KEY, <<"x-dead-letter-routing-key">>).
-define(HEADER_X_EXPIRES, <<"x-expires">>).
-define(HEADER_X_MAX_LENGTH, <<"x-max-length">>).
-define(HEADER_X_MAX_AGE, <<"x-max-age">>).
-define(HEADER_X_MAX_LENGTH_BYTES, <<"x-max-length-bytes">>).
-define(HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES, <<"x-stream-max-segment-size-bytes">>).
-define(HEADER_X_MAX_PRIORITY, <<"x-max-priority">>).
-define(HEADER_X_MESSAGE_TTL, <<"x-message-ttl">>).
-define(HEADER_X_QUEUE_NAME, <<"x-queue-name">>).
-define(HEADER_X_QUEUE_TYPE, <<"x-queue-type">>).
-define(HEADER_X_STREAM_FILTER_SIZE_BYTES, <<"x-stream-filter-size-bytes">>).

-define(MESSAGE_ID_SEPARATOR, <<"@@">>).

-define(HEADERS_NOT_ON_SEND, [?HEADER_MESSAGE_ID]).

Expand All @@ -81,3 +83,11 @@
?HEADER_EXCLUSIVE,
?HEADER_PERSISTENT
]).


%%-------------------------------------------------

-define(DEST_PREFIXES, [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX,
?AMQQUEUE_PREFIX, ?REPLY_QUEUE_PREFIX]).

-define(ALL_DEST_PREFIXES, [?TEMP_QUEUE_PREFIX | ?DEST_PREFIXES]).
Loading
Loading