@@ -782,25 +782,25 @@ apply_(#{system_time := Ts} = Meta,
782782 {State1 , Effects0 }
783783 end ,
784784
785- {State3 , Effects2 } = update_next_consumer_timeout (State2 , Effects1 ),
786785 % % activate SAC
787- {State , Effects } = activate_next_consumer ({State3 , Effects2 }),
786+ NextConsumerTimeout = next_consumer_timeout (State2 ),
787+ {State , Effects } =
788+ activate_next_consumer ({State2 #? STATE {next_consumer_timeout =
789+ NextConsumerTimeout },
790+ Effects1 }),
788791 checkout (Meta , State0 , State , Effects );
789792apply_ (_Meta , Cmd , State ) ->
790793 % % handle unhandled commands gracefully
791794 ? LOG_DEBUG (" rabbit_fifo: unhandled command ~W " , [Cmd , 10 ]),
792795 {State , ok , []}.
793796
794- update_next_consumer_timeout (#? STATE {consumers = Cons } = State , Effects ) ->
795- Next = maps :fold (
796- fun (_ , # consumer {checked_out = Ch }, Acc ) ->
797- Min = maps :fold (fun (_ , ? C_MSG (T , _ ), A ) ->
798- min (T , A )
799- end , infinity , Ch ),
800- min (Min , Acc )
801- end , infinity , Cons ),
802- {State #? STATE {next_consumer_timeout = Next },
803- [{timer , evaluate_consumer_timeout , Next , {abs , true }} | Effects ]}.
797+ next_consumer_timeout (#? STATE {consumers = Cons }) ->
798+ maps :fold (fun (_ , # consumer {checked_out = Ch }, Acc ) ->
799+ Min = maps :fold (fun (_ , ? C_MSG (T , _ ), A ) ->
800+ min (T , A )
801+ end , infinity , Ch ),
802+ min (Min , Acc )
803+ end , infinity , Cons ).
804804
805805
806806-spec live_indexes (state ()) -> {ra_seq , ra_seq :state ()}.
@@ -1114,8 +1114,7 @@ state_enter(leader,
11141114 waiting_consumers = WaitingConsumers ,
11151115 cfg = # cfg {resource = QRes ,
11161116 dead_letter_handler = DLH },
1117- dlx = DlxState } = State ) ->
1118- TimerEffs = timer_effect (State , []),
1117+ dlx = DlxState }) ->
11191118 % return effects to monitor all current consumers and enqueuers
11201119 Pids = lists :usort (maps :keys (Enqs )
11211120 ++ [P || ? CONSUMER_PID (P ) <- maps :values (Cons )]
@@ -1124,7 +1123,7 @@ state_enter(leader,
11241123 Nots = [{send_msg , P , leader_change , ra_event } || P <- Pids ],
11251124 NodeMons = lists :usort ([{monitor , node , node (P )} || P <- Pids ]),
11261125 NotifyDecs = notify_decorators_startup (QRes ),
1127- Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs ],
1126+ Effects = Mons ++ Nots ++ NodeMons ++ [NotifyDecs ],
11281127
11291128 case DLH of
11301129 at_least_once ->
@@ -1275,7 +1274,8 @@ which_module(8) -> ?MODULE.
12751274- record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
12761275- record (? AUX , {name :: atom (),
12771276 last_decorators_state :: term (),
1278- unused_1 :: term (),
1277+ last_consumer_timeout =
1278+ infinity :: infinity | non_neg_integer (),
12791279 gc = # aux_gc {} :: # aux_gc {},
12801280 tick_pid :: undefined | pid (),
12811281 cache = #{} :: map (),
@@ -1308,7 +1308,7 @@ handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux)
13081308 when element (1 , AuxV3 ) == aux_v3 ->
13091309 AuxV4 = #? AUX {name = element (2 , AuxV3 ),
13101310 last_decorators_state = element (3 , AuxV3 ),
1311- unused_1 = undefined ,
1311+ last_consumer_timeout = infinity ,
13121312 gc = element (5 , AuxV3 ),
13131313 tick_pid = element (6 , AuxV3 ),
13141314 cache = element (7 , AuxV3 ),
@@ -1317,10 +1317,12 @@ handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux)
13171317 handle_aux (RaftState , Tag , Cmd , AuxV4 , RaAux );
13181318handle_aux (leader , cast , eval ,
13191319 #? AUX {last_decorators_state = LastDec ,
1320+ last_consumer_timeout = LastConTimeout0 ,
13201321 last_checkpoint = Check0 } = Aux0 ,
13211322 RaAux ) ->
13221323
13231324 #? STATE {cfg = # cfg {resource = QName },
1325+ next_consumer_timeout = NextConTimeout ,
13241326 reclaimable_bytes = ReclaimableBytes } = MacState =
13251327 ra_aux :machine_state (RaAux ),
13261328
@@ -1337,16 +1339,29 @@ handle_aux(leader, cast, eval,
13371339
13381340 % % this is called after each batch of commands have been applied
13391341 % % set timer for message expire
1340- % % should really be the last applied index ts but this will have to do
13411342 Effects1 = timer_effect (MacState , Effects0 ),
1343+ % % if the timer has already elapsed (stale time from a prior leadership term)
1344+ % % then set infinity to ensure a timer is started
1345+ LastConTimeout = if LastConTimeout0 < Ts ->
1346+ infinity ;
1347+ true ->
1348+ LastConTimeout0
1349+ end ,
1350+
1351+ Effects2 = maybe_add_consumer_timeout_effect (NextConTimeout ,
1352+ LastConTimeout ,
1353+ Effects1 ),
13421354 case query_notify_decorators_info (MacState ) of
13431355 LastDec ->
1344- {no_reply , Aux0 #? AUX {last_checkpoint = Check }, RaAux , Effects1 };
1356+ {no_reply , Aux0 #? AUX {last_checkpoint = Check ,
1357+ last_consumer_timeout = NextConTimeout }, RaAux , Effects2 };
13451358 {MaxActivePriority , IsEmpty } = NewLast ->
13461359 Effects = [notify_decorators_effect (QName , MaxActivePriority , IsEmpty )
1347- | Effects1 ],
1360+ | Effects2 ],
13481361 {no_reply , Aux0 #? AUX {last_checkpoint = Check ,
1349- last_decorators_state = NewLast }, RaAux , Effects }
1362+ last_consumer_timeout = NextConTimeout ,
1363+ last_decorators_state = NewLast }, RaAux ,
1364+ Effects }
13501365 end ;
13511366handle_aux (_RaftState , cast , eval ,
13521367 #? AUX {last_checkpoint = Check0 } = Aux0 , RaAux ) ->
@@ -2718,9 +2733,7 @@ assign_to_consumer(#{system_time := Ts} = Meta, _Ts, ConsumerKey, Msgs,
27182733 State = update_or_remove_con (Meta , ConsumerKey , Con , State1 ),
27192734 DelMsgs = lists :reverse (DeliveryMsgs ),
27202735 DeliveryEffect = delivery_effect (ConsumerKey , DelMsgs , State ),
2721- Effects = maybe_add_consumer_timeout_effect (NextConTimeout ,
2722- NextConTimeout0 ,
2723- [DeliveryEffect | Effects0 ]),
2736+ Effects = [DeliveryEffect | Effects0 ],
27242737 {State , Effects }.
27252738
27262739delayed_in (ReadyAt , Idx , Msg , DeferralToken , # delayed {tree = Tree0 ,
@@ -2819,45 +2832,45 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
28192832 % % there are consumers waiting to be serviced
28202833 % % process consumer checkout
28212834 case maps :get (ConsumerKey , Cons0 ) of
2822- # consumer {credit = Credit ,
2823- status = Status }
2824- when Credit =:= 0 orelse
2825- Status =/= up ->
2826- % % not an active consumer but still in the consumers
2827- % % map - this can happen when draining
2828- % % or when higher priority single active consumers
2829- % % take over, recurse without consumer in service
2830- % % queue
2831- checkout_one (Meta , ExpiredMsg ,
2832- InitState #? STATE {service_queue = SQ1 },
2833- Effects1 );
2834- # consumer {checked_out = Checked0 ,
2835- next_msg_id = Next ,
2836- credit = Credit ,
2837- delivery_count = DelCnt0 ,
2838- cfg = Cfg } = Con0 ->
2839- Timeout = Ts + Cfg # consumer_cfg .timeout ,
2840- Checked = maps :put (Next , ? C_MSG (Timeout , Msg ),
2841- Checked0 ),
2842- DelCnt = add (DelCnt0 , 1 ),
2843- Con = Con0 # consumer {checked_out = Checked ,
2844- next_msg_id = Next + 1 ,
2845- credit = Credit - 1 ,
2846- delivery_count = DelCnt },
2847- Size = get_header (size , get_msg_header (Msg )),
2848- State1 =
2849- State0 #? STATE {service_queue = SQ1 ,
2850- msg_bytes_checkout = BytesCheckout + Size ,
2851- msg_bytes_enqueue = BytesEnqueue - Size ,
2852- next_consumer_timeout = min ( Timeout , NextConTimeout )},
2853- Effects = maybe_add_consumer_timeout_effect ( Timeout ,
2854- NextConTimeout ,
2855- Effects1 ) ,
2856- State = update_or_remove_con (
2857- Meta , ConsumerKey , Con , State1 ),
2858- {success , ConsumerKey , Next , Msg , ExpiredMsg ,
2859- State , Effects }
2860- end ;
2835+ # consumer {credit = Credit ,
2836+ status = Status }
2837+ when Credit =:= 0 orelse
2838+ Status =/= up ->
2839+ % % not an active consumer but still in the consumers
2840+ % % map - this can happen when draining
2841+ % % or when higher priority single active consumers
2842+ % % take over, recurse without consumer in service
2843+ % % queue
2844+ checkout_one (Meta , ExpiredMsg ,
2845+ InitState #? STATE {service_queue = SQ1 },
2846+ Effects1 );
2847+ # consumer {checked_out = Checked0 ,
2848+ next_msg_id = Next ,
2849+ credit = Credit ,
2850+ delivery_count = DelCnt0 ,
2851+ cfg = Cfg } = Con0 ->
2852+ Timeout = Ts + Cfg # consumer_cfg .timeout ,
2853+ Checked = maps :put (Next , ? C_MSG (Timeout , Msg ),
2854+ Checked0 ),
2855+ DelCnt = add (DelCnt0 , 1 ),
2856+ Con = Con0 # consumer {checked_out = Checked ,
2857+ next_msg_id = Next + 1 ,
2858+ credit = Credit - 1 ,
2859+ delivery_count = DelCnt },
2860+ Size = get_header (size , get_msg_header (Msg )),
2861+ State1 =
2862+ State0 #? STATE {service_queue = SQ1 ,
2863+ msg_bytes_checkout =
2864+ BytesCheckout + Size ,
2865+ msg_bytes_enqueue =
2866+ BytesEnqueue - Size ,
2867+ next_consumer_timeout =
2868+ min ( Timeout , NextConTimeout )} ,
2869+ State = update_or_remove_con (Meta , ConsumerKey ,
2870+ Con , State1 ),
2871+ {success , ConsumerKey , Next , Msg , ExpiredMsg ,
2872+ State , Effects1 }
2873+ end ;
28612874 empty ->
28622875 {nochange , ExpiredMsg , InitState , Effects1 }
28632876 end ;
@@ -2988,7 +3001,8 @@ timer_effect(#?STATE{messages_total = 0,
29883001 delayed = # delayed {next = undefined }}, Effects ) ->
29893002 Effects ;
29903003timer_effect (#? STATE {messages_total = 0 ,
2991- delayed = # delayed {next = {NextDelayedTs , _ , _ }}}, Effects ) ->
3004+ delayed = # delayed {next = {NextDelayedTs , _ , _ }}},
3005+ Effects ) ->
29923006 [{timer , expire_msgs , NextDelayedTs , {abs , true }} | Effects ];
29933007timer_effect (#? STATE {returns = Returns ,
29943008 messages = Messages ,
@@ -3027,15 +3041,21 @@ timer_effect(#?STATE{returns = Returns,
30273041
30283042 % % Also consider the next delayed message timestamp
30293043 NextDelayedTs = case Delayed of
3030- # delayed {next = undefined } -> undefined ;
3031- # delayed {next = {Ts , _ , _ }} -> Ts
3044+ # delayed {next = undefined } ->
3045+ undefined ;
3046+ # delayed {next = {Ts , _ , _ }} ->
3047+ Ts
30323048 end ,
30333049
30343050 NextTimeout = case {NextExpiry , NextDelayedTs } of
3035- {undefined , undefined } -> undefined ;
3036- {undefined , D } -> D ;
3037- {E , undefined } -> E ;
3038- {E , D } -> min (E , D )
3051+ {undefined , undefined } ->
3052+ undefined ;
3053+ {undefined , D } ->
3054+ D ;
3055+ {E , undefined } ->
3056+ E ;
3057+ {E , D } ->
3058+ min (E , D )
30393059 end ,
30403060
30413061 case NextTimeout of
@@ -4096,7 +4116,8 @@ switch_from(_, _, State) ->
40964116switch_to (at_least_once , _ , Effects ) ->
40974117 % % Switch from some other strategy to at-least-once.
40984118 % % Dlx worker needs to be started on the leader.
4099- % % The cleanest way to determine the Ra state of this node is delegation to handle_aux.
4119+ % % The cleanest way to determine the Ra state of this node is
4120+ % % delegation to handle_aux.
41004121 {#? DLX {}, Effects ++ [{aux , {dlx , setup }}]};
41014122switch_to (_ , State , Effects ) ->
41024123 {State , Effects }.
0 commit comments