2929
3030-behaviour (gen_server ).
3131
32+ -export ([init_ets /0 , block_reasons /1 ]).
3233-export ([start_link /1 ]).
3334% % gen_server callbacks
3435-export ([init /1 , terminate /2 , handle_continue /2 ,
3536 handle_cast /2 , handle_call /3 , handle_info /2 ,
3637 code_change /3 , format_status /1 ]).
3738
3839-define (HIBERNATE_AFTER , 4 * 60 * 1000 ).
40+ -define (UNROUTABLE_TABLE , rabbit_fifo_dlx_unroutable ).
3941
4042-record (pending , {
4143 % % consumed_msg_id is not to be confused with consumer delivery tag.
100102
101103-type state () :: # state {}.
102104
105+ init_ets () ->
106+ ets :new (? UNROUTABLE_TABLE , [named_table , public ]),
107+ ok .
108+
109+ block_reasons (QRef ) ->
110+ try ets :lookup (? UNROUTABLE_TABLE , QRef ) of
111+ [{QRef , Count }] ->
112+ Count ;
113+ [] ->
114+ 0
115+ catch
116+ error :badarg ->
117+ 0
118+ end .
119+
103120start_link (QRef ) ->
104121 gen_server :start_link (? MODULE , QRef , [{hibernate_after , ? HIBERNATE_AFTER }]).
105122
@@ -131,7 +148,8 @@ handle_continue(QRef, undefined) ->
131148 {stop , Error , undefined }
132149 end .
133150
134- terminate (_Reason , State ) ->
151+ terminate (_Reason , # state {queue_ref = QRef } = State ) ->
152+ _ = catch ets :delete (? UNROUTABLE_TABLE , QRef ),
135153 cancel_timer (State ).
136154
137155handle_call (Request , From , State ) ->
@@ -623,26 +641,30 @@ log_missing_dlx_once(#state{exchange_ref = SameDlx,
623641 State ;
624642log_missing_dlx_once (# state {exchange_ref = DlxResource ,
625643 queue_ref = QueueResource ,
626- logged = Logged } = State ) ->
644+ logged = Logged } = State0 ) ->
627645 ? LOG_WARNING (" Cannot forward any dead-letter messages from source ~ts because "
628646 " its configured dead-letter-exchange ~ts does not exist. "
629647 " Either create the configured dead-letter-exchange or re-configure "
630648 " the dead-letter-exchange policy for the source queue to prevent "
631649 " dead-lettered messages from piling up in the source queue. "
632650 " This message will not be logged again." ,
633651 [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource )]),
634- State # state {logged = maps :put (missing_dlx , DlxResource , Logged )}.
652+ State = State0 # state {logged = maps :put (missing_dlx , DlxResource , Logged )},
653+ set_unroutable (State ),
654+ State .
635655
636656clear_log_missing_dlx_once (# state {exchange_ref = DlxResource ,
637657 queue_ref = QueueResource ,
638658 pendings = Pendings ,
639- logged = #{missing_dlx := MissingDlx } = Logged } = State ) ->
659+ logged = #{missing_dlx := MissingDlx } = Logged } = State0 ) ->
640660 ? LOG_INFO (" Dead-letter-exchange ~ts found for ~ts . Forwarding was previously "
641661 " blocked since the configured dead-letter-exchange ~ts could not be found. "
642662 " Forwarding of ~b pending dead-letter messages will be attempted." ,
643663 [rabbit_misc :rs (DlxResource ), rabbit_misc :rs (QueueResource ),
644664 rabbit_misc :rs (MissingDlx ), maps :size (Pendings )]),
645- State # state {logged = maps :remove (missing_dlx , Logged )};
665+ State = State0 # state {logged = maps :remove (missing_dlx , Logged )},
666+ set_unroutable (State ),
667+ State ;
646668clear_log_missing_dlx_once (State ) ->
647669 State .
648670
@@ -653,7 +675,7 @@ log_no_route_once(#state{exchange_ref = SameDlx,
653675log_no_route_once (# state {queue_ref = QueueResource ,
654676 exchange_ref = DlxResource ,
655677 routing_key = RoutingKey ,
656- logged = Logged } = State ) ->
678+ logged = Logged } = State0 ) ->
657679 ? LOG_WARNING (" Cannot forward any dead-letter messages from source ~ts "
658680 " with configured dead-letter-exchange ~ts and configured "
659681 " dead-letter-routing-key '~ts '. This can happen either if the dead-letter "
@@ -664,21 +686,25 @@ log_no_route_once(#state{queue_ref = QueueResource,
664686 " in the source queue. "
665687 " This message will not be logged again." ,
666688 [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource ), RoutingKey ]),
667- State # state {logged = maps :put (no_route , {DlxResource , RoutingKey }, Logged )}.
689+ State = State0 # state {logged = maps :put (no_route , {DlxResource , RoutingKey }, Logged )},
690+ set_unroutable (State ),
691+ State .
668692
669693clear_log_no_route_once (# state {exchange_ref = DlxResource ,
670694 routing_key = RoutingKey ,
671695 queue_ref = QueueResource ,
672696 pendings = Pendings ,
673- logged = #{no_route := {OldDlx , OldRoutingKey }} = Logged } = State ) ->
697+ logged = #{no_route := {OldDlx , OldRoutingKey }} = Logged } = State0 ) ->
674698 ? LOG_INFO (" Discovered a route to forward dead-letter messages from ~ts on "
675699 " configured dead-letter-exchange ~ts and dead-letter-routing-key '~ts '. "
676700 " Previously dead-letter messages could not be forwarded on configured "
677701 " dead-letter-exchange ~ts and dead-letter-routing-key '~ts '. "
678702 " Forwarding of ~b pending dead-letter messages will be attempted." ,
679703 [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource ),
680704 RoutingKey , rabbit_misc :rs (OldDlx ), OldRoutingKey , maps :size (Pendings )]),
681- State # state {logged = maps :remove (no_route , Logged )};
705+ State = State0 # state {logged = maps :remove (no_route , Logged )},
706+ set_unroutable (State ),
707+ State ;
682708clear_log_no_route_once (State ) ->
683709 State .
684710
@@ -690,10 +716,21 @@ log_cycle_once(Queues, _, #state{logged = Logged} = State)
690716 State ;
691717log_cycle_once (Queues , RoutingKeys , # state {exchange_ref = DlxResource ,
692718 queue_ref = QueueResource ,
693- logged = Logged } = State ) ->
719+ logged = Logged } = State0 ) ->
694720 ? LOG_WARNING (" Dead-letter queues cycle detected for source ~ts "
695721 " with dead-letter exchange ~ts and routing keys ~tp : ~tp "
696722 " This message will not be logged again." ,
697723 [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource ),
698724 RoutingKeys , Queues ]),
699- State # state {logged = maps :put ({cycle , Queues }, true , Logged )}.
725+ State = State0 # state {logged = maps :put ({cycle , Queues }, true , Logged )},
726+ set_unroutable (State ),
727+ State .
728+
729+ set_unroutable (# state {queue_ref = QRef , logged = Logged }) ->
730+ Count = map_size (Logged ),
731+ try
732+ ets :update_element (? UNROUTABLE_TABLE , QRef , {2 , Count }, {QRef , Count })
733+ catch
734+ error :badarg ->
735+ ok
736+ end .
0 commit comments