Skip to content

Commit 1f8a71c

Browse files
committed
STOMP: move everything to binaries
1 parent 2ef84bb commit 1f8a71c

11 files changed

Lines changed: 455 additions & 489 deletions

deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5,57 +5,57 @@
55
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66
%%
77

8-
-define(HEADER_ACCEPT_VERSION, "accept-version").
9-
-define(HEADER_ACK, "ack").
10-
-define(HEADER_AMQP_MESSAGE_ID, "amqp-message-id").
11-
-define(HEADER_APP_ID, "app-id").
12-
-define(HEADER_AUTO_DELETE, "auto-delete").
13-
-define(HEADER_CONTENT_ENCODING, "content-encoding").
14-
-define(HEADER_CONTENT_LENGTH, "content-length").
15-
-define(HEADER_CONTENT_TYPE, "content-type").
16-
-define(HEADER_CORRELATION_ID, "correlation-id").
17-
-define(HEADER_DESTINATION, "destination").
18-
-define(HEADER_DURABLE, "durable").
19-
-define(HEADER_EXPIRATION, "expiration").
20-
-define(HEADER_EXCLUSIVE, "exclusive").
21-
-define(HEADER_HEART_BEAT, "heart-beat").
22-
-define(HEADER_HOST, "host").
23-
-define(HEADER_ID, "id").
24-
-define(HEADER_LOGIN, "login").
25-
-define(HEADER_MESSAGE_ID, "message-id").
26-
-define(HEADER_PASSCODE, "passcode").
27-
-define(HEADER_PERSISTENT, "persistent").
28-
-define(HEADER_PREFETCH_COUNT, "prefetch-count").
29-
-define(HEADER_X_STREAM_OFFSET, "x-stream-offset").
30-
-define(HEADER_X_STREAM_FILTER, "x-stream-filter").
31-
-define(HEADER_X_STREAM_MATCH_UNFILTERED, "x-stream-match-unfiltered").
32-
-define(HEADER_PRIORITY, "priority").
33-
-define(HEADER_X_PRIORITY, "x-priority").
34-
-define(HEADER_RECEIPT, "receipt").
35-
-define(HEADER_REDELIVERED, "redelivered").
36-
-define(HEADER_REPLY_TO, "reply-to").
37-
-define(HEADER_SERVER, "server").
38-
-define(HEADER_SESSION, "session").
39-
-define(HEADER_SUBSCRIPTION, "subscription").
40-
-define(HEADER_TIMESTAMP, "timestamp").
41-
-define(HEADER_TRANSACTION, "transaction").
42-
-define(HEADER_TYPE, "type").
43-
-define(HEADER_USER_ID, "user-id").
44-
-define(HEADER_VERSION, "version").
45-
-define(HEADER_X_DEAD_LETTER_EXCHANGE, "x-dead-letter-exchange").
46-
-define(HEADER_X_DEAD_LETTER_ROUTING_KEY, "x-dead-letter-routing-key").
47-
-define(HEADER_X_EXPIRES, "x-expires").
48-
-define(HEADER_X_MAX_LENGTH, "x-max-length").
49-
-define(HEADER_X_MAX_AGE, "x-max-age").
50-
-define(HEADER_X_MAX_LENGTH_BYTES, "x-max-length-bytes").
51-
-define(HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES, "x-stream-max-segment-size-bytes").
52-
-define(HEADER_X_MAX_PRIORITY, "x-max-priority").
53-
-define(HEADER_X_MESSAGE_TTL, "x-message-ttl").
54-
-define(HEADER_X_QUEUE_NAME, "x-queue-name").
55-
-define(HEADER_X_QUEUE_TYPE, "x-queue-type").
56-
-define(HEADER_X_STREAM_FILTER_SIZE_BYTES, "x-stream-filter-size-bytes").
8+
-define(HEADER_ACCEPT_VERSION, <<"accept-version">>).
9+
-define(HEADER_ACK, <<"ack">>).
10+
-define(HEADER_AMQP_MESSAGE_ID, <<"amqp-message-id">>).
11+
-define(HEADER_APP_ID, <<"app-id">>).
12+
-define(HEADER_AUTO_DELETE, <<"auto-delete">>).
13+
-define(HEADER_CONTENT_ENCODING, <<"content-encoding">>).
14+
-define(HEADER_CONTENT_LENGTH, <<"content-length">>).
15+
-define(HEADER_CONTENT_TYPE, <<"content-type">>).
16+
-define(HEADER_CORRELATION_ID, <<"correlation-id">>).
17+
-define(HEADER_DESTINATION, <<"destination">>).
18+
-define(HEADER_DURABLE, <<"durable">>).
19+
-define(HEADER_EXPIRATION, <<"expiration">>).
20+
-define(HEADER_EXCLUSIVE, <<"exclusive">>).
21+
-define(HEADER_HEART_BEAT, <<"heart-beat">>).
22+
-define(HEADER_HOST, <<"host">>).
23+
-define(HEADER_ID, <<"id">>).
24+
-define(HEADER_LOGIN, <<"login">>).
25+
-define(HEADER_MESSAGE_ID, <<"message-id">>).
26+
-define(HEADER_PASSCODE, <<"passcode">>).
27+
-define(HEADER_PERSISTENT, <<"persistent">>).
28+
-define(HEADER_PREFETCH_COUNT, <<"prefetch-count">>).
29+
-define(HEADER_X_STREAM_OFFSET, <<"x-stream-offset">>).
30+
-define(HEADER_X_STREAM_FILTER, <<"x-stream-filter">>).
31+
-define(HEADER_X_STREAM_MATCH_UNFILTERED, <<"x-stream-match-unfiltered">>).
32+
-define(HEADER_PRIORITY, <<"priority">>).
33+
-define(HEADER_X_PRIORITY, <<"x-priority">>).
34+
-define(HEADER_RECEIPT, <<"receipt">>).
35+
-define(HEADER_REDELIVERED, <<"redelivered">>).
36+
-define(HEADER_REPLY_TO, <<"reply-to">>).
37+
-define(HEADER_SERVER, <<"server">>).
38+
-define(HEADER_SESSION, <<"session">>).
39+
-define(HEADER_SUBSCRIPTION, <<"subscription">>).
40+
-define(HEADER_TIMESTAMP, <<"timestamp">>).
41+
-define(HEADER_TRANSACTION, <<"transaction">>).
42+
-define(HEADER_TYPE, <<"type">>).
43+
-define(HEADER_USER_ID, <<"user-id">>).
44+
-define(HEADER_VERSION, <<"version">>).
45+
-define(HEADER_X_DEAD_LETTER_EXCHANGE, <<"x-dead-letter-exchange">>).
46+
-define(HEADER_X_DEAD_LETTER_ROUTING_KEY, <<"x-dead-letter-routing-key">>).
47+
-define(HEADER_X_EXPIRES, <<"x-expires">>).
48+
-define(HEADER_X_MAX_LENGTH, <<"x-max-length">>).
49+
-define(HEADER_X_MAX_AGE, <<"x-max-age">>).
50+
-define(HEADER_X_MAX_LENGTH_BYTES, <<"x-max-length-bytes">>).
51+
-define(HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES, <<"x-stream-max-segment-size-bytes">>).
52+
-define(HEADER_X_MAX_PRIORITY, <<"x-max-priority">>).
53+
-define(HEADER_X_MESSAGE_TTL, <<"x-message-ttl">>).
54+
-define(HEADER_X_QUEUE_NAME, <<"x-queue-name">>).
55+
-define(HEADER_X_QUEUE_TYPE, <<"x-queue-type">>).
56+
-define(HEADER_X_STREAM_FILTER_SIZE_BYTES, <<"x-stream-filter-size-bytes">>).
5757

58-
-define(MESSAGE_ID_SEPARATOR, "@@").
58+
-define(MESSAGE_ID_SEPARATOR, <<"@@">>).
5959

6060
-define(HEADERS_NOT_ON_SEND, [?HEADER_MESSAGE_ID]).
6161

@@ -83,14 +83,14 @@
8383
]).
8484

8585

86-
-define(QUEUE_PREFIX, "/queue").
87-
-define(TOPIC_PREFIX, "/topic").
88-
-define(EXCHANGE_PREFIX, "/exchange").
89-
-define(AMQQUEUE_PREFIX, "/amq/queue").
90-
-define(TEMP_QUEUE_PREFIX, "/temp-queue").
91-
%% reply queues names can have slashes in the content so no further
86+
-define(QUEUE_PREFIX, <<"/queue">>).
87+
-define(TOPIC_PREFIX, <<"/topic">>).
88+
-define(EXCHANGE_PREFIX, <<"/exchange">>).
89+
-define(AMQQUEUE_PREFIX, <<"/amq/queue">>).
90+
-define(TEMP_QUEUE_PREFIX, <<"/temp-queue">>).
91+
%% Reply queues names can have slashes in the content so no further
9292
%% parsing happens.
93-
-define(REPLY_QUEUE_PREFIX, "/reply-queue/").
93+
-define(REPLY_QUEUE_PREFIX, <<"/reply-queue/">>).
9494

9595
%%-------------------------------------------------
9696

deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ initial_state(Config) -> {none, Config}.
9696
-record(ps, {acc = [] :: [byte()],
9797
acc_len = 0 :: non_neg_integer(),
9898
cmd :: atom() | binary() | undefined,
99-
hdrs = [] :: [{string(), string()}],
100-
hdrname :: string() | undefined,
99+
hdrs = [] :: [{binary(), binary()}],
100+
hdrname :: binary() | undefined,
101101
config :: #stomp_parser_config{}}).
102102

103103
%%
@@ -173,15 +173,13 @@ parse_hdr(Bin, S = #ps{config = #stomp_parser_config{max_header_length = MaxHL}}
173173
case byte_size(Name) > MaxHL orelse byte_size(Value) > MaxHL of
174174
true -> {error, {max_header_length, MaxHL}};
175175
false ->
176-
Hdrs = insert_header(S#ps.hdrs,
177-
binary_to_list(Name),
178-
binary_to_list(Value)),
176+
Hdrs = insert_header(S#ps.hdrs, Name, Value),
179177
parse_headers(Rest, S#ps{hdrs = Hdrs})
180178
end;
181179
has_escapes ->
182180
parse_hdrname_esc(Bin, S#ps{acc = [], acc_len = 0});
183181
{no_value, Name} ->
184-
{error, {header_no_value, binary_to_list(Name)}};
182+
{error, {header_no_value, Name}};
185183
{more, Len} ->
186184
case Len > MaxHL of
187185
true -> {error, {max_header_length, MaxHL}};
@@ -197,14 +195,14 @@ parse_hdrname_esc(<<>>, S) ->
197195
parse_hdrname_esc(<<?CR>>, S) ->
198196
more(fun(Rest) -> parse_hdrname_esc(<<?CR, Rest/binary>>, S) end);
199197
parse_hdrname_esc(<<?CR, ?LF, _/binary>>, #ps{acc = Acc}) ->
200-
{error, {header_no_value, lists:reverse(Acc)}};
198+
{error, {header_no_value, list_to_binary(lists:reverse(Acc))}};
201199
parse_hdrname_esc(<<?CR, Ch:8, _/binary>>, _) ->
202200
{error, {unexpected_chars_in_header, [?CR, Ch]}};
203201
parse_hdrname_esc(<<?LF, _/binary>>, #ps{acc = Acc}) ->
204-
{error, {header_no_value, lists:reverse(Acc)}};
202+
{error, {header_no_value, list_to_binary(lists:reverse(Acc))}};
205203
parse_hdrname_esc(<<?COLON, Rest/binary>>, S = #ps{acc = Acc}) ->
206204
parse_hdrvalue_esc(Rest, S#ps{acc = [], acc_len = 0,
207-
hdrname = lists:reverse(Acc)});
205+
hdrname = list_to_binary(lists:reverse(Acc))});
208206
parse_hdrname_esc(<<?BSL>>, S) ->
209207
more(fun(Rest) -> parse_hdrname_esc(<<?BSL, Rest/binary>>, S) end);
210208
parse_hdrname_esc(<<?BSL, Ch:8, Rest/binary>>, S) ->
@@ -241,7 +239,7 @@ parse_hdrvalue_esc(<<Ch:8, Rest/binary>>, S = #ps{acc_len = Len,
241239
end.
242240

243241
finish_hdr_esc(Rest, #ps{acc = Acc, hdrs = Hdrs, hdrname = HdrName} = S) ->
244-
Hdrs1 = insert_header(Hdrs, HdrName, lists:reverse(Acc)),
242+
Hdrs1 = insert_header(Hdrs, HdrName, list_to_binary(lists:reverse(Acc))),
245243
parse_headers(Rest, S#ps{hdrs = Hdrs1}).
246244

247245
%%
@@ -427,18 +425,18 @@ header(F, K, D) -> default_value(header(F, K), D).
427425

428426
boolean_header(#stomp_frame{headers = Headers}, Key) ->
429427
case lists:keysearch(Key, 1, Headers) of
430-
{value, {_, "true"}} -> {ok, true};
431-
{value, {_, "false"}} -> {ok, false};
432-
{value, {_, "True"}} -> {ok, true};
433-
{value, {_, "False"}} -> {ok, false};
434-
_ -> not_found
428+
{value, {_, <<"true">>}} -> {ok, true};
429+
{value, {_, <<"false">>}} -> {ok, false};
430+
{value, {_, <<"True">>}} -> {ok, true};
431+
{value, {_, <<"False">>}} -> {ok, false};
432+
_ -> not_found
435433
end.
436434

437435
boolean_header(F, K, D) -> default_value(boolean_header(F, K), D).
438436

439437
internal_integer_header(Headers, Key) ->
440438
case lists:keysearch(Key, 1, Headers) of
441-
{value, {_, Str}} -> {ok, list_to_integer(string:strip(Str))};
439+
{value, {_, Str}} -> {ok, binary_to_integer(string:trim(Str))};
442440
_ -> not_found
443441
end.
444442

@@ -447,11 +445,7 @@ integer_header(#stomp_frame{headers = Headers}, Key) ->
447445

448446
integer_header(F, K, D) -> default_value(integer_header(F, K), D).
449447

450-
binary_header(F, K) ->
451-
case header(F, K) of
452-
{ok, Str} -> {ok, list_to_binary(Str)};
453-
not_found -> not_found
454-
end.
448+
binary_header(F, K) -> header(F, K).
455449

456450
binary_header(F, K, D) -> default_value(binary_header(F, K), D).
457451

@@ -495,7 +489,7 @@ serialize(#stomp_frame{command = Command,
495489
lists:map(fun serialize_header/1,
496490
lists:keydelete(?HEADER_CONTENT_LENGTH, 1, Headers)),
497491
if
498-
Len > 0 -> [?HEADER_CONTENT_LENGTH ++ ":", integer_to_list(Len), ?LF];
492+
Len > 0 -> [?HEADER_CONTENT_LENGTH, ?COLON, integer_to_list(Len), ?LF];
499493
true -> []
500494
end,
501495
?LF, case BodyFragments of
@@ -509,17 +503,18 @@ serialize_command(Command) -> Command.
509503

510504
serialize_header({K, V}) when is_integer(V) -> hdr(escape(K), integer_to_list(V));
511505
serialize_header({K, V}) when is_boolean(V) -> hdr(escape(K), boolean_to_list(V));
512-
serialize_header({K, V}) when is_list(V) -> hdr(escape(K), escape(V)).
506+
serialize_header({K, V}) when is_binary(V) -> hdr(escape(K), escape(V)).
513507

514508
boolean_to_list(true) -> "true";
515509
boolean_to_list(_) -> "false".
516510

517511
hdr(K, V) -> [K, ?COLON, V, ?LF].
518512

519-
escape(Str) -> [escape1(Ch) || Ch <- Str].
513+
escape(Bin) -> escape(Bin, []).
520514

521-
escape1(?COLON) -> [?BSL, ?COLON_ESC];
522-
escape1(?BSL) -> [?BSL, ?BSL_ESC];
523-
escape1(?LF) -> [?BSL, ?LF_ESC];
524-
escape1(?CR) -> [?BSL, ?CR_ESC];
525-
escape1(Ch) -> Ch.
515+
escape(<<>>, Acc) -> lists:reverse(Acc);
516+
escape(<<?COLON, Rest/binary>>, Acc) -> escape(Rest, [?COLON_ESC, ?BSL | Acc]);
517+
escape(<<?BSL, Rest/binary>>, Acc) -> escape(Rest, [?BSL_ESC, ?BSL | Acc]);
518+
escape(<<?LF, Rest/binary>>, Acc) -> escape(Rest, [?LF_ESC, ?BSL | Acc]);
519+
escape(<<?CR, Rest/binary>>, Acc) -> escape(Rest, [?CR_ESC, ?BSL | Acc]);
520+
escape(<<Ch:8, Rest/binary>>, Acc) -> escape(Rest, [Ch | Acc]).

0 commit comments

Comments
 (0)