Skip to content

Commit 2ef84bb

Browse files
committed
STOMP: do zero-copy frame parsing if no escapes involved
1 parent 3414abe commit 2ef84bb

1 file changed

Lines changed: 205 additions & 81 deletions

File tree

deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl

Lines changed: 205 additions & 81 deletions
Original file line number | Diff line number | Diff line change
@@ -90,7 +90,9 @@ initial_state(Config) -> {none, Config}.
9090
%% Allow some headroom for unknown commands but bound memory usage.
9191
-define(MAX_COMMAND_LENGTH, 32).
9292

93-
%% Parser state
93+
%% Parser state.
94+
%% acc is only used for header names and values with escape sequences.
95+
%% Commands and headers without escapes use sub-binary extraction.
9496
-record(ps, {acc = [] :: [byte()],
9597
acc_len = 0 :: non_neg_integer(),
9698
cmd :: atom() | binary() | undefined,
@@ -103,110 +105,232 @@ initial_state(Config) -> {none, Config}.
103105
%%
104106

105107
%% Feeds the next chunk of input to the parser.
%% The second argument is either {none, Config}, which starts a new frame in
%% the noise phase with a fresh #ps{}, or {resume, Continuation}, which hands
%% the chunk to a previously suspended parse.
parse(Bytes, {none, ParserConfig}) ->
    parse_noise(Bytes, #ps{config = ParserConfig});
parse(Bytes, {resume, Resume}) ->
    Resume(Bytes).
107109

108110
%%
109-
%% Incremental state machine parser
111+
%% Phase: noise — skip NULs and LFs between frames
110112
%%
111-
%% Phases: noise | command | headers | hdrname | hdrvalue
112-
%% Body parsing is handled separately by parse_body/2.
113+
114+
%% Noise phase: drops the NUL, LF and CR LF bytes found between frames and
%% passes the input on to parse_command/2 as soon as any other byte shows up.
%% A lone trailing CR cannot be classified yet, so it is carried over into the
%% next chunk; a CR followed by anything other than LF is an error.
parse_noise(<<>>, State) ->
    more(fun(Next) -> parse_noise(Next, State) end);
parse_noise(<<Byte:8, Tail/binary>>, State) when Byte =:= ?NUL; Byte =:= ?LF ->
    parse_noise(Tail, State);
parse_noise(<<?CR>>, State) ->
    more(fun(Next) -> parse_noise(<<?CR, Next/binary>>, State) end);
parse_noise(<<?CR, ?LF, Tail/binary>>, State) ->
    parse_noise(Tail, State);
parse_noise(<<?CR, Other:8, _/binary>>, _State) ->
    {error, {unexpected_chars_between_frames, [?CR, Other]}};
parse_noise(Frame, State) ->
    parse_command(Frame, State).
122+
123+
%%
124+
%% Phase: command — scan for LF, extract as sub-binary
113125
%%
114126

115-
more(Continuation) -> {more, {resume, Continuation}}.
127+
%% Command phase: the command is everything up to the first LF (or CR LF).
%% It is looked up in ?KNOWN_COMMANDS; a command that is not in the map is
%% kept as the binary returned by scan_until_lf/1.
%% More than ?MAX_COMMAND_LENGTH bytes is an error, both for a complete line
%% and for a line whose terminator has not arrived yet.
parse_command(Bin, State) ->
    case scan_until_lf(Bin) of
        {ok, Name, _Rest} when byte_size(Name) > ?MAX_COMMAND_LENGTH ->
            {error, {command_too_long, ?MAX_COMMAND_LENGTH}};
        {ok, Name, Rest} ->
            Command = maps:get(Name, ?KNOWN_COMMANDS, Name),
            parse_headers(Rest, State#ps{cmd = Command, hdrs = []});
        {more, Seen} when Seen > ?MAX_COMMAND_LENGTH ->
            {error, {command_too_long, ?MAX_COMMAND_LENGTH}};
        {more, _Seen} ->
            %% No terminator yet: retry on the bytes seen so far plus the
            %% next chunk.
            more(fun(Next) -> parse_command(<<Bin/binary, Next/binary>>, State) end);
        {error, _} = Error ->
            Error
    end.
144+
145+
%%
146+
%% Phase: headers — dispatch to hdrname or body
147+
%%
148+
149+
%% Headers phase: an empty line (LF or CR LF) ends the header section and
%% starts the body. Any other input is the start of another header line,
%% which is only accepted while fewer than max_headers headers are stored.
parse_headers(<<>>, State) ->
    more(fun(Next) -> parse_headers(Next, State) end);
parse_headers(<<?CR>>, State) ->
    %% A lone CR may be the first half of CR LF: wait for the next chunk.
    more(fun(Next) -> parse_headers(<<?CR, Next/binary>>, State) end);
parse_headers(<<?LF, Body/binary>>, State) ->
    parse_body(Body, State);
parse_headers(<<?CR, ?LF, Body/binary>>, State) ->
    parse_body(Body, State);
parse_headers(Line, #ps{hdrs = Seen,
                        config = #stomp_parser_config{max_headers = Limit}} = State) ->
    Count = length(Seen),
    if
        Count >= Limit -> {error, {max_headers, Limit}};
        true           -> parse_hdr(Line, State)
    end.
163+
164+
%%
165+
%% Phase: header line — scan for COLON and LF in bulk.
166+
%% Fast path: no escape sequences in the header line (a CR LF terminator is fine).
167+
%% Slow path: escape sequences present, fall back to byte-by-byte.
168+
%%
169+
170+
%% Parses one header line.
%% scan_header_line/1 either splits the line into name and value sub-binaries
%% (both are then checked against max_header_length and stored as lists), or
%% reports has_escapes, in which case the whole line is re-read byte by byte
%% by parse_hdrname_esc/2 with an empty accumulator.
%% An incomplete line suspends the parser unless the part scanned so far is
%% already longer than max_header_length.
parse_hdr(Bin, State = #ps{hdrs = Headers,
                           config = #stomp_parser_config{max_header_length = Limit}}) ->
    case scan_header_line(Bin) of
        {ok, Name, Value, _Rest} when byte_size(Name) > Limit;
                                      byte_size(Value) > Limit ->
            {error, {max_header_length, Limit}};
        {ok, Name, Value, Rest} ->
            Updated = insert_header(Headers,
                                    binary_to_list(Name),
                                    binary_to_list(Value)),
            parse_headers(Rest, State#ps{hdrs = Updated});
        has_escapes ->
            parse_hdrname_esc(Bin, State#ps{acc = [], acc_len = 0});
        {no_value, Name} ->
            {error, {header_no_value, binary_to_list(Name)}};
        {more, Seen} when Seen > Limit ->
            {error, {max_header_length, Limit}};
        {more, _Seen} ->
            %% Line not terminated yet: retry on the old bytes plus the next
            %% chunk.
            more(fun(Next) -> parse_hdr(<<Bin/binary, Next/binary>>, State) end);
        {error, _} = Error ->
            Error
    end.
116193

117-
%% --- Need more data ---
118-
parser(<<>>, Phase, S) -> more(fun(Rest) -> parser(Rest, Phase, S) end);
119-
parser(<<?CR>>, Phase, S) -> more(fun(Rest) -> parser(<<?CR, Rest/binary>>, Phase, S) end);
120-
121-
%% --- CR LF normalization ---
122-
parser(<<?CR, ?LF, Rest/binary>>, Phase, S) -> parser(<<?LF, Rest/binary>>, Phase, S);
123-
parser(<<?CR, Ch:8, _Rest/binary>>, Phase, _S) -> {error, {unexpected_chars(Phase), [?CR, Ch]}};
124-
125-
%% --- Escape processing (header names and values only) ---
126-
parser(<<?BSL>>, Phase, S)
127-
when Phase =:= hdrname;
128-
Phase =:= hdrvalue -> more(fun(Rest) -> parser(<<?BSL, Rest/binary>>, Phase, S) end);
129-
parser(<<?BSL, Ch:8, Rest/binary>>, Phase, S)
130-
when Phase =:= hdrname;
131-
Phase =:= hdrvalue -> unescape(Ch, fun(Ech) -> parser(Rest, Phase, accum(Ech, S)) end);
132-
133-
%% --- Noise: skip NULs and LFs between frames ---
134-
parser(<<?NUL, Rest/binary>>, noise, S) -> parser(Rest, noise, S);
135-
parser(<<?LF, Rest/binary>>, noise, S) -> parser(Rest, noise, S);
136-
parser(Rest, noise, S) -> parser(Rest, command, S#ps{acc = [], acc_len = 0});
137-
138-
%% --- Command: accumulate bytes until LF ---
139-
parser(<<?LF, Rest/binary>>, command, S) -> goto(command, headers, Rest, S);
140-
parser(<<Ch:8, Rest/binary>>, command, S = #ps{acc_len = Len}) ->
141-
case Len >= ?MAX_COMMAND_LENGTH of
142-
true -> {error, {command_too_long, ?MAX_COMMAND_LENGTH}};
143-
false -> parser(Rest, command, accum(Ch, S))
144-
end;
145-
146-
%% --- Headers: LF means end of headers (start body), otherwise start a header name ---
147-
parser(<<?LF, Rest/binary>>, headers, S) -> goto(headers, body, Rest, S);
148-
parser(Rest, headers, S) -> goto(headers, hdrname, Rest, S);
149-
150-
%% --- Header name: accumulate until COLON or LF ---
151-
parser(<<?COLON, Rest/binary>>, hdrname, S) -> goto(hdrname, hdrvalue, Rest, S);
152-
parser(<<?LF, _Rest/binary>>, hdrname, #ps{acc = Acc}) ->
194+
%% Slow path for header names with escapes or CR
195+
%% Slow path for a header name, used when the header line contains a
%% backslash. The name is collected byte by byte in #ps.acc (reversed) until
%% the COLON, after which parse_hdrvalue_esc/2 takes over.
%%
%% Returns whatever the next phase returns, {more, _} when the input ends
%% mid-name, or {error, Reason} where Reason is one of
%%   {header_no_value, Name}              - line ended before a COLON
%%   {unexpected_chars_in_header, [CR,C]} - CR not followed by LF
%%   {max_header_length, Max}             - name longer than the limit
%% unescape/2 is defined outside this excerpt; it is called with the byte
%% that follows the backslash and a fun that receives the decoded byte.
parse_hdrname_esc(<<>>, S) ->
    more(fun(Rest) -> parse_hdrname_esc(Rest, S) end);
parse_hdrname_esc(<<?CR>>, S) ->
    %% A lone CR may be the first half of CR LF: wait for the next chunk.
    more(fun(Rest) -> parse_hdrname_esc(<<?CR, Rest/binary>>, S) end);
parse_hdrname_esc(<<?CR, ?LF, _/binary>>, #ps{acc = Acc}) ->
    {error, {header_no_value, lists:reverse(Acc)}};
parse_hdrname_esc(<<?CR, Ch:8, _/binary>>, _) ->
    {error, {unexpected_chars_in_header, [?CR, Ch]}};
parse_hdrname_esc(<<?LF, _/binary>>, #ps{acc = Acc}) ->
    {error, {header_no_value, lists:reverse(Acc)}};
parse_hdrname_esc(<<?COLON, Rest/binary>>, S = #ps{acc = Acc}) ->
    parse_hdrvalue_esc(Rest, S#ps{acc = [], acc_len = 0,
                                  hdrname = lists:reverse(Acc)});
parse_hdrname_esc(<<?BSL>>, S) ->
    %% The escaped byte has not arrived yet: keep the backslash for later.
    more(fun(Rest) -> parse_hdrname_esc(<<?BSL, Rest/binary>>, S) end);
parse_hdrname_esc(<<?BSL, Ch:8, Rest/binary>>,
                  S = #ps{acc_len = Len,
                          config = #stomp_parser_config{
                                      max_header_length = Max}}) ->
    %% The length limit applies to decoded bytes as well; without this check
    %% a name made of escape sequences could grow acc without bound.
    case Len >= Max of
        true  -> {error, {max_header_length, Max}};
        false -> unescape(Ch, fun(Ech) ->
                                      parse_hdrname_esc(Rest, accum(Ech, S))
                              end)
    end;
parse_hdrname_esc(<<Ch:8, Rest/binary>>, S = #ps{acc_len = Len,
                                                 config = #stomp_parser_config{
                                                             max_header_length = Max}}) ->
    case Len >= Max of
        true  -> {error, {max_header_length, Max}};
        false -> parse_hdrname_esc(Rest, accum(Ch, S))
    end.
219+
220+
%% Slow path for header values with escapes
221+
%% Slow path for a header value, entered from parse_hdrname_esc/2 after the
%% COLON. The value is collected byte by byte in #ps.acc (reversed) until LF
%% or CR LF, at which point finish_hdr_esc/2 stores the header.
%%
%% Returns whatever the next phase returns, {more, _} when the input ends
%% mid-value, or {error, Reason} where Reason is one of
%%   {unexpected_chars_in_header, [CR,C]} - CR not followed by LF
%%   {max_header_length, Max}             - value longer than the limit
%% unescape/2 is defined outside this excerpt; it is called with the byte
%% that follows the backslash and a fun that receives the decoded byte.
parse_hdrvalue_esc(<<>>, S) ->
    more(fun(Rest) -> parse_hdrvalue_esc(Rest, S) end);
parse_hdrvalue_esc(<<?CR>>, S) ->
    %% A lone CR may be the first half of CR LF: wait for the next chunk.
    more(fun(Rest) -> parse_hdrvalue_esc(<<?CR, Rest/binary>>, S) end);
parse_hdrvalue_esc(<<?CR, ?LF, Rest/binary>>, S) ->
    finish_hdr_esc(Rest, S);
parse_hdrvalue_esc(<<?CR, Ch:8, _/binary>>, _) ->
    {error, {unexpected_chars_in_header, [?CR, Ch]}};
parse_hdrvalue_esc(<<?LF, Rest/binary>>, S) ->
    finish_hdr_esc(Rest, S);
parse_hdrvalue_esc(<<?BSL>>, S) ->
    %% The escaped byte has not arrived yet: keep the backslash for later.
    more(fun(Rest) -> parse_hdrvalue_esc(<<?BSL, Rest/binary>>, S) end);
parse_hdrvalue_esc(<<?BSL, Ch:8, Rest/binary>>,
                   S = #ps{acc_len = Len,
                           config = #stomp_parser_config{
                                       max_header_length = Max}}) ->
    %% The length limit applies to decoded bytes as well; without this check
    %% a value made of escape sequences could grow acc without bound.
    case Len >= Max of
        true  -> {error, {max_header_length, Max}};
        false -> unescape(Ch, fun(Ech) ->
                                      parse_hdrvalue_esc(Rest, accum(Ech, S))
                              end)
    end;
parse_hdrvalue_esc(<<Ch:8, Rest/binary>>, S = #ps{acc_len = Len,
                                                  config = #stomp_parser_config{
                                                              max_header_length = Max}}) ->
    case Len >= Max of
        true  -> {error, {max_header_length, Max}};
        false -> parse_hdrvalue_esc(Rest, accum(Ch, S))
    end.
171242

243+
%% Stores a header that was read on the slow path and returns to the headers
%% phase. #ps.acc holds the value bytes in reverse order and #ps.hdrname the
%% name collected earlier.
finish_hdr_esc(Remaining, State = #ps{acc = RevValue, hdrname = Name, hdrs = Known}) ->
    Value = lists:reverse(RevValue),
    Updated = insert_header(Known, Name, Value),
    parse_headers(Remaining, State#ps{hdrs = Updated}).
246+
172247
%%
173-
%% State transitions
248+
%% Binary scanning helpers — bulk operations, no per-byte allocation
174249
%%
175250

176-
goto(command, headers, Rest, S = #ps{acc = Acc}) ->
177-
CmdBin = list_to_binary(lists:reverse(Acc)),
178-
Cmd = maps:get(CmdBin, ?KNOWN_COMMANDS, CmdBin),
179-
parser(Rest, headers, S#ps{cmd = Cmd, hdrs = []});
180-
181-
goto(headers, body, Rest, S) ->
182-
parse_body(Rest, S);
251+
%% Wraps a continuation fun in the "need more input" result:
%% {more, {resume, Fun}}. The inner tuple is the parser state that parse/2
%% accepts to resume the parse.
more(Resume) ->
    ParserState = {resume, Resume},
    {more, ParserState}.
183252

184-
goto(headers, hdrname, Rest, S = #ps{hdrs = Headers,
185-
config = #stomp_parser_config{
186-
max_headers = MaxHeaders}}) ->
187-
case length(Headers) >= MaxHeaders of
188-
true -> {error, {max_headers, MaxHeaders}};
189-
false -> parser(Rest, hdrname, S#ps{acc = [], acc_len = 0})
190-
end;
253+
%% Finds the end of the first line in Bin. Both LF and CR LF end a line.
%% Returns:
%%   {ok, Line, Tail} - Line is the part before the terminator, Tail the
%%                      part after it
%%   {more, Len}      - no terminator yet; Len bytes were scanned (a
%%                      trailing lone CR is not counted)
%%   {error, {unexpected_chars_in_command, [CR, Ch]}}
%%                    - CR followed by a byte other than LF
scan_until_lf(Bin) ->
    scan_until_lf(Bin, 0).

%% Same as scan_until_lf/1, but starts looking at byte offset Offset.
scan_until_lf(Bin, Offset) ->
    case Bin of
        <<Line:Offset/binary, ?LF, Tail/binary>> ->
            {ok, Line, Tail};
        <<Line:Offset/binary, ?CR, ?LF, Tail/binary>> ->
            {ok, Line, Tail};
        <<_:Offset/binary, ?CR>> ->
            {more, Offset};
        <<_:Offset/binary, ?CR, Next:8, _/binary>> ->
            {error, {unexpected_chars_in_command, [?CR, Next]}};
        <<_:Offset/binary, _:8, _/binary>> ->
            scan_until_lf(Bin, Offset + 1);
        <<_:Offset/binary>> ->
            {more, Offset}
    end.
191275

192-
goto(hdrname, hdrvalue, Rest, S = #ps{acc = Acc}) ->
193-
parser(Rest, hdrvalue, S#ps{acc = [], acc_len = 0,
194-
hdrname = lists:reverse(Acc)});
276+
%% Scans one header line of the form Name:Value followed by LF or CR LF.
%% Returns:
%%   {ok, Name, Value, Tail} - fast path; Name and Value are the parts of
%%                             Bin around the COLON, Tail follows the line
%%   has_escapes             - a backslash was found before the line ended;
%%                             the caller must use the slow path
%%   {no_value, Name}        - the line ended before any COLON
%%   {more, Len}             - line not complete; Len bytes of the name, or
%%                             of the value once the COLON was seen, have
%%                             been scanned
%%   {error, {unexpected_chars_in_header, [CR, Ch]}}
%%                           - CR followed by a byte other than LF
scan_header_line(Bin) ->
    scan_hdr_name(Bin, 0).

%% Scans the header name starting at byte offset Offset; continues with
%% scan_hdr_value/3 once the COLON is found.
scan_hdr_name(Bin, Offset) ->
    case Bin of
        <<Name:Offset/binary, ?COLON, AfterColon/binary>> ->
            scan_hdr_value(AfterColon, Name, 0);
        <<Name:Offset/binary, ?LF, _/binary>> ->
            {no_value, Name};
        <<Name:Offset/binary, ?CR, ?LF, _/binary>> ->
            {no_value, Name};
        <<_:Offset/binary, ?BSL, _/binary>> ->
            has_escapes;
        <<_:Offset/binary, ?CR>> ->
            {more, Offset};
        <<_:Offset/binary, ?CR, Next:8, _/binary>> ->
            {error, {unexpected_chars_in_header, [?CR, Next]}};
        <<_:Offset/binary, _:8, _/binary>> ->
            scan_hdr_name(Bin, Offset + 1);
        <<_:Offset/binary>> ->
            {more, Offset}
    end.
195309

196-
goto(hdrvalue, headers, Rest, S = #ps{acc = Acc, hdrs = Headers, hdrname = HdrName}) ->
197-
parser(Rest, headers, S#ps{hdrs = insert_header(Headers, HdrName,
198-
lists:reverse(Acc))}).
310+
%% Scans a header value. Bin starts right after the COLON, Name is the header
%% name already extracted, and Offset is the byte offset into the value.
%% Returns {ok, Name, Value, Tail} when LF or CR LF is reached, has_escapes
%% when a backslash is found first, {more, Offset} when Bin ends before the
%% line does (a trailing lone CR is not counted), and an
%% unexpected_chars_in_header error when a CR is followed by a byte other
%% than LF.
scan_hdr_value(Bin, Name, Offset) ->
    case Bin of
        <<Value:Offset/binary, ?LF, Tail/binary>> ->
            {ok, Name, Value, Tail};
        <<Value:Offset/binary, ?CR, ?LF, Tail/binary>> ->
            {ok, Name, Value, Tail};
        <<_:Offset/binary, ?BSL, _/binary>> ->
            has_escapes;
        <<_:Offset/binary, ?CR>> ->
            {more, Offset};
        <<_:Offset/binary, ?CR, Next:8, _/binary>> ->
            {error, {unexpected_chars_in_header, [?CR, Next]}};
        <<_:Offset/binary, _:8, _/binary>> ->
            scan_hdr_value(Bin, Name, Offset + 1);
        <<_:Offset/binary>> ->
            {more, Offset}
    end.
199329

200330
%%
201331
%% Helpers
202332
%%
203333

204-
unexpected_chars(noise) -> unexpected_chars_between_frames;
205-
unexpected_chars(command) -> unexpected_chars_in_command;
206-
unexpected_chars(hdrname) -> unexpected_chars_in_header;
207-
unexpected_chars(hdrvalue) -> unexpected_chars_in_header;
208-
unexpected_chars(_) -> unexpected_chars.
209-
210334
%% Pushes one byte onto the front of the accumulator and bumps its length
%% counter. The accumulator is therefore in reverse order.
accum(Byte, #ps{acc = Bytes, acc_len = Count} = State) ->
    State#ps{acc_len = Count + 1, acc = [Byte | Bytes]}.
212336

0 commit comments

Comments
 (0)