3636 proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
3737 rabbit_mqtt_processor :state (),
3838 connection_state :: running | blocked ,
39- conserve :: boolean ( ),
39+ blocked_by :: sets : set ( rabbit_alarm : resource_alarm_source () ),
4040 stats_timer :: rabbit_event :state (),
4141 keepalive = rabbit_mqtt_keepalive :init () :: rabbit_mqtt_keepalive :state (),
4242 conn_name :: binary ()
@@ -53,8 +53,8 @@ start_link(Ref, _Transport, []) ->
5353-spec conserve_resources (pid (),
5454 rabbit_alarm :resource_alarm_source (),
5555 rabbit_alarm :resource_alert ()) -> ok .
56- conserve_resources (Pid , _ , {_ , Conserve , _ }) ->
57- Pid ! {conserve_resources , Conserve },
56+ conserve_resources (Pid , Source , {_ , Conserve , _ }) ->
57+ Pid ! {conserve_resources , Source , Conserve },
5858 ok .
5959
6060-spec info (pid (), rabbit_types :info_keys ()) ->
@@ -78,15 +78,15 @@ init(Ref) ->
7878 {ok , ConnStr } ->
7979 ConnName = rabbit_data_coercion :to_binary (ConnStr ),
8080 ? LOG_DEBUG (" MQTT accepting TCP connection ~tp (~ts )" , [self (), ConnName ]),
81- _ = rabbit_alarm :register (self (), {? MODULE , conserve_resources , []}),
81+ Alarms = rabbit_alarm :register (self (), {? MODULE , conserve_resources , []}),
8282 LoginTimeout = application :get_env (? APP_NAME , login_timeout , 10_000 ),
8383 erlang :send_after (LoginTimeout , self (), login_timeout ),
8484 State0 = # state {socket = RealSocket ,
8585 proxy_socket = rabbit_net :maybe_get_proxy_socket (Sock ),
8686 conn_name = ConnName ,
8787 await_recv = false ,
8888 connection_state = running ,
89- conserve = false ,
89+ blocked_by = sets : from_list ( Alarms , [{ version , 2 }]) ,
9090 parse_state = rabbit_mqtt_packet :init_state (),
9191 stats_timer = rabbit_event :init_stats_timer ()},
9292 State = control_throttle (State0 ),
@@ -185,9 +185,16 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
185185 when Tag =:= tcp_error ; Tag =:= ssl_error ->
186186 network_error (Reason , State );
187187
188- handle_info ({conserve_resources , Conserve }, State ) ->
188+ handle_info ({conserve_resources , Source , Conserve },
189+ # state {blocked_by = BlockedBy0 } = State ) ->
190+ BlockedBy = case Conserve of
191+ true ->
192+ sets :add_element (Source , BlockedBy0 );
193+ false ->
194+ sets :del_element (Source , BlockedBy0 )
195+ end ,
189196 maybe_process_deferred_recv (
190- control_throttle (State # state { conserve = Conserve }));
197+ control_throttle (State # state { blocked_by = BlockedBy }));
191198
192199handle_info ({bump_credit , Msg }, State ) ->
193200 credit_flow :handle_bump_msg (Msg ),
@@ -417,10 +424,11 @@ run_socket(State = #state{ socket = Sock }) ->
417424 State # state { await_recv = true }.
418425
419426control_throttle (State = # state {connection_state = ConnState ,
420- conserve = Conserve ,
427+ blocked_by = BlockedBy ,
421428 proc_state = PState ,
422429 keepalive = KState
423430 }) ->
431+ Conserve = not sets :is_empty (BlockedBy ),
424432 Throttle = case PState of
425433 connect_packet_unprocessed -> Conserve ;
426434 _ -> rabbit_mqtt_processor :throttle (Conserve , PState )
@@ -537,7 +545,7 @@ format_state(#state{socket = Socket,
537545 parse_state = _ ,
538546 proc_state = PState ,
539547 connection_state = ConnectionState ,
540- conserve = Conserve ,
548+ blocked_by = BlockedBy ,
541549 stats_timer = StatsTimer ,
542550 keepalive = Keepalive ,
543551 conn_name = ConnName
@@ -552,7 +560,7 @@ format_state(#state{socket = Socket,
552560 rabbit_mqtt_processor :format_status (PState )
553561 end ,
554562 connection_state => ConnectionState ,
555- conserve => Conserve ,
563+ blocked_by => lists : sort ( sets : to_list ( BlockedBy )) ,
556564 stats_timer => StatsTimer ,
557565 keepalive => Keepalive ,
558566 conn_name => ConnName }.
0 commit comments