Skip to content

Commit 41ce7a9

Browse files
committed
fixup! convert to mc, also add metrics
1 parent 7538cb4 commit 41ce7a9

2 files changed

Lines changed: 56 additions & 47 deletions

File tree

deps/rabbitmq_stomp/src/rabbit_stomp.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
start(normal, []) ->
2929
Config = parse_configuration(),
3030
Listeners = parse_listener_configuration(),
31+
rabbit_global_counters:init([{protocol, stomp}]),
3132
Result = rabbit_stomp_sup:start_link(Listeners, Config),
3233
EMPid = case rabbit_event:start_link() of
3334
{ok, Pid} -> Pid;

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
-define(MAX_PERMISSION_CACHE_SIZE, 12).
5959
-define(QUEUE, lqueue).
6060

61+
-import(rabbit_misc, [maps_put_truthy/3]).
62+
6163
adapter_name(State) ->
6264
#proc_state{adapter_info = #amqp_adapter_info{name = Name}} = State,
6365
Name.
@@ -765,28 +767,27 @@ do_send(Destination, _DestHdr,
765767
%% io:format("Parse_routing: ~p~n", [{ExchangeNameList, RoutingKeyList}]),
766768
RoutingKey = list_to_binary(RoutingKeyList),
767769

768-
%% Method = #'basic.publish'{
769-
%% exchange = list_to_binary(Exchange),
770-
%% routing_key = list_to_binary(RoutingKey),
771-
%% mandatory = false,
772-
%% immediate = false},
770+
771+
rabbit_global_counters:messages_received(stomp, 1),
773772

774773
ExchangeName = rabbit_misc:r(VHost, exchange, list_to_binary(ExchangeNameList)),
775774
check_resource_access(User, ExchangeName, write, AuthzCtx),
776775
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
777776
check_internal_exchange(Exchange),
778777
check_topic_authorisation(Exchange, User, RoutingKey, AuthzCtx, write),
779778

780-
{DoConfirm, MsgSeqNo, State2} =
779+
{DeliveryOptions, _MsgSeqNo, State2} =
781780
case rabbit_stomp_frame:header(Frame, ?HEADER_RECEIPT) of
781+
not_found ->
782+
{maps_put_truthy(flow, Flow, #{}), undefined, State1};
782783
{ok, Id} ->
784+
rabbit_global_counters:messages_received_confirm(stomp, 1),
783785
SeqNo = State1#proc_state.msg_seq_no,
784786
%% I think it's safe to just add it here because
785787
%% if there is an error down the road process dies
786788
StateRR = record_receipt(true, SeqNo, Id, State1),
787-
{true, SeqNo, StateRR#proc_state{msg_seq_no = SeqNo + 1}};
788-
not_found ->
789-
{false, undefined, State1}
789+
Opts = maps_put_truthy(flow, Flow, #{correlation => SeqNo}),
790+
{Opts, SeqNo, StateRR#proc_state{msg_seq_no = SeqNo + 1}}
790791
end,
791792

792793
{ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
@@ -798,54 +799,59 @@ do_send(Destination, _DestHdr,
798799
protocol = none,
799800
payload_fragments_rev = [BodyFragments]
800801
},
801-
Content = rabbit_message_interceptor:intercept(Content0),
802802

803-
{ok, BasicMessage} = rabbit_basic:message(ExchangeName, RoutingKey, Content),
803+
Message0 = mc_amqpl:message(ExchangeName, RoutingKey, Content0),
804804

805-
Delivery = #delivery{
806-
mandatory = false,
807-
confirm = DoConfirm,
808-
sender = self(),
809-
message = BasicMessage,
810-
msg_seq_no = MsgSeqNo,
811-
flow = Flow
812-
},
813-
%% io:format("Delivery: ~p~n", [Delivery]),
814-
case rabbit_exchange:lookup(ExchangeName) of
815-
{ok, Exchange} ->
816-
QNames = rabbit_exchange:route(Exchange, Delivery, #{return_binding_keys => true}),
817-
%% io:format("QNames ~p~n", [QNames]),
818-
deliver_to_queues(Delivery, QNames, State2);
819-
{error, not_found} ->
820-
log_error("~s not found", [rabbit_misc:rs(ExchangeName)], ExchangeName),
821-
{error, exchange_not_found, State2}
822-
end;
805+
Message = rabbit_message_interceptor:intercept(Message0),
806+
807+
%% {ok, BasicMessage} = rabbit_basic:message(ExchangeName, RoutingKey, Content),
808+
809+
%% Delivery = #delivery{
810+
%% mandatory = false,
811+
%% confirm = DoConfirm,
812+
%% sender = self(),
813+
%% message = BasicMessage,
814+
%% msg_seq_no = MsgSeqNo,
815+
%% flow = Flow
816+
%% },
817+
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
818+
%% io:format("QNames ~p~n", [QNames]),
823819

820+
Delivery = {Message, DeliveryOptions, QNames},
821+
%% io:format("Delivery: ~p~n", [Delivery]),
822+
deliver_to_queues(ExchangeName, Delivery, State2);
824823
{error, _} = Err ->
825824
%% io:format("Err ~p~n", [Err]),
826825
Err
827826
end.
828827

829-
deliver_to_queues(Delivery = #delivery{message = #basic_message{exchange_name = XName},
830-
confirm = Confirm,
831-
msg_seq_no = MsgSeqNo},
832-
RoutedToQNames,
828+
deliver_to_queues(_XName,
829+
{_Message, Options, _RoutedToQueues = []},
830+
State)
831+
when not is_map_key(correlation, Options) -> %% optimisation when there are no queues
832+
%%?INCR_STATS(exchange_stats, XName, 1, publish, State),
833+
rabbit_global_counters:messages_unroutable_dropped(stomp, 1),
834+
%%?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
835+
{ok, State};
836+
837+
deliver_to_queues(XName,
838+
{Message, Options, RoutedToQNames},
833839
State0 = #proc_state{queue_states = QStates0}) ->
834840
Qs0 = rabbit_amqqueue:lookup_many(RoutedToQNames),
835841
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
842+
MsgSeqNo = maps:get(correlation, Options, undefined),
836843
%% io:format("Qs: ~p~n", [Qs]),
837-
case rabbit_queue_type:deliver(Qs, Delivery, QStates0) of
844+
case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of
838845
{ok, QStates, Actions} ->
839-
%% rabbit_global_counters:messages_routed(ProtoVer, length(Qs)), ??
846+
rabbit_global_counters:messages_routed(stomp, length(Qs)),
840847
QueueNames = rabbit_amqqueue:queue_names(Qs),
841-
State1 = process_routing_confirm(Confirm, QueueNames,
842-
MsgSeqNo, XName, State0),
848+
State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0),
843849
%% Actions must be processed after registering confirms as actions may
844850
%% contain rejections of publishes.
845851
{ok, handle_queue_actions(Actions, State1#proc_state{queue_states = QStates})};
846852
{error, Reason} ->
847853
log_error("Failed to deliver message with packet_id=~p to queues: ~p",
848-
[Delivery#delivery.msg_seq_no, Reason], none),
854+
[MsgSeqNo, Reason], none),
849855
{error, Reason, State0}
850856
end.
851857

@@ -866,11 +872,11 @@ record_confirms([], State) ->
866872
record_confirms(MXs, State = #proc_state{confirmed = C}) ->
867873
State#proc_state{confirmed = [MXs | C]}.
868874

869-
process_routing_confirm(false, _, _, _, State) ->
875+
process_routing_confirm(undefined, _, _, State) ->
870876
State;
871-
process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
877+
process_routing_confirm(MsgSeqNo, [], XName, State) ->
872878
record_confirms([{MsgSeqNo, XName}], State);
873-
process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) ->
879+
process_routing_confirm(MsgSeqNo, QRefs, XName, State) ->
874880
State#proc_state{unconfirmed =
875881
rabbit_confirms:insert(MsgSeqNo, QRefs, XName, State#proc_state.unconfirmed)}.
876882

@@ -1074,16 +1080,18 @@ deliver_to_client(ConsumerTag, Ack, Msgs, State) ->
10741080
deliver_one_to_client(ConsumerTag, Ack, Msg, S)
10751081
end, State, Msgs).
10761082

1077-
deliver_one_to_client(ConsumerTag, _Ack, {QName, QPid, MsgId, Redelivered,
1078-
#basic_message{exchange_name = ExchangeName,
1079-
routing_keys = [RoutingKey | _CcRoutes],
1080-
content = Content}},
1083+
deliver_one_to_client(ConsumerTag, _Ack, {QName, QPid, MsgId, Redelivered, MsgCont0} = _Msg,
10811084
State = #proc_state{queue_states = QStates,
10821085
delivery_tag = DeliveryTag}) ->
1086+
1087+
[RoutingKey | _] = mc:get_annotation(routing_keys, MsgCont0),
1088+
ExchangeNameBin = mc:get_annotation(exchange, MsgCont0),
1089+
MsgCont = mc:convert(mc_amqpl, MsgCont0),
1090+
Content = mc:protocol_state(MsgCont),
10831091
Delivery = #'basic.deliver'{consumer_tag = ConsumerTag,
10841092
delivery_tag = DeliveryTag,
10851093
redelivered = Redelivered,
1086-
exchange = ExchangeName#resource.name,
1094+
exchange = ExchangeNameBin,
10871095
routing_key = RoutingKey},
10881096

10891097

@@ -1102,7 +1110,7 @@ deliver_one_to_client(ConsumerTag, _Ack, {QName, QPid, MsgId, Redelivered,
11021110

11031111

11041112
send_delivery(QName, MsgId, Delivery = #'basic.deliver'{consumer_tag = ConsumerTag,
1105-
delivery_tag = DeliveryTag},
1113+
delivery_tag = DeliveryTag},
11061114
Properties, Body, DeliveryCtx,
11071115
State = #proc_state{
11081116
session_id = SessionId,

0 commit comments

Comments
 (0)