Skip to content

Commit 4c44622

Browse files
committed
Native STOMP - Make in runnable after the latest refactorings, fix queue type deduction
1 parent 256810d commit 4c44622

7 files changed

Lines changed: 83 additions & 66 deletions

File tree

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,13 @@
9999

100100
-export_type ([process_frame_result/0]).
101101

102+
-export([adapter_name/1]).
103+
102104
%%----------------------------------------------------------------------------
103105
%% Public API
104106
%%----------------------------------------------------------------------------
107+
adapter_name(#state{cfg = #cfg{adapter_info = #amqp_adapter_info{name = Name}}}) ->
108+
Name.
105109

106110
-spec initial_state(
107111
#stomp_configuration{},
@@ -313,7 +317,7 @@ process_connect(Implicit, Frame,
313317
adapter_info = AdapterInfo}}) ->
314318
process_request(
315319
fun(StateN) ->
316-
maybe
320+
Res1 = maybe
317321
{ok, Version} = negotiate_version(Frame),
318322
FT = frame_transformer(Version),
319323
Frame1 = FT(Frame),
@@ -322,12 +326,12 @@ process_connect(Implicit, Frame,
322326
VHost = login_header(Frame1, ?HEADER_HOST, DefaultVHost),
323327
Heartbeat = login_header(Frame1, ?HEADER_HEART_BEAT, "0,0"),
324328
{ProtoName, _} = AdapterInfo#amqp_adapter_info.protocol,
325-
StateN1 = StateN#state{cfg = #cfg{vhost = VHost,
326-
adapter_info = AdapterInfo#amqp_adapter_info{
327-
protocol = {ProtoName, Version}},
328-
frame_transformer = FT,
329-
auth_mechanism = Auth,
330-
auth_login = Username}},
329+
StateN1 = StateN#state{cfg = Config#cfg{vhost = VHost,
330+
adapter_info = AdapterInfo#amqp_adapter_info{
331+
protocol = {ProtoName, Version}},
332+
frame_transformer = FT,
333+
auth_mechanism = Auth,
334+
auth_login = Username}},
331335
{Username, AuthProps} = auth_props_for_creds(Creds, StateN1),
332336
{ok, User} ?= rabbit_access_control:check_user_login(Username, AuthProps),
333337
{ok, AuthzCtx} ?= check_vhost_access(VHost, User, PeerIp),
@@ -347,10 +351,10 @@ process_connect(Implicit, Frame,
347351
false -> [{?HEADER_SERVER, server_header()} | Headers]
348352
end,
349353
"",
350-
StateN1#state{cfg = #cfg{
351-
session_id = SessionId,
352-
version = Version
353-
},
354+
StateN1#state{cfg = StateN1#state.cfg#cfg{
355+
session_id = SessionId,
356+
version = Version
357+
},
354358
user = User,
355359
authz_ctx = AuthzCtx}),
356360
self() ! connection_created,
@@ -375,14 +379,15 @@ process_connect(Implicit, Frame,
375379
rabbit_log:warning("STOMP login failed for user '~ts': "
376380
"this user's access is restricted to localhost", [EUsername]),
377381
error("Bad CONNECT", "non-loopback access denied", State)
378-
end
379-
case {Res, Implicit} of
382+
end,
383+
case {Res1, Implicit} of
380384
{{ok, _, StateN2}, implicit} ->
381385
self() ! connection_created, ok(StateN2);
382386
_ ->
383-
self() ! connection_created, Res
387+
self() ! connection_created, Res1
384388

385-
end,
389+
end
390+
end,
386391
State).
387392

388393
creds(_, _, #cfg{default_login = DefLogin,
@@ -659,7 +664,7 @@ maybe_delete_durable_sub_queue({topic, Name}, Frame,
659664
{ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
660665
QName = rabbit_stomp_util:subscription_queue_name(Name, Id, Frame),
661666
QRes = rabbit_misc:r(VHost, queue, list_to_binary(QName)),
662-
io:format("Durable QRes: ~p~n", [QRes]),
667+
?LOG_DEBUG("Durable QRes: ~p~n", [QRes]),
663668
delete_queue(QRes, Username),
664669
ok(State);
665670
false ->
@@ -753,6 +758,7 @@ do_subscribe(Destination, DestHdr, Frame,
753758
try
754759
{ok, State1} = consume_queue(QueueName, #{no_ack => (AckMode == auto),
755760
prefetch_count => Prefetch,
761+
mode => {simple_prefetch, Prefetch},
756762
consumer_tag => ConsumerTag,
757763
exclusive_consume => false,
758764
args => Arguments},
@@ -913,16 +919,16 @@ do_send(Destination, _DestHdr,
913919

914920
Message = rabbit_message_interceptor:intercept(Message0),
915921

916-
io:format("Message: ~p~n", [Message]),
922+
%% io:format("Message: ~p~n", [Message]),
917923

918924
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
919-
io:format("QNames ~p~n", [QNames]),
925+
%% io:format("QNames ~p~n", [QNames]),
920926

921927
Delivery = {Message, DeliveryOptions, QNames},
922-
io:format("Delivery: ~p~n", [Delivery]),
928+
%% io:format("Delivery: ~p~n", [Delivery]),
923929
deliver_to_queues(ExchangeName, Delivery, State2);
924930
{error, _} = Err ->
925-
io:format("Err ~p~n", [Err]),
931+
%% io:format("Err ~p~n", [Err]),
926932
Err
927933
end.
928934

@@ -941,7 +947,7 @@ deliver_to_queues(XName,
941947
Qs0 = rabbit_amqqueue:lookup_many(RoutedToQNames),
942948
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
943949
MsgSeqNo = maps:get(correlation, Options, undefined),
944-
io:format("Qs: ~p~n", [Qs]),
950+
%% io:format("Qs: ~p~n", [Qs]),
945951
case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of
946952
{ok, QStates, Actions} ->
947953
rabbit_global_counters:messages_routed(stomp, length(Qs)),
@@ -1183,10 +1189,10 @@ deliver_to_client(ConsumerTag, Ack, Msgs, State) ->
11831189

11841190
deliver_one_to_client(ConsumerTag, _Ack, {QName, QPid, MsgId, Redelivered, MsgCont0} = _Msg,
11851191
State = #state{queue_states = QStates,
1186-
delivery_tag = DeliveryTag}) ->
1192+
delivery_tag = DeliveryTag}) ->
11871193

1188-
[RoutingKey | _] = mc:get_annotation(routing_keys, MsgCont0),
1189-
ExchangeNameBin = mc:get_annotation(exchange, MsgCont0),
1194+
[RoutingKey | _] = mc:routing_keys(MsgCont0),
1195+
ExchangeNameBin = mc:exchange(MsgCont0),
11901196
MsgCont = mc:convert(mc_amqpl, MsgCont0),
11911197
Content = mc:protocol_state(MsgCont),
11921198
Delivery = #'basic.deliver'{consumer_tag = ConsumerTag,
@@ -1338,7 +1344,7 @@ ensure_reply_queue(TempQueueId, State = #state{reply_queues = RQS,
13381344
%%----------------------------------------------------------------------------
13391345

13401346
ensure_receipt(Frame = #stomp_frame{command = Command}, State) ->
1341-
io:format("ER Frame: ~p~n", [Frame]),
1347+
%% io:format("ER Frame: ~p~n", [Frame]),
13421348
case rabbit_stomp_frame:header(Frame, ?HEADER_RECEIPT) of
13431349
{ok, Id} -> do_receipt(Command, Id, State);
13441350
not_found -> State
@@ -1653,7 +1659,7 @@ delete_queue(QRes, Username) ->
16531659
case rabbit_amqqueue:with(
16541660
QRes,
16551661
fun (Q) ->
1656-
io:format("Delete queue ~p~n", [rabbit_queue_type:delete(Q, false, false, Username)])
1662+
rabbit_queue_type:delete(Q, false, false, Username)
16571663
end,
16581664
fun (not_found) ->
16591665
ok;
@@ -1693,7 +1699,7 @@ ensure_binding(QName, {Exchange, RoutingKey}, _State = #state{cfg = #cfg{
16931699
ok ->
16941700
ok
16951701
end,
1696-
io:format("rabbit_binding:add ~p ~p~n", [Binding, Res]),
1702+
%% io:format("rabbit_binding:add ~p ~p~n", [Binding, Res]),
16971703
Res.
16981704

16991705
check_resource_access(User, Resource, Perm, Context) ->
@@ -1911,18 +1917,22 @@ new_amqqueue(QNameBin0, Type, Params0, _State = #state{user = #user{username = U
19111917
false -> [{auto_delete, true}, {exclusive, true} | Params0];
19121918
true -> Params0
19131919
end,
1914-
1915-
amqqueue:new(QName,
1916-
none,
1917-
proplists:get_value(durable, Params, false),
1918-
proplists:get_value(auto_delete, Params, false),
1919-
case proplists:get_value(exclusive, Params, false) of
1920-
false -> none;
1921-
true -> self()
1922-
end,
1923-
proplists:get_value(arguments, Params, []),
1924-
VHost,
1925-
#{user => Username}).
1920+
Args = proplists:get_value(arguments, Params, []),
1921+
1922+
AMQ = amqqueue:new(QName,
1923+
none,
1924+
proplists:get_value(durable, Params, false),
1925+
proplists:get_value(auto_delete, Params, false),
1926+
case proplists:get_value(exclusive, Params, false) of
1927+
false -> none;
1928+
true -> self()
1929+
end,
1930+
Args,
1931+
VHost,
1932+
#{user => Username},
1933+
rabbit_amqqueue:get_queue_type(Args)),
1934+
%% io:format("~p", [AMQ]),
1935+
AMQ.
19261936

19271937

19281938
to_url([]) -> [];

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ init([SupHelperPid, Ref, Configuration]) ->
7373
rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)",
7474
[self(), ConnName]),
7575

76-
ParseState = rabbit_stomp_frame:initial_state(),
7776
ParserConfig = #stomp_parser_config{
7877
max_headers = Configuration#stomp_configuration.max_headers,
7978
max_header_length = Configuration#stomp_configuration.max_header_length,
@@ -403,19 +402,19 @@ processor_args(Configuration, Sock) ->
403402
ssl_login_name(RealSocket, Configuration), PeerAddr}.
404403

405404
adapter_info(Sock) ->
406-
case rabbit_net:socket_ends(Socket, inbound) of
407-
{ok, {PeerIp, PeerPort, Ip, Port}} ->
408-
#amqp_adapter_info{protocol = {'STOMP', 0},
409-
name = Name,
410-
host = Host,
411-
port = Port,
412-
peer_host = PeerHost,
413-
peer_port = PeerPort,
414-
additional_info = maybe_ssl_info(Sock)}
415-
process_connect(ConnectPacket, Socket, ConnName, SendFun, SocketEnds);
416-
{error, Reason} ->
417-
{error, {socket_ends, Reason}}
418-
end.
405+
%% case rabbit_net:socket_ends(Socket, inbound) of
406+
%% {ok, {PeerIp, PeerPort, Ip, Port}} ->
407+
%% #amqp_adapter_info{protocol = {'STOMP', 0},
408+
%% name = Name,
409+
%% host = Host,
410+
%% port = Port,
411+
%% peer_host = PeerHost,
412+
%% peer_port = PeerPort,
413+
%% additional_info = maybe_ssl_info(Sock)}
414+
%% process_connect(ConnectPacket, Socket, ConnName, SendFun, SocketEnds);
415+
%% {error, Reason} ->
416+
%% {error, {socket_ends, Reason}}
417+
%% end.
419418

420419
amqp_connection:socket_adapter_info(Sock, {'STOMP', 0}).
421420

deps/rabbitmq_stomp/test/python_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,11 @@ groups() ->
3131
].
3232

3333
init_per_suite(Config) ->
34-
DataDir = ?config(data_dir, Config),
3534
{ok, _} = rabbit_ct_helpers:exec(["pip", "install", "-r", requirements_path(Config),
36-
"--target", deps_path(Config)]),
35+
"--target", deps_path(Config)]),
3736
Config.
3837

3938
end_per_suite(Config) ->
40-
DataDir = ?config(data_dir, Config),
4139
ok = file:del_dir_r(deps_path(Config)),
4240
Config.
4341

@@ -82,8 +80,10 @@ run(Config, Test) ->
8280
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
8381
StompPortTls = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp_tls),
8482
AmqpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
83+
MgmtPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mgmt),
8584
NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
8685
os:putenv("AMQP_PORT", integer_to_list(AmqpPort)),
86+
os:putenv("MGMT_PORT", integer_to_list(MgmtPort)),
8787
os:putenv("STOMP_PORT", integer_to_list(StompPort)),
8888
os:putenv("STOMP_PORT_TLS", integer_to_list(StompPortTls)),
8989
os:putenv("RABBITMQ_NODENAME", atom_to_list(NodeName)),
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
stomp.py==8.1.0
22
pika==1.1.0
3-
3+
rabbitman===0.1.0

deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_quorum.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import time
1111
import os
1212
import re
13-
13+
import rabbitman
1414

1515
class TestUserGeneratedQueueName(base.BaseTest):
1616

@@ -34,6 +34,11 @@ def test_quorum_queue(self):
3434
# let the quorum queue some time to start
3535
time.sleep(5)
3636

37+
client = rabbitman.Client(f'http://localhost:{(os.environ["MGMT_PORT"])}', 'guest', 'guest')
38+
queue = client.get_queues_by_vhost_and_name("/", queueName)
39+
40+
self.assertEqual(queue['type'], 'quorum')
41+
3742
connection = pika.BlockingConnection(
3843
pika.ConnectionParameters(host='127.0.0.1', port=int(os.environ["AMQP_PORT"])))
3944
channel = connection.channel()

deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import time
1111
import os
1212
import re
13-
import urllib.request, json
13+
import rabbitman
1414

1515
class TestUserGeneratedQueueName(base.BaseTest):
1616

@@ -34,12 +34,17 @@ def test_stream_queue(self):
3434
'id': 1234,
3535
'prefetch-count': 10
3636
},
37-
ack="client"
37+
ack="client"
3838
)
3939

4040
# let the stream queue some time to start
4141
time.sleep(5)
4242

43+
client = rabbitman.Client(f'http://localhost:{(os.environ["MGMT_PORT"])}', 'guest', 'guest')
44+
queue = client.get_queues_by_vhost_and_name("/", queueName)
45+
46+
self.assertEqual(queue['type'], 'stream')
47+
4348
connection = pika.BlockingConnection(
4449
pika.ConnectionParameters(host='127.0.0.1', port=int(os.environ["AMQP_PORT"])))
4550
channel = connection.channel()

deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ websocket_info({start_heartbeats, _},
177177
State = #state{heartbeat_mode = no_heartbeat}) ->
178178
{ok, State};
179179
websocket_info({start_heartbeats, {0, 0}}, State) ->
180-
{ok, State#state{timeout_sec = {0, 0}}};
180+
{ok, State};
181181
websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}},
182182
State = #state{socket = Sock,
183183
heartbeat_sup = SupPid,
@@ -187,8 +187,7 @@ websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}},
187187
ReceiveFun = fun() -> Self ! client_timeout end,
188188
Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout,
189189
SendFun, ReceiveTimeout, ReceiveFun),
190-
{ok, State#state{heartbeat = Heartbeat,
191-
timeout_sec = {0, 0}}};
190+
{ok, State#state{heartbeat = Heartbeat}};
192191
websocket_info(connection_created, State) ->
193192
Infos = infos(?INFO_ITEMS ++ ?OTHER_METRICS, State),
194193
?LOG_DEBUG("Connection created infos ~p", [Infos]),
@@ -379,9 +378,8 @@ info_internal(garbage_collection, _State) ->
379378
info_internal(reductions, _State) ->
380379
{reductions, Reductions} = erlang:process_info(self(), reductions),
381380
Reductions;
382-
info_internal(timeout, #state{timeout_sec = {_, Receive}}) ->
383-
Receive;
384-
info_internal(timeout, #state{timeout_sec = undefined}) ->
381+
info_internal(timeout, State) ->
382+
%% TODO: real value
385383
0;
386384
info_internal(conn_name, #state{conn_name = Val}) ->
387385
rabbit_data_coercion:to_binary(Val);

0 commit comments

Comments
 (0)