diff --git a/deps/rabbit/src/rabbit_fifo_dlx_sup.erl b/deps/rabbit/src/rabbit_fifo_dlx_sup.erl index 9231fb1e76af..987c2f36581c 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_sup.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_sup.erl @@ -24,6 +24,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> + rabbit_fifo_dlx_worker:init_ets(), SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 1}, diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index b79a3154fdca..d40ad46e7853 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -29,6 +29,7 @@ -behaviour(gen_server). +-export([init_ets/0, block_reasons/1]). -export([start_link/1]). %% gen_server callbacks -export([init/1, terminate/2, handle_continue/2, @@ -36,6 +37,7 @@ code_change/3, format_status/1]). -define(HIBERNATE_AFTER, 4*60*1000). +-define(UNROUTABLE_TABLE, rabbit_fifo_dlx_unroutable). -record(pending, { %% consumed_msg_id is not to be confused with consumer delivery tag. @@ -100,6 +102,21 @@ -type state() :: #state{}. +init_ets() -> + ets:new(?UNROUTABLE_TABLE, [named_table, public]), + ok. + +block_reasons(QRef) -> + try ets:lookup(?UNROUTABLE_TABLE, QRef) of + [{QRef, Count}] -> + Count; + [] -> + 0 + catch + error:badarg -> + 0 + end. + start_link(QRef) -> gen_server:start_link(?MODULE, QRef, [{hibernate_after, ?HIBERNATE_AFTER}]). @@ -131,7 +148,8 @@ handle_continue(QRef, undefined) -> {stop, Error, undefined} end. -terminate(_Reason, State) -> +terminate(_Reason, #state{queue_ref = QRef} = State) -> + _ = catch ets:delete(?UNROUTABLE_TABLE, QRef), cancel_timer(State). handle_call(Request, From, State) -> @@ -623,7 +641,7 @@ log_missing_dlx_once(#state{exchange_ref = SameDlx, State; log_missing_dlx_once(#state{exchange_ref = DlxResource, queue_ref = QueueResource, - logged = Logged} = State) -> + logged = Logged} = State0) -> ?LOG_WARNING("Cannot forward any dead-letter messages from source ~ts because " "its configured dead-letter-exchange ~ts does not exist. " "Either create the configured dead-letter-exchange or re-configure " @@ -631,18 +649,22 @@ log_missing_dlx_once(#state{exchange_ref = DlxResource, "dead-lettered messages from piling up in the source queue. " "This message will not be logged again.", [rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource)]), - State#state{logged = maps:put(missing_dlx, DlxResource, Logged)}. + State = State0#state{logged = maps:put(missing_dlx, DlxResource, Logged)}, + set_unroutable(State), + State. clear_log_missing_dlx_once(#state{exchange_ref = DlxResource, queue_ref = QueueResource, pendings = Pendings, - logged = #{missing_dlx := MissingDlx} = Logged} = State) -> + logged = #{missing_dlx := MissingDlx} = Logged} = State0) -> ?LOG_INFO("Dead-letter-exchange ~ts found for ~ts. Forwarding was previously " "blocked since the configured dead-letter-exchange ~ts could not be found. " "Forwarding of ~b pending dead-letter messages will be attempted.", [rabbit_misc:rs(DlxResource), rabbit_misc:rs(QueueResource), rabbit_misc:rs(MissingDlx), maps:size(Pendings)]), - State#state{logged = maps:remove(missing_dlx, Logged)}; + State = State0#state{logged = maps:remove(missing_dlx, Logged)}, + set_unroutable(State), + State; clear_log_missing_dlx_once(State) -> State. @@ -653,7 +675,7 @@ log_no_route_once(#state{exchange_ref = SameDlx, log_no_route_once(#state{queue_ref = QueueResource, exchange_ref = DlxResource, routing_key = RoutingKey, - logged = Logged} = State) -> + logged = Logged} = State0) -> ?LOG_WARNING("Cannot forward any dead-letter messages from source ~ts " "with configured dead-letter-exchange ~ts and configured " "dead-letter-routing-key '~ts'. This can happen either if the dead-letter " @@ -664,13 +686,15 @@ log_no_route_once(#state{queue_ref = QueueResource, "in the source queue. " "This message will not be logged again.", [rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource), RoutingKey]), - State#state{logged = maps:put(no_route, {DlxResource, RoutingKey}, Logged)}. + State = State0#state{logged = maps:put(no_route, {DlxResource, RoutingKey}, Logged)}, + set_unroutable(State), + State. clear_log_no_route_once(#state{exchange_ref = DlxResource, routing_key = RoutingKey, queue_ref = QueueResource, pendings = Pendings, - logged = #{no_route := {OldDlx, OldRoutingKey}} = Logged} = State) -> + logged = #{no_route := {OldDlx, OldRoutingKey}} = Logged} = State0) -> ?LOG_INFO("Discovered a route to forward dead-letter messages from ~ts on " "configured dead-letter-exchange ~ts and dead-letter-routing-key '~ts'. " "Previously dead-letter messages could not be forwarded on configured " @@ -678,7 +702,9 @@ clear_log_no_route_once(#state{exchange_ref = DlxResource, "Forwarding of ~b pending dead-letter messages will be attempted.", [rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource), RoutingKey, rabbit_misc:rs(OldDlx), OldRoutingKey, maps:size(Pendings)]), - State#state{logged = maps:remove(no_route, Logged)}; + State = State0#state{logged = maps:remove(no_route, Logged)}, + set_unroutable(State), + State; clear_log_no_route_once(State) -> State. @@ -690,10 +716,21 @@ log_cycle_once(Queues, _, #state{logged = Logged} = State) State; log_cycle_once(Queues, RoutingKeys, #state{exchange_ref = DlxResource, queue_ref = QueueResource, - logged = Logged} = State) -> + logged = Logged} = State0) -> ?LOG_WARNING("Dead-letter queues cycle detected for source ~ts " "with dead-letter exchange ~ts and routing keys ~tp: ~tp " "This message will not be logged again.", [rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource), RoutingKeys, Queues]), - State#state{logged = maps:put({cycle, Queues}, true, Logged)}. + State = State0#state{logged = maps:put({cycle, Queues}, true, Logged)}, + set_unroutable(State), + State. + +set_unroutable(#state{queue_ref = QRef, logged = Logged}) -> + Count = map_size(Logged), + try + ets:update_element(?UNROUTABLE_TABLE, QRef, {2, Count}, {QRef, Count}) + catch + error:badarg -> + ok + end. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 32d56539d5e8..0562af2de2a2 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -651,6 +651,12 @@ handle_tick(QName, (_, _, Acc) -> Acc end, info(Q, Keys), Overview), + Blocks = case Cfg of + #{dead_letter_handler := at_least_once} -> + rabbit_fifo_dlx_worker:block_reasons(QName); + _ -> + undefined + end, MsgBytesDiscarded = DiscardBytes + DiscardCheckoutBytes, MsgBytes = EnqueueBytes + CheckoutBytes + MsgBytesDiscarded, Infos = [{consumers, NumConsumers}, @@ -664,6 +670,7 @@ handle_tick(QName, {messages_persistent, NumMessages}, {messages_dlx, NumDiscarded + NumDiscardedCheckedOut}, {message_bytes_dlx, MsgBytesDiscarded}, + {at_least_once_dlx_block_reasons, Blocks}, {single_active_consumer_tag, SacTag}, {single_active_consumer_pid, SacPid}, {leader, node()},