@@ -782,25 +782,25 @@ apply_(#{system_time := Ts} = Meta,
782782 {State1 , Effects0 }
783783 end ,
784784
785- {State3 , Effects2 } = update_next_consumer_timeout (State2 , Effects1 ),
786785 % % activate SAC
787- {State , Effects } = activate_next_consumer ({State3 , Effects2 }),
786+ NextConsumerTimeout = next_consumer_timeout (State2 ),
787+ {State , Effects } =
788+ activate_next_consumer ({State2 #? STATE {next_consumer_timeout =
789+ NextConsumerTimeout },
790+ Effects1 }),
788791 checkout (Meta , State0 , State , Effects );
789792apply_ (_Meta , Cmd , State ) ->
790793 % % handle unhandled commands gracefully
791794 ? LOG_DEBUG (" rabbit_fifo: unhandled command ~W " , [Cmd , 10 ]),
792795 {State , ok , []}.
793796
794- update_next_consumer_timeout (#? STATE {consumers = Cons } = State , Effects ) ->
795- Next = maps :fold (
796- fun (_ , # consumer {checked_out = Ch }, Acc ) ->
797- Min = maps :fold (fun (_ , ? C_MSG (T , _ ), A ) ->
798- min (T , A )
799- end , infinity , Ch ),
800- min (Min , Acc )
801- end , infinity , Cons ),
802- {State #? STATE {next_consumer_timeout = Next },
803- [{timer , evaluate_consumer_timeout , Next , {abs , true }} | Effects ]}.
797+ next_consumer_timeout (#? STATE {consumers = Cons }) ->
798+ maps :fold (fun (_ , # consumer {checked_out = Ch }, Acc ) ->
799+ Min = maps :fold (fun (_ , ? C_MSG (T , _ ), A ) ->
800+ min (T , A )
801+ end , infinity , Ch ),
802+ min (Min , Acc )
803+ end , infinity , Cons ).
804804
805805
806806-spec live_indexes (state ()) -> {ra_seq , ra_seq :state ()}.
@@ -1275,7 +1275,8 @@ which_module(8) -> ?MODULE.
12751275- record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
12761276- record (? AUX , {name :: atom (),
12771277 last_decorators_state :: term (),
1278- unused_1 :: term (),
1278+ last_consumer_timeout =
1279+ infinity :: infinity | non_neg_integer (),
12791280 gc = # aux_gc {} :: # aux_gc {},
12801281 tick_pid :: undefined | pid (),
12811282 cache = #{} :: map (),
@@ -1308,7 +1309,7 @@ handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux)
13081309 when element (1 , AuxV3 ) == aux_v3 ->
13091310 AuxV4 = #? AUX {name = element (2 , AuxV3 ),
13101311 last_decorators_state = element (3 , AuxV3 ),
1311- unused_1 = undefined ,
1312+ last_consumer_timeout = infinity ,
13121313 gc = element (5 , AuxV3 ),
13131314 tick_pid = element (6 , AuxV3 ),
13141315 cache = element (7 , AuxV3 ),
@@ -1317,10 +1318,12 @@ handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux)
13171318 handle_aux (RaftState , Tag , Cmd , AuxV4 , RaAux );
13181319handle_aux (leader , cast , eval ,
13191320 #? AUX {last_decorators_state = LastDec ,
1321+ last_consumer_timeout = LastConTimeout0 ,
13201322 last_checkpoint = Check0 } = Aux0 ,
13211323 RaAux ) ->
13221324
13231325 #? STATE {cfg = # cfg {resource = QName },
1326+ next_consumer_timeout = NextConTimeout ,
13241327 reclaimable_bytes = ReclaimableBytes } = MacState =
13251328 ra_aux :machine_state (RaAux ),
13261329
@@ -1337,16 +1340,29 @@ handle_aux(leader, cast, eval,
13371340
13381341 % % this is called after each batch of commands have been applied
13391342 % % set timer for message expire
1340- % % should really be the last applied index ts but this will have to do
13411343 Effects1 = timer_effect (MacState , Effects0 ),
1344+ % % if the timer has already elapsed (stale time from a prior leadership term)
1345+ % % then set infinity to ensure a timer is started
1346+ LastConTimeout = if LastConTimeout0 < Ts ->
1347+ infinity ;
1348+ true ->
1349+ LastConTimeout0
1350+ end ,
1351+
1352+ Effects2 = maybe_add_consumer_timeout_effect (NextConTimeout ,
1353+ LastConTimeout ,
1354+ Effects1 ),
13421355 case query_notify_decorators_info (MacState ) of
13431356 LastDec ->
1344- {no_reply , Aux0 #? AUX {last_checkpoint = Check }, RaAux , Effects1 };
1357+ {no_reply , Aux0 #? AUX {last_checkpoint = Check ,
1358+ last_consumer_timeout = NextConTimeout }, RaAux , Effects2 };
13451359 {MaxActivePriority , IsEmpty } = NewLast ->
13461360 Effects = [notify_decorators_effect (QName , MaxActivePriority , IsEmpty )
1347- | Effects1 ],
1361+ | Effects2 ],
13481362 {no_reply , Aux0 #? AUX {last_checkpoint = Check ,
1349- last_decorators_state = NewLast }, RaAux , Effects }
1363+ last_consumer_timeout = NextConTimeout ,
1364+ last_decorators_state = NewLast }, RaAux ,
1365+ Effects }
13501366 end ;
13511367handle_aux (_RaftState , cast , eval ,
13521368 #? AUX {last_checkpoint = Check0 } = Aux0 , RaAux ) ->
@@ -2718,9 +2734,7 @@ assign_to_consumer(#{system_time := Ts} = Meta, _Ts, ConsumerKey, Msgs,
27182734 State = update_or_remove_con (Meta , ConsumerKey , Con , State1 ),
27192735 DelMsgs = lists :reverse (DeliveryMsgs ),
27202736 DeliveryEffect = delivery_effect (ConsumerKey , DelMsgs , State ),
2721- Effects = maybe_add_consumer_timeout_effect (NextConTimeout ,
2722- NextConTimeout0 ,
2723- [DeliveryEffect | Effects0 ]),
2737+ Effects = [DeliveryEffect | Effects0 ],
27242738 {State , Effects }.
27252739
27262740delayed_in (ReadyAt , Idx , Msg , DeferralToken , # delayed {tree = Tree0 ,
@@ -2819,45 +2833,45 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
28192833 % % there are consumers waiting to be serviced
28202834 % % process consumer checkout
28212835 case maps :get (ConsumerKey , Cons0 ) of
2822- # consumer {credit = Credit ,
2823- status = Status }
2824- when Credit =:= 0 orelse
2825- Status =/= up ->
2826- % % not an active consumer but still in the consumers
2827- % % map - this can happen when draining
2828- % % or when higher priority single active consumers
2829- % % take over, recurse without consumer in service
2830- % % queue
2831- checkout_one (Meta , ExpiredMsg ,
2832- InitState #? STATE {service_queue = SQ1 },
2833- Effects1 );
2834- # consumer {checked_out = Checked0 ,
2835- next_msg_id = Next ,
2836- credit = Credit ,
2837- delivery_count = DelCnt0 ,
2838- cfg = Cfg } = Con0 ->
2839- Timeout = Ts + Cfg # consumer_cfg .timeout ,
2840- Checked = maps :put (Next , ? C_MSG (Timeout , Msg ),
2841- Checked0 ),
2842- DelCnt = add (DelCnt0 , 1 ),
2843- Con = Con0 # consumer {checked_out = Checked ,
2844- next_msg_id = Next + 1 ,
2845- credit = Credit - 1 ,
2846- delivery_count = DelCnt },
2847- Size = get_header (size , get_msg_header (Msg )),
2848- State1 =
2849- State0 #? STATE {service_queue = SQ1 ,
2850- msg_bytes_checkout = BytesCheckout + Size ,
2851- msg_bytes_enqueue = BytesEnqueue - Size ,
2852- next_consumer_timeout = min ( Timeout , NextConTimeout )},
2853- Effects = maybe_add_consumer_timeout_effect ( Timeout ,
2854- NextConTimeout ,
2855- Effects1 ) ,
2856- State = update_or_remove_con (
2857- Meta , ConsumerKey , Con , State1 ),
2858- {success , ConsumerKey , Next , Msg , ExpiredMsg ,
2859- State , Effects }
2860- end ;
2836+ # consumer {credit = Credit ,
2837+ status = Status }
2838+ when Credit =:= 0 orelse
2839+ Status =/= up ->
2840+ % % not an active consumer but still in the consumers
2841+ % % map - this can happen when draining
2842+ % % or when higher priority single active consumers
2843+ % % take over, recurse without consumer in service
2844+ % % queue
2845+ checkout_one (Meta , ExpiredMsg ,
2846+ InitState #? STATE {service_queue = SQ1 },
2847+ Effects1 );
2848+ # consumer {checked_out = Checked0 ,
2849+ next_msg_id = Next ,
2850+ credit = Credit ,
2851+ delivery_count = DelCnt0 ,
2852+ cfg = Cfg } = Con0 ->
2853+ Timeout = Ts + Cfg # consumer_cfg .timeout ,
2854+ Checked = maps :put (Next , ? C_MSG (Timeout , Msg ),
2855+ Checked0 ),
2856+ DelCnt = add (DelCnt0 , 1 ),
2857+ Con = Con0 # consumer {checked_out = Checked ,
2858+ next_msg_id = Next + 1 ,
2859+ credit = Credit - 1 ,
2860+ delivery_count = DelCnt },
2861+ Size = get_header (size , get_msg_header (Msg )),
2862+ State1 =
2863+ State0 #? STATE {service_queue = SQ1 ,
2864+ msg_bytes_checkout =
2865+ BytesCheckout + Size ,
2866+ msg_bytes_enqueue =
2867+ BytesEnqueue - Size ,
2868+ next_consumer_timeout =
2869+ min ( Timeout , NextConTimeout )} ,
2870+ State = update_or_remove_con (Meta , ConsumerKey ,
2871+ Con , State1 ),
2872+ {success , ConsumerKey , Next , Msg , ExpiredMsg ,
2873+ State , Effects1 }
2874+ end ;
28612875 empty ->
28622876 {nochange , ExpiredMsg , InitState , Effects1 }
28632877 end ;
@@ -2988,7 +3002,8 @@ timer_effect(#?STATE{messages_total = 0,
29883002 delayed = # delayed {next = undefined }}, Effects ) ->
29893003 Effects ;
29903004timer_effect (#? STATE {messages_total = 0 ,
2991- delayed = # delayed {next = {NextDelayedTs , _ , _ }}}, Effects ) ->
3005+ delayed = # delayed {next = {NextDelayedTs , _ , _ }}},
3006+ Effects ) ->
29923007 [{timer , expire_msgs , NextDelayedTs , {abs , true }} | Effects ];
29933008timer_effect (#? STATE {returns = Returns ,
29943009 messages = Messages ,
@@ -3027,15 +3042,21 @@ timer_effect(#?STATE{returns = Returns,
30273042
30283043 % % Also consider the next delayed message timestamp
30293044 NextDelayedTs = case Delayed of
3030- # delayed {next = undefined } -> undefined ;
3031- # delayed {next = {Ts , _ , _ }} -> Ts
3045+ # delayed {next = undefined } ->
3046+ undefined ;
3047+ # delayed {next = {Ts , _ , _ }} ->
3048+ Ts
30323049 end ,
30333050
30343051 NextTimeout = case {NextExpiry , NextDelayedTs } of
3035- {undefined , undefined } -> undefined ;
3036- {undefined , D } -> D ;
3037- {E , undefined } -> E ;
3038- {E , D } -> min (E , D )
3052+ {undefined , undefined } ->
3053+ undefined ;
3054+ {undefined , D } ->
3055+ D ;
3056+ {E , undefined } ->
3057+ E ;
3058+ {E , D } ->
3059+ min (E , D )
30393060 end ,
30403061
30413062 case NextTimeout of
@@ -4096,7 +4117,8 @@ switch_from(_, _, State) ->
40964117switch_to (at_least_once , _ , Effects ) ->
40974118 % % Switch from some other strategy to at-least-once.
40984119 % % Dlx worker needs to be started on the leader.
4099- % % The cleanest way to determine the Ra state of this node is delegation to handle_aux.
4120+ % % The cleanest way to determine the Ra state of this node is
4121+ % % delegation to handle_aux.
41004122 {#? DLX {}, Effects ++ [{aux , {dlx , setup }}]};
41014123switch_to (_ , State , Effects ) ->
41024124 {State , Effects }.
0 commit comments