@@ -782,25 +782,25 @@ apply_(#{system_time := Ts} = Meta,
782782 {State1 , Effects0 }
783783 end ,
784784
785- {State3 , Effects2 } = update_next_consumer_timeout (State2 , Effects1 ),
786785 % % activate SAC
787- {State , Effects } = activate_next_consumer ({State3 , Effects2 }),
786+ NextConsumerTimeout = next_consumer_timeout (State2 ),
787+ {State , Effects } =
788+ activate_next_consumer ({State2 #? STATE {next_consumer_timeout =
789+ NextConsumerTimeout },
790+ Effects1 }),
788791 checkout (Meta , State0 , State , Effects );
789792apply_ (_Meta , Cmd , State ) ->
790793 % % handle unhandled commands gracefully
791794 ? LOG_DEBUG (" rabbit_fifo: unhandled command ~W " , [Cmd , 10 ]),
792795 {State , ok , []}.
793796
794- update_next_consumer_timeout (#? STATE {consumers = Cons } = State , Effects ) ->
795- Next = maps :fold (
796- fun (_ , # consumer {checked_out = Ch }, Acc ) ->
797- Min = maps :fold (fun (_ , ? C_MSG (T , _ ), A ) ->
798- min (T , A )
799- end , infinity , Ch ),
800- min (Min , Acc )
801- end , infinity , Cons ),
802- {State #? STATE {next_consumer_timeout = Next },
803- [{timer , evaluate_consumer_timeout , Next , {abs , true }} | Effects ]}.
797+ next_consumer_timeout (#? STATE {consumers = Cons }) ->
798+ maps :fold (fun (_ , # consumer {checked_out = Ch }, Acc ) ->
799+ Min = maps :fold (fun (_ , ? C_MSG (T , _ ), A ) ->
800+ min (T , A )
801+ end , infinity , Ch ),
802+ min (Min , Acc )
803+ end , infinity , Cons ).
804804
805805
806806-spec live_indexes (state ()) -> {ra_seq , ra_seq :state ()}.
@@ -1114,8 +1114,8 @@ state_enter(leader,
11141114 waiting_consumers = WaitingConsumers ,
11151115 cfg = # cfg {resource = QRes ,
11161116 dead_letter_handler = DLH },
1117- dlx = DlxState } = State ) ->
1118- TimerEffs = timer_effect (State , []),
1117+ dlx = DlxState }) ->
1118+ % TimerEffs = timer_effect(State, []),
11191119 % return effects to monitor all current consumers and enqueuers
11201120 Pids = lists :usort (maps :keys (Enqs )
11211121 ++ [P || ? CONSUMER_PID (P ) <- maps :values (Cons )]
@@ -1124,7 +1124,8 @@ state_enter(leader,
11241124 Nots = [{send_msg , P , leader_change , ra_event } || P <- Pids ],
11251125 NodeMons = lists :usort ([{monitor , node , node (P )} || P <- Pids ]),
11261126 NotifyDecs = notify_decorators_startup (QRes ),
1127- Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs ],
1127+ % Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs],
1128+ Effects = Mons ++ Nots ++ NodeMons ++ [NotifyDecs ],
11281129
11291130 case DLH of
11301131 at_least_once ->
@@ -1275,7 +1276,8 @@ which_module(8) -> ?MODULE.
12751276- record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
12761277- record (? AUX , {name :: atom (),
12771278 last_decorators_state :: term (),
1278- unused_1 :: term (),
1279+ last_consumer_timeout =
1280+ infinity :: infinity | non_neg_integer (),
12791281 gc = # aux_gc {} :: # aux_gc {},
12801282 tick_pid :: undefined | pid (),
12811283 cache = #{} :: map (),
@@ -1308,7 +1310,7 @@ handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux)
13081310 when element (1 , AuxV3 ) == aux_v3 ->
13091311 AuxV4 = #? AUX {name = element (2 , AuxV3 ),
13101312 last_decorators_state = element (3 , AuxV3 ),
1311- unused_1 = undefined ,
1313+ last_consumer_timeout = infinity ,
13121314 gc = element (5 , AuxV3 ),
13131315 tick_pid = element (6 , AuxV3 ),
13141316 cache = element (7 , AuxV3 ),
@@ -1317,10 +1319,12 @@ handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux)
13171319 handle_aux (RaftState , Tag , Cmd , AuxV4 , RaAux );
13181320handle_aux (leader , cast , eval ,
13191321 #? AUX {last_decorators_state = LastDec ,
1322+ last_consumer_timeout = LastConTimeout0 ,
13201323 last_checkpoint = Check0 } = Aux0 ,
13211324 RaAux ) ->
13221325
13231326 #? STATE {cfg = # cfg {resource = QName },
1327+ next_consumer_timeout = NextConTimeout ,
13241328 reclaimable_bytes = ReclaimableBytes } = MacState =
13251329 ra_aux :machine_state (RaAux ),
13261330
@@ -1337,16 +1341,29 @@ handle_aux(leader, cast, eval,
13371341
13381342 % % this is called after each batch of commands have been applied
13391343 % % set timer for message expire
1340- % % should really be the last applied index ts but this will have to do
13411344 Effects1 = timer_effect (MacState , Effects0 ),
1345+ % % if the timer has already elapsed (stale time from a prior leadership term)
1346+ % % then set infinity to ensure a timer is started
1347+ LastConTimeout = if LastConTimeout0 < Ts ->
1348+ infinity ;
1349+ true ->
1350+ LastConTimeout0
1351+ end ,
1352+
1353+ Effects2 = maybe_add_consumer_timeout_effect (NextConTimeout ,
1354+ LastConTimeout ,
1355+ Effects1 ),
13421356 case query_notify_decorators_info (MacState ) of
13431357 LastDec ->
1344- {no_reply , Aux0 #? AUX {last_checkpoint = Check }, RaAux , Effects1 };
1358+ {no_reply , Aux0 #? AUX {last_checkpoint = Check ,
1359+ last_consumer_timeout = NextConTimeout }, RaAux , Effects2 };
13451360 {MaxActivePriority , IsEmpty } = NewLast ->
13461361 Effects = [notify_decorators_effect (QName , MaxActivePriority , IsEmpty )
1347- | Effects1 ],
1362+ | Effects2 ],
13481363 {no_reply , Aux0 #? AUX {last_checkpoint = Check ,
1349- last_decorators_state = NewLast }, RaAux , Effects }
1364+ last_consumer_timeout = NextConTimeout ,
1365+ last_decorators_state = NewLast }, RaAux ,
1366+ Effects }
13501367 end ;
13511368handle_aux (_RaftState , cast , eval ,
13521369 #? AUX {last_checkpoint = Check0 } = Aux0 , RaAux ) ->
@@ -2718,9 +2735,7 @@ assign_to_consumer(#{system_time := Ts} = Meta, _Ts, ConsumerKey, Msgs,
27182735 State = update_or_remove_con (Meta , ConsumerKey , Con , State1 ),
27192736 DelMsgs = lists :reverse (DeliveryMsgs ),
27202737 DeliveryEffect = delivery_effect (ConsumerKey , DelMsgs , State ),
2721- Effects = maybe_add_consumer_timeout_effect (NextConTimeout ,
2722- NextConTimeout0 ,
2723- [DeliveryEffect | Effects0 ]),
2738+ Effects = [DeliveryEffect | Effects0 ],
27242739 {State , Effects }.
27252740
27262741delayed_in (ReadyAt , Idx , Msg , DeferralToken , # delayed {tree = Tree0 ,
@@ -2819,45 +2834,45 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
28192834 % % there are consumers waiting to be serviced
28202835 % % process consumer checkout
28212836 case maps :get (ConsumerKey , Cons0 ) of
2822- # consumer {credit = Credit ,
2823- status = Status }
2824- when Credit =:= 0 orelse
2825- Status =/= up ->
2826- % % not an active consumer but still in the consumers
2827- % % map - this can happen when draining
2828- % % or when higher priority single active consumers
2829- % % take over, recurse without consumer in service
2830- % % queue
2831- checkout_one (Meta , ExpiredMsg ,
2832- InitState #? STATE {service_queue = SQ1 },
2833- Effects1 );
2834- # consumer {checked_out = Checked0 ,
2835- next_msg_id = Next ,
2836- credit = Credit ,
2837- delivery_count = DelCnt0 ,
2838- cfg = Cfg } = Con0 ->
2839- Timeout = Ts + Cfg # consumer_cfg .timeout ,
2840- Checked = maps :put (Next , ? C_MSG (Timeout , Msg ),
2841- Checked0 ),
2842- DelCnt = add (DelCnt0 , 1 ),
2843- Con = Con0 # consumer {checked_out = Checked ,
2844- next_msg_id = Next + 1 ,
2845- credit = Credit - 1 ,
2846- delivery_count = DelCnt },
2847- Size = get_header (size , get_msg_header (Msg )),
2848- State1 =
2849- State0 #? STATE {service_queue = SQ1 ,
2850- msg_bytes_checkout = BytesCheckout + Size ,
2851- msg_bytes_enqueue = BytesEnqueue - Size ,
2852- next_consumer_timeout = min ( Timeout , NextConTimeout )},
2853- Effects = maybe_add_consumer_timeout_effect ( Timeout ,
2854- NextConTimeout ,
2855- Effects1 ) ,
2856- State = update_or_remove_con (
2857- Meta , ConsumerKey , Con , State1 ),
2858- {success , ConsumerKey , Next , Msg , ExpiredMsg ,
2859- State , Effects }
2860- end ;
2837+ # consumer {credit = Credit ,
2838+ status = Status }
2839+ when Credit =:= 0 orelse
2840+ Status =/= up ->
2841+ % % not an active consumer but still in the consumers
2842+ % % map - this can happen when draining
2843+ % % or when higher priority single active consumers
2844+ % % take over, recurse without consumer in service
2845+ % % queue
2846+ checkout_one (Meta , ExpiredMsg ,
2847+ InitState #? STATE {service_queue = SQ1 },
2848+ Effects1 );
2849+ # consumer {checked_out = Checked0 ,
2850+ next_msg_id = Next ,
2851+ credit = Credit ,
2852+ delivery_count = DelCnt0 ,
2853+ cfg = Cfg } = Con0 ->
2854+ Timeout = Ts + Cfg # consumer_cfg .timeout ,
2855+ Checked = maps :put (Next , ? C_MSG (Timeout , Msg ),
2856+ Checked0 ),
2857+ DelCnt = add (DelCnt0 , 1 ),
2858+ Con = Con0 # consumer {checked_out = Checked ,
2859+ next_msg_id = Next + 1 ,
2860+ credit = Credit - 1 ,
2861+ delivery_count = DelCnt },
2862+ Size = get_header (size , get_msg_header (Msg )),
2863+ State1 =
2864+ State0 #? STATE {service_queue = SQ1 ,
2865+ msg_bytes_checkout =
2866+ BytesCheckout + Size ,
2867+ msg_bytes_enqueue =
2868+ BytesEnqueue - Size ,
2869+ next_consumer_timeout =
2870+ min ( Timeout , NextConTimeout )} ,
2871+ State = update_or_remove_con (Meta , ConsumerKey ,
2872+ Con , State1 ),
2873+ {success , ConsumerKey , Next , Msg , ExpiredMsg ,
2874+ State , Effects1 }
2875+ end ;
28612876 empty ->
28622877 {nochange , ExpiredMsg , InitState , Effects1 }
28632878 end ;
@@ -2988,7 +3003,8 @@ timer_effect(#?STATE{messages_total = 0,
29883003 delayed = # delayed {next = undefined }}, Effects ) ->
29893004 Effects ;
29903005timer_effect (#? STATE {messages_total = 0 ,
2991- delayed = # delayed {next = {NextDelayedTs , _ , _ }}}, Effects ) ->
3006+ delayed = # delayed {next = {NextDelayedTs , _ , _ }}},
3007+ Effects ) ->
29923008 [{timer , expire_msgs , NextDelayedTs , {abs , true }} | Effects ];
29933009timer_effect (#? STATE {returns = Returns ,
29943010 messages = Messages ,
@@ -3027,15 +3043,21 @@ timer_effect(#?STATE{returns = Returns,
30273043
30283044 % % Also consider the next delayed message timestamp
30293045 NextDelayedTs = case Delayed of
3030- # delayed {next = undefined } -> undefined ;
3031- # delayed {next = {Ts , _ , _ }} -> Ts
3046+ # delayed {next = undefined } ->
3047+ undefined ;
3048+ # delayed {next = {Ts , _ , _ }} ->
3049+ Ts
30323050 end ,
30333051
30343052 NextTimeout = case {NextExpiry , NextDelayedTs } of
3035- {undefined , undefined } -> undefined ;
3036- {undefined , D } -> D ;
3037- {E , undefined } -> E ;
3038- {E , D } -> min (E , D )
3053+ {undefined , undefined } ->
3054+ undefined ;
3055+ {undefined , D } ->
3056+ D ;
3057+ {E , undefined } ->
3058+ E ;
3059+ {E , D } ->
3060+ min (E , D )
30393061 end ,
30403062
30413063 case NextTimeout of
@@ -4096,7 +4118,8 @@ switch_from(_, _, State) ->
40964118switch_to (at_least_once , _ , Effects ) ->
40974119 % % Switch from some other strategy to at-least-once.
40984120 % % Dlx worker needs to be started on the leader.
4099- % % The cleanest way to determine the Ra state of this node is delegation to handle_aux.
4121+ % % The cleanest way to determine the Ra state of this node is
4122+ % % delegation to handle_aux.
41004123 {#? DLX {}, Effects ++ [{aux , {dlx , setup }}]};
41014124switch_to (_ , State , Effects ) ->
41024125 {State , Effects }.
0 commit comments