Skip to content

Commit 99f16a0

Browse files
committed
QQ: fix consumer timeout bug
where an infinity timer would be emitted for the same command but after an actual timer leader to no timer. Move to a model where consumer timers are handled by the aux handler after each eval batch to reduce the number of timers emitted.
1 parent 4e11945 commit 99f16a0

2 files changed

Lines changed: 205 additions & 141 deletions

File tree

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 92 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -782,25 +782,25 @@ apply_(#{system_time := Ts} = Meta,
782782
{State1, Effects0}
783783
end,
784784

785-
{State3, Effects2} = update_next_consumer_timeout(State2, Effects1),
786785
%% activate SAC
787-
{State, Effects} = activate_next_consumer({State3, Effects2}),
786+
NextConsumerTimeout = next_consumer_timeout(State2),
787+
{State, Effects} =
788+
activate_next_consumer({State2#?STATE{next_consumer_timeout =
789+
NextConsumerTimeout},
790+
Effects1}),
788791
checkout(Meta, State0, State, Effects);
789792
apply_(_Meta, Cmd, State) ->
790793
%% handle unhandled commands gracefully
791794
?LOG_DEBUG("rabbit_fifo: unhandled command ~W", [Cmd, 10]),
792795
{State, ok, []}.
793796

794-
update_next_consumer_timeout(#?STATE{consumers = Cons} = State, Effects) ->
795-
Next = maps:fold(
796-
fun (_, #consumer{checked_out = Ch}, Acc) ->
797-
Min = maps:fold(fun (_, ?C_MSG(T, _), A) ->
798-
min(T, A)
799-
end, infinity, Ch),
800-
min(Min, Acc)
801-
end, infinity, Cons),
802-
{State#?STATE{next_consumer_timeout = Next},
803-
[{timer, evaluate_consumer_timeout, Next, {abs, true}} | Effects]}.
797+
next_consumer_timeout(#?STATE{consumers = Cons}) ->
798+
maps:fold(fun (_, #consumer{checked_out = Ch}, Acc) ->
799+
Min = maps:fold(fun (_, ?C_MSG(T, _), A) ->
800+
min(T, A)
801+
end, infinity, Ch),
802+
min(Min, Acc)
803+
end, infinity, Cons).
804804

805805

806806
-spec live_indexes(state()) -> {ra_seq, ra_seq:state()}.
@@ -1114,8 +1114,7 @@ state_enter(leader,
11141114
waiting_consumers = WaitingConsumers,
11151115
cfg = #cfg{resource = QRes,
11161116
dead_letter_handler = DLH},
1117-
dlx = DlxState} = State) ->
1118-
TimerEffs = timer_effect(State, []),
1117+
dlx = DlxState}) ->
11191118
% return effects to monitor all current consumers and enqueuers
11201119
Pids = lists:usort(maps:keys(Enqs)
11211120
++ [P || ?CONSUMER_PID(P) <- maps:values(Cons)]
@@ -1124,7 +1123,7 @@ state_enter(leader,
11241123
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
11251124
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
11261125
NotifyDecs = notify_decorators_startup(QRes),
1127-
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs],
1126+
Effects = Mons ++ Nots ++ NodeMons ++ [NotifyDecs],
11281127

11291128
case DLH of
11301129
at_least_once ->
@@ -1275,7 +1274,8 @@ which_module(8) -> ?MODULE.
12751274
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
12761275
-record(?AUX, {name :: atom(),
12771276
last_decorators_state :: term(),
1278-
unused_1 :: term(),
1277+
last_consumer_timeout =
1278+
infinity :: infinity | non_neg_integer(),
12791279
gc = #aux_gc{} :: #aux_gc{},
12801280
tick_pid :: undefined | pid(),
12811281
cache = #{} :: map(),
@@ -1308,7 +1308,7 @@ handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux)
13081308
when element(1, AuxV3) == aux_v3 ->
13091309
AuxV4 = #?AUX{name = element(2, AuxV3),
13101310
last_decorators_state = element(3, AuxV3),
1311-
unused_1 = undefined,
1311+
last_consumer_timeout = infinity,
13121312
gc = element(5, AuxV3),
13131313
tick_pid = element(6, AuxV3),
13141314
cache = element(7, AuxV3),
@@ -1317,10 +1317,12 @@ handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux)
13171317
handle_aux(RaftState, Tag, Cmd, AuxV4, RaAux);
13181318
handle_aux(leader, cast, eval,
13191319
#?AUX{last_decorators_state = LastDec,
1320+
last_consumer_timeout = LastConTimeout0,
13201321
last_checkpoint = Check0} = Aux0,
13211322
RaAux) ->
13221323

13231324
#?STATE{cfg = #cfg{resource = QName},
1325+
next_consumer_timeout = NextConTimeout,
13241326
reclaimable_bytes = ReclaimableBytes} = MacState =
13251327
ra_aux:machine_state(RaAux),
13261328

@@ -1337,16 +1339,29 @@ handle_aux(leader, cast, eval,
13371339

13381340
%% this is called after each batch of commands have been applied
13391341
%% set timer for message expire
1340-
%% should really be the last applied index ts but this will have to do
13411342
Effects1 = timer_effect(MacState, Effects0),
1343+
%% if the timer has already elapsed (stale time from a prior leadership term)
1344+
%% then set infinity to ensure a timer is started
1345+
LastConTimeout = if LastConTimeout0 < Ts ->
1346+
infinity;
1347+
true ->
1348+
LastConTimeout0
1349+
end,
1350+
1351+
Effects2 = maybe_add_consumer_timeout_effect(NextConTimeout,
1352+
LastConTimeout,
1353+
Effects1),
13421354
case query_notify_decorators_info(MacState) of
13431355
LastDec ->
1344-
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1};
1356+
{no_reply, Aux0#?AUX{last_checkpoint = Check,
1357+
last_consumer_timeout = NextConTimeout}, RaAux, Effects2};
13451358
{MaxActivePriority, IsEmpty} = NewLast ->
13461359
Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty)
1347-
| Effects1],
1360+
| Effects2],
13481361
{no_reply, Aux0#?AUX{last_checkpoint = Check,
1349-
last_decorators_state = NewLast}, RaAux, Effects}
1362+
last_consumer_timeout = NextConTimeout,
1363+
last_decorators_state = NewLast}, RaAux,
1364+
Effects}
13501365
end;
13511366
handle_aux(_RaftState, cast, eval,
13521367
#?AUX{last_checkpoint = Check0} = Aux0, RaAux) ->
@@ -2718,9 +2733,7 @@ assign_to_consumer(#{system_time := Ts} = Meta, _Ts, ConsumerKey, Msgs,
27182733
State = update_or_remove_con(Meta, ConsumerKey, Con, State1),
27192734
DelMsgs = lists:reverse(DeliveryMsgs),
27202735
DeliveryEffect = delivery_effect(ConsumerKey, DelMsgs, State),
2721-
Effects = maybe_add_consumer_timeout_effect(NextConTimeout,
2722-
NextConTimeout0,
2723-
[DeliveryEffect | Effects0]),
2736+
Effects = [DeliveryEffect | Effects0],
27242737
{State, Effects}.
27252738

27262739
delayed_in(ReadyAt, Idx, Msg, DeferralToken, #delayed{tree = Tree0,
@@ -2819,45 +2832,45 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
28192832
%% there are consumers waiting to be serviced
28202833
%% process consumer checkout
28212834
case maps:get(ConsumerKey, Cons0) of
2822-
#consumer{credit = Credit,
2823-
status = Status}
2824-
when Credit =:= 0 orelse
2825-
Status =/= up ->
2826-
%% not an active consumer but still in the consumers
2827-
%% map - this can happen when draining
2828-
%% or when higher priority single active consumers
2829-
%% take over, recurse without consumer in service
2830-
%% queue
2831-
checkout_one(Meta, ExpiredMsg,
2832-
InitState#?STATE{service_queue = SQ1},
2833-
Effects1);
2834-
#consumer{checked_out = Checked0,
2835-
next_msg_id = Next,
2836-
credit = Credit,
2837-
delivery_count = DelCnt0,
2838-
cfg = Cfg} = Con0 ->
2839-
Timeout = Ts + Cfg#consumer_cfg.timeout,
2840-
Checked = maps:put(Next, ?C_MSG(Timeout, Msg),
2841-
Checked0),
2842-
DelCnt = add(DelCnt0, 1),
2843-
Con = Con0#consumer{checked_out = Checked,
2844-
next_msg_id = Next + 1,
2845-
credit = Credit - 1,
2846-
delivery_count = DelCnt},
2847-
Size = get_header(size, get_msg_header(Msg)),
2848-
State1 =
2849-
State0#?STATE{service_queue = SQ1,
2850-
msg_bytes_checkout = BytesCheckout + Size,
2851-
msg_bytes_enqueue = BytesEnqueue - Size,
2852-
next_consumer_timeout = min(Timeout, NextConTimeout)},
2853-
Effects = maybe_add_consumer_timeout_effect(Timeout,
2854-
NextConTimeout,
2855-
Effects1),
2856-
State = update_or_remove_con(
2857-
Meta, ConsumerKey, Con, State1),
2858-
{success, ConsumerKey, Next, Msg, ExpiredMsg,
2859-
State, Effects}
2860-
end;
2835+
#consumer{credit = Credit,
2836+
status = Status}
2837+
when Credit =:= 0 orelse
2838+
Status =/= up ->
2839+
%% not an active consumer but still in the consumers
2840+
%% map - this can happen when draining
2841+
%% or when higher priority single active consumers
2842+
%% take over, recurse without consumer in service
2843+
%% queue
2844+
checkout_one(Meta, ExpiredMsg,
2845+
InitState#?STATE{service_queue = SQ1},
2846+
Effects1);
2847+
#consumer{checked_out = Checked0,
2848+
next_msg_id = Next,
2849+
credit = Credit,
2850+
delivery_count = DelCnt0,
2851+
cfg = Cfg} = Con0 ->
2852+
Timeout = Ts + Cfg#consumer_cfg.timeout,
2853+
Checked = maps:put(Next, ?C_MSG(Timeout, Msg),
2854+
Checked0),
2855+
DelCnt = add(DelCnt0, 1),
2856+
Con = Con0#consumer{checked_out = Checked,
2857+
next_msg_id = Next + 1,
2858+
credit = Credit - 1,
2859+
delivery_count = DelCnt},
2860+
Size = get_header(size, get_msg_header(Msg)),
2861+
State1 =
2862+
State0#?STATE{service_queue = SQ1,
2863+
msg_bytes_checkout =
2864+
BytesCheckout + Size,
2865+
msg_bytes_enqueue =
2866+
BytesEnqueue - Size,
2867+
next_consumer_timeout =
2868+
min(Timeout, NextConTimeout)},
2869+
State = update_or_remove_con(Meta, ConsumerKey,
2870+
Con, State1),
2871+
{success, ConsumerKey, Next, Msg, ExpiredMsg,
2872+
State, Effects1}
2873+
end;
28612874
empty ->
28622875
{nochange, ExpiredMsg, InitState, Effects1}
28632876
end;
@@ -2988,7 +3001,8 @@ timer_effect(#?STATE{messages_total = 0,
29883001
delayed = #delayed{next = undefined}}, Effects) ->
29893002
Effects;
29903003
timer_effect(#?STATE{messages_total = 0,
2991-
delayed = #delayed{next = {NextDelayedTs, _, _}}}, Effects) ->
3004+
delayed = #delayed{next = {NextDelayedTs, _, _}}},
3005+
Effects) ->
29923006
[{timer, expire_msgs, NextDelayedTs, {abs, true}} | Effects];
29933007
timer_effect(#?STATE{returns = Returns,
29943008
messages = Messages,
@@ -3027,15 +3041,21 @@ timer_effect(#?STATE{returns = Returns,
30273041

30283042
%% Also consider the next delayed message timestamp
30293043
NextDelayedTs = case Delayed of
3030-
#delayed{next = undefined} -> undefined;
3031-
#delayed{next = {Ts, _, _}} -> Ts
3044+
#delayed{next = undefined} ->
3045+
undefined;
3046+
#delayed{next = {Ts, _, _}} ->
3047+
Ts
30323048
end,
30333049

30343050
NextTimeout = case {NextExpiry, NextDelayedTs} of
3035-
{undefined, undefined} -> undefined;
3036-
{undefined, D} -> D;
3037-
{E, undefined} -> E;
3038-
{E, D} -> min(E, D)
3051+
{undefined, undefined} ->
3052+
undefined;
3053+
{undefined, D} ->
3054+
D;
3055+
{E, undefined} ->
3056+
E;
3057+
{E, D} ->
3058+
min(E, D)
30393059
end,
30403060

30413061
case NextTimeout of
@@ -4096,7 +4116,8 @@ switch_from(_, _, State) ->
40964116
switch_to(at_least_once, _, Effects) ->
40974117
%% Switch from some other strategy to at-least-once.
40984118
%% Dlx worker needs to be started on the leader.
4099-
%% The cleanest way to determine the Ra state of this node is delegation to handle_aux.
4119+
%% The cleanest way to determine the Ra state of this node is
4120+
%% delegation to handle_aux.
41004121
{#?DLX{}, Effects ++ [{aux, {dlx, setup}}]};
41014122
switch_to(_, State, Effects) ->
41024123
{State, Effects}.

0 commit comments

Comments
 (0)