@@ -251,12 +251,10 @@ update_config(Conf, State) ->
251251apply (Meta , {machine_version , FromVersion , ToVersion }, VXState ) ->
252252 % % machine version upgrades cant be done in apply_
253253 State = convert (Meta , FromVersion , ToVersion , VXState ),
254- % % TODO: force snapshot now?
255254 {State , ok , [{aux , {dlx , setup }}]};
256255apply (#{system_time := Ts } = Meta , Cmd ,
257256 #? STATE {discarded_bytes = DiscBytes } = State ) ->
258257 % % add estimated discared_bytes
259- % % TODO: optimise!
260258 % % this is the simplest way to record the discarded bytes for most
261259 % % commands but it is a bit mory garby as almost always creates a new
262260 % % state copy before even processing the command
@@ -906,7 +904,7 @@ v7_to_v8_consumer(Con, Timeout) ->
906904 credit_mode = element (# consumer_cfg .credit_mode , V7Cfg ),
907905 lifetime = element (# consumer_cfg .lifetime , V7Cfg ),
908906 priority = element (# consumer_cfg .priority , V7Cfg ),
909- timeout = 1_800_000
907+ timeout = ? DEFAULT_CONSUMER_TIMEOUT_MS
910908 },
911909 Status = case Status0 of
912910 suspected_down ->
@@ -926,9 +924,7 @@ convert_v7_to_v8(#{system_time := Ts} = _Meta, StateV7) ->
926924 % % the structure is intact for now
927925 Cons0 = element (#? STATE .consumers , StateV7 ),
928926 Waiting0 = element (#? STATE .waiting_consumers , StateV7 ),
929- % % TODO: use default for now
930- % % TODO: review consumer timeout conversion
931- Timeout = Ts + 1_800_000 ,
927+ Timeout = Ts + ? DEFAULT_CONSUMER_TIMEOUT_MS ,
932928 Cons = maps :map (
933929 fun (_CKey , Con ) ->
934930 v7_to_v8_consumer (Con , Timeout )
@@ -1126,11 +1122,9 @@ overview(#?STATE{consumers = Cons,
11261122 #{}
11271123 end ,
11281124 MsgsRet = lqueue :len (Returns ),
1129- % % TODO emit suitable overview metrics
1130- #{
1131- num_active_priorities := NumActivePriorities ,
1132- detail := Detail
1133- } = rabbit_fifo_pq :overview (Messages ),
1125+
1126+ #{num_active_priorities := NumActivePriorities ,
1127+ detail := Detail } = rabbit_fifo_pq :overview (Messages ),
11341128
11351129 Overview = #{type => ? STATE ,
11361130 config => Conf ,
@@ -3359,51 +3353,60 @@ do_snapshot(MacVer, Ts, Ch, RaAux, DiscardedBytes, Force)
33593353 LastTs = element (3 , Ch ),
33603354 do_snapshot (MacVer , Ts , # snapshot {index = Idx , timestamp = LastTs },
33613355 RaAux , DiscardedBytes , Force );
3362- do_snapshot (MacVer , Ts , # snapshot { index = _ChIdx ,
3363- timestamp = SnapTime ,
3364- discarded_bytes = LastDiscardedBytes } = Snap0 ,
3356+ do_snapshot (MacVer , Ts ,
3357+ # snapshot { timestamp = SnapTime ,
3358+ discarded_bytes = LastDiscardedBytes } = Snap0 ,
33653359 RaAux , DiscardedBytes , Force )
33663360 when is_integer (MacVer ) andalso MacVer >= 8 ->
3367- LastAppliedIdx = ra_aux :last_applied (RaAux ),
3368- #? STATE {consumers = Consumers ,
3369- enqueuers = Enqueuers } = MacState = ra_aux :machine_state (RaAux ),
3370- TimeSince = Ts - SnapTime ,
3371- MsgsTot = messages_total (MacState ),
3372- % % if the approximate snapshot size * 2 can be reclaimed it is worth
3373- % % taking a snapshot
3374- % % take number of enqueues and consumers into account
3375- % % message: 32 bytes
3376- % % enqueuer: 96 bytes
3377- % % consumer: 256 bytes
3378- % % TODO: refine this
3379- NumEnqueuers = map_size (Enqueuers ),
3380- NumConsumers = map_size (Consumers ),
3381- ApproxSnapSize = 4096 +
3382- (MsgsTot * 32 ) +
3383- (NumEnqueuers * 96 ) +
3384- (NumConsumers * 256 ),
3385- Limit = (ApproxSnapSize * 5 ),
3386-
3387- EnoughDataRemoved = DiscardedBytes - LastDiscardedBytes > Limit ,
3388-
3389- {CheckMinInterval , _CheckMinIndexes , _CheckMaxIndexes } =
3361+ {CheckMinInterval , _ , _ } =
33903362 persistent_term :get (quorum_queue_checkpoint_config ,
3391- {? CHECK_MIN_INTERVAL_MS , ? CHECK_MIN_INDEXES ,
3363+ {? CHECK_MIN_INTERVAL_MS ,
3364+ ? CHECK_MIN_INDEXES ,
33923365 ? CHECK_MAX_INDEXES }),
3393- EnoughTimeHasPassed = TimeSince > CheckMinInterval ,
3394- case (EnoughTimeHasPassed andalso
3395- EnoughDataRemoved ) orelse
3396- Force of
3366+ TimeSince = Ts - SnapTime ,
3367+ case TimeSince > CheckMinInterval orelse Force of
33973368 true ->
3398- % % TODO: if efficient we can set the index of the release cursor
3399- % % condition to the highest live index instead of the snapshot index
3400- {# snapshot {index = LastAppliedIdx ,
3401- timestamp = Ts ,
3402- messages_total = MsgsTot ,
3403- discarded_bytes = DiscardedBytes },
3404- [{release_cursor , LastAppliedIdx , MacState ,
3405- #{condition => [{written , LastAppliedIdx },
3406- no_snapshot_sends ]}}]};
3369+ #? STATE {consumers = Consumers ,
3370+ enqueuers = Enqueuers ,
3371+ waiting_consumers = Waiting } = MacState =
3372+ ra_aux :machine_state (RaAux ),
3373+ MsgsTot = messages_total (MacState ),
3374+ % % If the approximate snapshot size * 5 can be reclaimed
3375+ % % it is worth taking a snapshot.
3376+ % % Estimates per component:
3377+ % % message: 32 bytes
3378+ % % checked-out message: 72 bytes
3379+ % % enqueuer: 112 bytes
3380+ % % consumer: 296 bytes
3381+ % % waiting consumer: 312 bytes
3382+ NumEnqs = map_size (Enqueuers ),
3383+ NumCons = map_size (Consumers ),
3384+ NumWaiting = length (Waiting ),
3385+ CheckedOut = maps :fold (
3386+ fun (_ , # consumer {checked_out = C }, Acc ) ->
3387+ Acc + map_size (C )
3388+ end , 0 , Consumers ),
3389+ ApproxSnapSize = 4096 +
3390+ MsgsTot * 32 +
3391+ CheckedOut * 72 +
3392+ NumEnqs * 112 +
3393+ NumCons * 296 +
3394+ NumWaiting * 312 ,
3395+ Limit = ApproxSnapSize * 5 ,
3396+ EnoughDataRemoved = DiscardedBytes - LastDiscardedBytes ,
3397+ case EnoughDataRemoved > Limit orelse Force of
3398+ true ->
3399+ Idx = ra_aux :last_applied (RaAux ),
3400+ Snap = # snapshot {index = Idx ,
3401+ timestamp = Ts ,
3402+ messages_total = MsgsTot ,
3403+ discarded_bytes = DiscardedBytes },
3404+ Cond = #{condition => [{written , Idx },
3405+ no_snapshot_sends ]},
3406+ {Snap , [{release_cursor , Idx , MacState , Cond }]};
3407+ false ->
3408+ {Snap0 , []}
3409+ end ;
34073410 false ->
34083411 {Snap0 , []}
34093412 end .
0 commit comments