Skip to content

Commit d276cd8

Browse files
committed
client: capture demand RU on onResponseWait throttle-fail
The new `demand_ru_per_sec` gauge promised "every entry point, never subtracted on throttle failure", but `onResponseWaitImpl` increments `mu.demandRUTotal` only after `acquireTokens` succeeds, so a throttle rejection silently drops the demand sample -- exactly the case the metric is meant to surface. Root cause: the increment was co-located with the consumption update inside the post-acquire lock block, even though demand and consumption have different lifetimes (demand is monotonic; consumption is rolled back on rejection). Four call sites carried the same inline expression, making the wrong placement easy to add and hard to notice. This commit makes the invariant structural: * Add `(*groupCostController).recordDemand`, the single point where `mu.demandRUTotal` grows. Its doc comment states the rule: callers MUST invoke it before any limiter wait/acquire so demand survives a rejection. * Route `onRequestWaitImpl`, `onResponseImpl`, `onResponseWaitImpl`, and `addRUConsumption` through `recordDemand`. In `onResponseWaitImpl` this also hoists the call above `acquireTokens`, fixing the bug. * Add `TestDemandRUCapturedOnResponseWaitThrottle` to lock in the invariant via the throttle-fail path. * Rewrite the EMA portion of `TestDemandRUTracking`: the previous version assigned `gc.run.now` only to have `updateRunState` immediately overwrite it with `time.Now()`, so the two-tick EMA assertion was a no-op. The new version drives `calcDemandAvg` directly with hand-set timestamps and asserts the actual EMA trajectory. * Mirror the `acceleratedReportingPeriod` failpoint into `calcDemandAvg` so any test that accelerates `calcAvg` accelerates the demand EMA in lockstep. * `calcDemandAvg` now returns whether it actually updated; the gauge `Set` is gated on that so we never re-publish a stale value when no time has elapsed. Drop the `< 0` clamp -- the input counter is monotonically increasing, so the EMA cannot go negative. 
* Extend the leak-TODO in `cleanUpResourceGroup` to include `LowTokenRequestNotifyCounter`, which has the same per-group label cardinality as the others on the list. Signed-off-by: JmPotato <github@ipotato.me>
1 parent 4d6937d commit d276cd8

File tree

3 files changed

+131
-26
lines changed

3 files changed

+131
-26
lines changed

client/resource_group/controller/global_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -637,8 +637,9 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
637637
metrics.DemandRUPerSecGauge.DeleteLabelValues(resourceGroupName)
638638
// TODO: clean up the remaining per-group metrics (e.g. TokenConsumedHistogram,
639639
// GroupRunningKVRequestCounter, SuccessfulRequestDuration, FailedRequestCounter,
640-
// ResourceGroupTokenRequestCounter, RequestRetryCounter, FailedLimitReserveDuration)
641-
// which currently leak label series on resource group deletion.
640+
// ResourceGroupTokenRequestCounter, RequestRetryCounter, FailedLimitReserveDuration,
641+
// LowTokenRequestNotifyCounter) which currently leak label series on resource
642+
// group deletion.
642643
return true
643644
}
644645
gc.inactive = true

client/resource_group/controller/group_controller.go

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,23 @@ func (gc *groupCostController) updateRunState() {
278278
gc.run.now = newTime
279279
}
280280

281+
// recordDemand accumulates a delta into the pre-throttling demand counter.
282+
//
283+
// Call sites MUST invoke this before any token-bucket wait/acquire so that
284+
// demand is captured even when the request is ultimately rejected by the
285+
// limiter; that is the entire reason `demandRUTotal` is tracked separately
286+
// from `consumption`. Demand is monotonically increasing and is never rolled
287+
// back on throttle failure.
288+
func (gc *groupCostController) recordDemand(delta *rmpb.Consumption) {
289+
v := getRUValueFromConsumption(delta)
290+
if v == 0 {
291+
return
292+
}
293+
gc.mu.Lock()
294+
gc.mu.demandRUTotal += v
295+
gc.mu.Unlock()
296+
}
297+
281298
func (gc *groupCostController) updateAvgRequestResourcePerSec() {
282299
isBurstable := true
283300
counter := gc.run.requestUnitTokens
@@ -287,8 +304,9 @@ func (gc *groupCostController) updateAvgRequestResourcePerSec() {
287304
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption)) {
288305
return
289306
}
290-
gc.calcDemandAvg(counter, gc.run.demandRUTotal)
291-
gc.metrics.demandRUPerSecGauge.Set(counter.avgDemandRUPerSec)
307+
if gc.calcDemandAvg(counter, gc.run.demandRUTotal) {
308+
gc.metrics.demandRUPerSecGauge.Set(counter.avgDemandRUPerSec)
309+
}
292310
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Float64("avg-demand-ru-per-sec", counter.avgDemandRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load()))
293311
gc.burstable.Store(isBurstable)
294312
}
@@ -337,18 +355,26 @@ func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool
337355
return true
338356
}
339357

340-
func (gc *groupCostController) calcDemandAvg(counter *tokenCounter, new float64) {
358+
// calcDemandAvg recomputes the EMA of pre-throttling demanded RU/s.
359+
//
360+
// Returns false (and leaves state untouched) when no time has elapsed since
361+
// the last update so the gauge is not re-Set with a stale reading. Unlike
362+
// `calcAvg`, no negative-clamp is needed because `demandRUTotal` is
363+
// monotonically increasing; the EMA of a non-negative-delta sequence cannot
364+
// itself become negative.
365+
func (gc *groupCostController) calcDemandAvg(counter *tokenCounter, new float64) bool {
341366
deltaDuration := gc.run.now.Sub(counter.avgDemandLastTime)
367+
failpoint.Inject("acceleratedReportingPeriod", func() {
368+
deltaDuration = 100 * time.Millisecond
369+
})
342370
if deltaDuration <= 0 {
343-
return
371+
return false
344372
}
345373
delta := (new - counter.avgDemandRUPerSecLastRU) / deltaDuration.Seconds()
346374
counter.avgDemandRUPerSec = movingAvgFactor*counter.avgDemandRUPerSec + (1-movingAvgFactor)*delta
347-
if counter.avgDemandRUPerSec < 0 {
348-
counter.avgDemandRUPerSec = 0
349-
}
350375
counter.avgDemandLastTime = gc.run.now
351376
counter.avgDemandRUPerSecLastRU = new
377+
return true
352378
}
353379

354380
func (gc *groupCostController) shouldReportConsumption() bool {
@@ -584,9 +610,12 @@ func (gc *groupCostController) onRequestWaitImpl(
584610
calc.BeforeKVRequest(delta, info)
585611
}
586612

613+
// Record pre-throttling demand before any limiter interaction so a
614+
// subsequent rollback only unwinds consumption, not demand.
615+
gc.recordDemand(delta)
616+
587617
gc.mu.Lock()
588618
add(gc.mu.consumption, delta)
589-
gc.mu.demandRUTotal += getRUValueFromConsumption(delta)
590619
gc.mu.Unlock()
591620

592621
if !gc.burstable.Load() {
@@ -634,6 +663,13 @@ func (gc *groupCostController) onResponseImpl(
634663
for _, calc := range gc.calculators {
635664
calc.AfterKVRequest(delta, req, resp)
636665
}
666+
667+
// Record pre-throttling demand. `onResponseImpl` does not block on token
668+
// acquisition, so this could equivalently sit inside the lock block below;
669+
// keeping it here makes the demand-before-limiter invariant uniform across
670+
// all entry points.
671+
gc.recordDemand(delta)
672+
637673
if !gc.burstable.Load() {
638674
counter := gc.run.requestUnitTokens
639675
if v := getRUValueFromConsumption(delta); v > 0 {
@@ -644,8 +680,6 @@ func (gc *groupCostController) onResponseImpl(
644680
gc.mu.Lock()
645681
// Record the consumption of the request
646682
add(gc.mu.consumption, delta)
647-
// Record the response-phase demand as well (actual read bytes, CPU, etc.)
648-
gc.mu.demandRUTotal += getRUValueFromConsumption(delta)
649683
// Record the consumption of the request by store
650684
count := &rmpb.Consumption{}
651685
*count = *delta
@@ -667,6 +701,13 @@ func (gc *groupCostController) onResponseWaitImpl(
667701
for _, calc := range gc.calculators {
668702
calc.AfterKVRequest(delta, req, resp)
669703
}
704+
705+
// Record pre-throttling demand BEFORE acquireTokens so it is captured
706+
// even when the response is rejected by the limiter. Without this hoist
707+
// the demand counter would silently miss exactly the throttled responses
708+
// the metric is supposed to surface.
709+
gc.recordDemand(delta)
710+
670711
var waitDuration time.Duration
671712
if !gc.burstable.Load() {
672713
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
@@ -687,7 +728,6 @@ func (gc *groupCostController) onResponseWaitImpl(
687728
gc.mu.Lock()
688729
// Record the consumption of the request
689730
add(gc.mu.consumption, delta)
690-
gc.mu.demandRUTotal += getRUValueFromConsumption(delta)
691731
// Record the consumption of the request by store
692732
count := &rmpb.Consumption{}
693733
*count = *delta
@@ -703,9 +743,9 @@ func (gc *groupCostController) onResponseWaitImpl(
703743
}
704744

705745
func (gc *groupCostController) addRUConsumption(consumption *rmpb.Consumption) {
746+
gc.recordDemand(consumption)
706747
gc.mu.Lock()
707748
add(gc.mu.consumption, consumption)
708-
gc.mu.demandRUTotal += getRUValueFromConsumption(consumption)
709749
gc.mu.Unlock()
710750
}
711751

client/resource_group/controller/group_controller_test.go

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ func TestDemandRUTracking(t *testing.T) {
345345
gc.mu.Unlock()
346346
re.Positive(demandTotal, "demand should be accumulated after requests")
347347

348-
// Now issue a request that gets throttled (rejected).
348+
// Now issue a request that gets throttled (rejected) on `onRequestWaitImpl`.
349349
bigReq := &TestRequestInfo{
350350
isWrite: true,
351351
writeBytes: 10000000,
@@ -361,18 +361,82 @@ func TestDemandRUTracking(t *testing.T) {
361361
re.Greater(demandAfterThrottle, demandTotal,
362362
"demand should increase even for throttled requests")
363363

364-
// Verify that the demand EMA is computed correctly.
365-
now := time.Now()
366-
gc.run.now = now
367-
gc.updateRunState()
368-
gc.updateAvgRequestResourcePerSec()
364+
// Verify the demand EMA math directly. We deliberately avoid going through
365+
// `updateRunState` here because that method overwrites `gc.run.now` with
366+
// `time.Now()` on every call, which makes any caller-side time control a
367+
// no-op. Instead, snapshot demand into `gc.run` once and drive `calcDemandAvg`
368+
// with hand-set timestamps so the EMA's behavior is observable.
369+
gc.updateRunState() // copy mu.demandRUTotal into gc.run.demandRUTotal once.
369370

370-
// Advance time and update again so the EMA has two data points.
371-
gc.run.now = now.Add(time.Second)
372-
gc.updateRunState()
373-
gc.updateAvgRequestResourcePerSec()
371+
counter := gc.run.requestUnitTokens
372+
// Reset the EMA bookkeeping so we can measure a clean two-tick trajectory.
373+
counter.avgDemandRUPerSec = 0
374+
counter.avgDemandRUPerSecLastRU = 0
375+
base := time.Unix(0, 0)
376+
counter.avgDemandLastTime = base
377+
gc.run.now = base.Add(time.Second)
378+
379+
re.True(gc.calcDemandAvg(counter, gc.run.demandRUTotal))
380+
// First tick: avg = movingAvgFactor*0 + (1-movingAvgFactor) * (demandTotal/1s).
381+
expectedFirst := (1 - movingAvgFactor) * gc.run.demandRUTotal
382+
re.InEpsilon(expectedFirst, counter.avgDemandRUPerSec, 1e-9,
383+
"first EMA tick should equal (1-movingAvgFactor) * demand-rate")
384+
385+
// Second tick: same demand snapshot, one more second elapsed -> rate is 0,
386+
// so the EMA must decay toward zero by movingAvgFactor.
387+
gc.run.now = base.Add(2 * time.Second)
388+
prev := counter.avgDemandRUPerSec
389+
re.True(gc.calcDemandAvg(counter, gc.run.demandRUTotal))
390+
re.InEpsilon(movingAvgFactor*prev, counter.avgDemandRUPerSec, 1e-9,
391+
"with no new demand the EMA should decay by movingAvgFactor")
392+
393+
// Same `gc.run.now` -> calcDemandAvg must report no update and leave state alone.
394+
re.False(gc.calcDemandAvg(counter, gc.run.demandRUTotal))
395+
}
374396

397+
// TestDemandRUCapturedOnResponseWaitThrottle locks in the invariant that
398+
// `demand_ru_per_sec` reflects rejected responses too. Without the
399+
// `recordDemand` hoist in `onResponseWaitImpl`, throttle-rejected responses
400+
// would be silently absent from the demand counter -- defeating the metric.
401+
func TestDemandRUCapturedOnResponseWaitThrottle(t *testing.T) {
402+
re := require.New(t)
403+
gc := createTestGroupCostController(re)
404+
// Short retry budget so the test fails fast.
405+
gc.mainCfg.WaitRetryInterval = 5 * time.Millisecond
406+
gc.mainCfg.WaitRetryTimes = 2
407+
gc.mainCfg.LTBMaxWaitDuration = 10 * time.Millisecond
408+
409+
// Stop the bucket from refilling. The limiter still carries its initial
410+
// tokens (FillRate=1000 -> 1000 RU seeded in initRunState), so the request
411+
// below must demand strictly more than that to provoke a throttle error.
375412
counter := gc.run.requestUnitTokens
376-
re.GreaterOrEqual(counter.avgDemandRUPerSec, 0.0,
377-
"demand EMA should be non-negative")
413+
counter.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{
414+
newTokens: 0,
415+
newFillRate: 0,
416+
newBurst: 0,
417+
})
418+
// `allowDebt` in `onResponseWaitImpl` is false only when the response is
419+
// "big" (read+write bytes >= bigRequestThreshold) AND the group is already
420+
// throttled. Force both.
421+
gc.isThrottled.Store(true)
422+
423+
gc.mu.Lock()
424+
demandBefore := gc.mu.demandRUTotal
425+
gc.mu.Unlock()
426+
427+
const readBytes = 128 * 1024 * 1024 // 128 MiB -> 2048 RRU at default 1/64KiB cost
428+
req := &TestRequestInfo{isWrite: false}
429+
resp := &TestResponseInfo{
430+
readBytes: readBytes,
431+
succeed: true,
432+
}
433+
_, _, err := gc.onResponseWaitImpl(context.TODO(), req, resp)
434+
re.Error(err)
435+
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
436+
437+
gc.mu.Lock()
438+
demandAfter := gc.mu.demandRUTotal
439+
gc.mu.Unlock()
440+
re.Greater(demandAfter, demandBefore,
441+
"demand should be recorded for responses rejected by the limiter")
378442
}

0 commit comments

Comments
 (0)