Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions client/resource_group/controller/global_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,12 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
if gc.inactive || gc.tombstone.Load() {
c.groupsController.Delete(resourceGroupName)
metrics.ResourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
metrics.DemandRUPerSecGauge.DeleteLabelValues(resourceGroupName)
// TODO: clean up the remaining per-group metrics (e.g. TokenConsumedHistogram,
// GroupRunningKVRequestCounter, SuccessfulRequestDuration, FailedRequestCounter,
// ResourceGroupTokenRequestCounter, RequestRetryCounter, FailedLimitReserveDuration,
// LowTokenRequestNotifyCounter) which currently leak label series on resource
// group deletion.
return true
}
gc.inactive = true
Expand Down
87 changes: 82 additions & 5 deletions client/resource_group/controller/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type groupCostController struct {
consumption *rmpb.Consumption
storeCounter map[uint64]*rmpb.Consumption
globalCounter *rmpb.Consumption
// demandRUTotal accumulates total demanded RU (pre-throttling).
// Unlike consumption, this is never subtracted on throttle failure.
demandRUTotal float64
}

// fast path to make once token limit with un-limit burst.
Expand All @@ -75,6 +78,9 @@ type groupCostController struct {
// last update.
targetPeriod time.Duration

// demandRUTotal is a snapshot of mu.demandRUTotal, copied in updateRunState.
demandRUTotal float64

// consumptions stores the last value of mu.consumption.
// requestUnitConsumptions []*rmpb.RequestUnitItem
// resourceConsumptions []*rmpb.ResourceItem
Expand Down Expand Up @@ -106,6 +112,7 @@ type groupMetricsCollection struct {
tokenRequestCounter prometheus.Counter
runningKVRequestCounter prometheus.Gauge
consumeTokenHistogram prometheus.Observer
demandRUPerSecGauge prometheus.Gauge
}

func initMetrics(oldName, name string) *groupMetricsCollection {
Expand All @@ -122,6 +129,7 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name),
consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name),
demandRUPerSecGauge: metrics.DemandRUPerSecGauge.WithLabelValues(name),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will these metrics be cleaned up?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. On the current master only ResourceGroupStatusGauge is cleaned in cleanUpResourceGroup, so this new gauge would leak label series the same way most other per-group metrics already do. The leak impact is small in practice (resource groups are typically long-lived, client-side only, bounded per process), but it's still a real leak and not a design decision. I'll add DeleteLabelValues for DemandRUPerSecGauge in the cleanup path to keep this PR self-consistent. Cleaning up the other pre-existing per-group metrics is out of scope here — I'm tracking that in a separate branch.

}
}

Expand All @@ -136,6 +144,12 @@ type tokenCounter struct {
avgRUPerSecLastRU float64
avgLastTime time.Time

// avgDemandRUPerSec is an EMA of the demanded RU/s before throttling,
// reflecting the true workload demand regardless of token bucket limits.
avgDemandRUPerSec float64
avgDemandRUPerSecLastRU float64
avgDemandLastTime time.Time

notify struct {
mu sync.Mutex
setupNotificationCh <-chan time.Time
Expand Down Expand Up @@ -220,10 +234,11 @@ func (gc *groupCostController) initRunState() {
defer gc.metaLock.RUnlock()
limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(gc.meta.RUSettings.RU), gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
avgLastTime: now,
fillRate: gc.meta.RUSettings.RU.Settings.FillRate,
limiter: limiter,
avgRUPerSec: 0,
avgLastTime: now,
avgDemandLastTime: now,
fillRate: gc.meta.RUSettings.RU.Settings.FillRate,
}
gc.run.requestUnitTokens = counter
gc.burstable.Store(isBurstable)
Expand Down Expand Up @@ -257,11 +272,29 @@ func (gc *groupCostController) updateRunState() {
calc.Trickle(gc.mu.consumption)
}
*gc.run.consumption = *gc.mu.consumption
gc.run.demandRUTotal = gc.mu.demandRUTotal
gc.mu.Unlock()
logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption), zap.Bool("is-throttled", gc.isThrottled.Load()))
gc.run.now = newTime
}

// recordDemand folds a request's RU delta into the monotonically increasing
// pre-throttling demand counter.
//
// Call sites MUST invoke this before any token-bucket wait/acquire so that
// demand is captured even when the request is ultimately rejected by the
// limiter; that is the entire reason `demandRUTotal` is tracked separately
// from `consumption`. Demand only ever grows and is never rolled back on
// throttle failure.
func (gc *groupCostController) recordDemand(delta *rmpb.Consumption) {
	ru := getRUValueFromConsumption(delta)
	if ru == 0 {
		return
	}
	gc.mu.Lock()
	defer gc.mu.Unlock()
	gc.mu.demandRUTotal += ru
}

func (gc *groupCostController) updateAvgRequestResourcePerSec() {
isBurstable := true
counter := gc.run.requestUnitTokens
Expand All @@ -271,7 +304,10 @@ func (gc *groupCostController) updateAvgRequestResourcePerSec() {
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption)) {
return
}
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load()))
if gc.calcDemandAvg(counter, gc.run.demandRUTotal) {
gc.metrics.demandRUPerSecGauge.Set(counter.avgDemandRUPerSec)
}
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Float64("avg-demand-ru-per-sec", counter.avgDemandRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load()))
gc.burstable.Store(isBurstable)
}

Expand Down Expand Up @@ -319,6 +355,28 @@ func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool
return true
}

// calcDemandAvg recomputes the EMA of pre-throttling demanded RU/s.
//
// newTotal is the latest snapshot of the monotonically increasing demand
// counter (`gc.run.demandRUTotal`). Returns false (and leaves state
// untouched) when no time has elapsed since the last update so the gauge is
// not re-Set with a stale reading. Unlike `calcAvg`, no negative-clamp is
// needed because the demand counter is monotonically increasing; the EMA of
// a non-negative-delta sequence cannot itself become negative.
func (gc *groupCostController) calcDemandAvg(counter *tokenCounter, newTotal float64) bool {
	// NOTE: the parameter was renamed from `new` to `newTotal` to avoid
	// shadowing Go's predeclared `new` identifier.
	deltaDuration := gc.run.now.Sub(counter.avgDemandLastTime)
	failpoint.Inject("acceleratedReportingPeriod", func() {
		deltaDuration = 100 * time.Millisecond
	})
	if deltaDuration <= 0 {
		return false
	}
	// Instantaneous demand rate since the previous tick, folded into the EMA.
	delta := (newTotal - counter.avgDemandRUPerSecLastRU) / deltaDuration.Seconds()
	counter.avgDemandRUPerSec = movingAvgFactor*counter.avgDemandRUPerSec + (1-movingAvgFactor)*delta
	counter.avgDemandLastTime = gc.run.now
	counter.avgDemandRUPerSecLastRU = newTotal
	return true
}

func (gc *groupCostController) shouldReportConsumption() bool {
if !gc.run.initialRequestCompleted {
return true
Expand Down Expand Up @@ -552,6 +610,10 @@ func (gc *groupCostController) onRequestWaitImpl(
calc.BeforeKVRequest(delta, info)
}

// Record pre-throttling demand before any limiter interaction so a
// subsequent rollback only unwinds consumption, not demand.
gc.recordDemand(delta)

gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()
Expand Down Expand Up @@ -601,6 +663,13 @@ func (gc *groupCostController) onResponseImpl(
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}

// Record pre-throttling demand. `onResponseImpl` does not block on token
// acquisition, so this could equivalently sit inside the lock block below;
// keeping it here makes the demand-before-limiter invariant uniform across
// all entry points.
gc.recordDemand(delta)

if !gc.burstable.Load() {
counter := gc.run.requestUnitTokens
if v := getRUValueFromConsumption(delta); v > 0 {
Expand Down Expand Up @@ -632,6 +701,13 @@ func (gc *groupCostController) onResponseWaitImpl(
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}

// Record pre-throttling demand BEFORE acquireTokens so it is captured
// even when the response is rejected by the limiter. Without this hoist
// the demand counter would silently miss exactly the throttled responses
// the metric is supposed to surface.
gc.recordDemand(delta)

var waitDuration time.Duration
if !gc.burstable.Load() {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
Expand Down Expand Up @@ -667,6 +743,7 @@ func (gc *groupCostController) onResponseWaitImpl(
}

func (gc *groupCostController) addRUConsumption(consumption *rmpb.Consumption) {
gc.recordDemand(consumption)
gc.mu.Lock()
add(gc.mu.consumption, consumption)
gc.mu.Unlock()
Expand Down
125 changes: 125 additions & 0 deletions client/resource_group/controller/group_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,128 @@ func TestAcquireTokensFallbackToTimer(t *testing.T) {
// waitDuration should be roughly retryTimes * retryInterval.
re.GreaterOrEqual(waitDuration, gc.mainCfg.WaitRetryInterval*time.Duration(gc.mainCfg.WaitRetryTimes))
}

// TestDemandRUTracking verifies that pre-throttling demand accumulates for
// both successful and throttled requests, and that the demand EMA ticks,
// decays, and no-ops exactly as `calcDemandAvg` promises.
func TestDemandRUTracking(t *testing.T) {
	re := require.New(t)
	gc := createTestGroupCostController(re)

	// Helper: read the demand counter under the group mutex.
	readDemand := func() float64 {
		gc.mu.Lock()
		defer gc.mu.Unlock()
		return gc.mu.demandRUTotal
	}

	writeReq := &TestRequestInfo{
		isWrite:    true,
		writeBytes: 100,
	}
	readResp := &TestResponseInfo{
		readBytes: 100,
		succeed:   true,
	}

	// A handful of successful request/response round trips.
	for i := 0; i < 5; i++ {
		consumption, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), writeReq)
		re.NoError(err)
		re.NotNil(consumption)
		_, err = gc.onResponseImpl(writeReq, readResp)
		re.NoError(err)
	}

	// All pre-request and post-response RU must be reflected in the counter.
	demandTotal := readDemand()
	re.Positive(demandTotal, "demand should be accumulated after requests")

	// A request large enough to be rejected by `onRequestWaitImpl`.
	hugeReq := &TestRequestInfo{
		isWrite:    true,
		writeBytes: 10000000,
	}
	_, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), hugeReq)
	re.Error(err)
	re.True(errs.ErrClientResourceGroupThrottled.Equal(err))

	// Throttled RU still counts toward demand.
	demandAfterThrottle := readDemand()
	re.Greater(demandAfterThrottle, demandTotal,
		"demand should increase even for throttled requests")

	// Verify the demand EMA math directly. We deliberately avoid going through
	// `updateRunState` repeatedly because that method overwrites `gc.run.now`
	// with `time.Now()` on every call, defeating caller-side time control.
	// Snapshot demand into `gc.run` once, then drive `calcDemandAvg` with
	// hand-set timestamps so the EMA trajectory is observable.
	gc.updateRunState() // copy mu.demandRUTotal into gc.run.demandRUTotal once.

	tokens := gc.run.requestUnitTokens
	// Start from clean EMA bookkeeping for a deterministic two-tick run.
	tokens.avgDemandRUPerSec = 0
	tokens.avgDemandRUPerSecLastRU = 0
	epoch := time.Unix(0, 0)
	tokens.avgDemandLastTime = epoch
	gc.run.now = epoch.Add(time.Second)

	re.True(gc.calcDemandAvg(tokens, gc.run.demandRUTotal))
	// Tick 1: avg = movingAvgFactor*0 + (1-movingAvgFactor) * (demand/1s).
	wantFirst := (1 - movingAvgFactor) * gc.run.demandRUTotal
	re.InEpsilon(wantFirst, tokens.avgDemandRUPerSec, 1e-9,
		"first EMA tick should equal (1-movingAvgFactor) * demand-rate")

	// Tick 2: identical demand snapshot one second later -> zero rate, so the
	// EMA must decay by exactly movingAvgFactor.
	gc.run.now = epoch.Add(2 * time.Second)
	prevAvg := tokens.avgDemandRUPerSec
	re.True(gc.calcDemandAvg(tokens, gc.run.demandRUTotal))
	re.InEpsilon(movingAvgFactor*prevAvg, tokens.avgDemandRUPerSec, 1e-9,
		"with no new demand the EMA should decay by movingAvgFactor")

	// Unchanged `gc.run.now` -> calcDemandAvg reports no update, state untouched.
	re.False(gc.calcDemandAvg(tokens, gc.run.demandRUTotal))
}

// TestDemandRUCapturedOnResponseWaitThrottle pins the invariant that
// `demand_ru_per_sec` also covers responses rejected by the limiter. If
// `recordDemand` were not hoisted above acquireTokens in
// `onResponseWaitImpl`, throttle-rejected responses would never reach the
// demand counter -- defeating the metric.
func TestDemandRUCapturedOnResponseWaitThrottle(t *testing.T) {
	re := require.New(t)
	gc := createTestGroupCostController(re)
	// Keep the retry budget tiny so the throttle surfaces quickly.
	gc.mainCfg.WaitRetryInterval = 5 * time.Millisecond
	gc.mainCfg.WaitRetryTimes = 2
	gc.mainCfg.LTBMaxWaitDuration = 10 * time.Millisecond

	// Helper: read the demand counter under the group mutex.
	readDemand := func() float64 {
		gc.mu.Lock()
		defer gc.mu.Unlock()
		return gc.mu.demandRUTotal
	}

	// Freeze the bucket: no refill. The limiter still holds its initial
	// tokens (FillRate=1000 -> 1000 RU seeded in initRunState), so the
	// response below must cost strictly more than that to provoke a
	// throttle error.
	tokens := gc.run.requestUnitTokens
	tokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{
		newTokens:   0,
		newFillRate: 0,
		newBurst:    0,
	})
	// `allowDebt` in `onResponseWaitImpl` is false only when the response is
	// "big" (read+write bytes >= bigRequestThreshold) AND the group is
	// already throttled. Force both conditions.
	gc.isThrottled.Store(true)

	demandBefore := readDemand()

	const readBytes = 128 * 1024 * 1024 // 128 MiB -> 2048 RRU at default 1/64KiB cost
	req := &TestRequestInfo{isWrite: false}
	resp := &TestResponseInfo{
		readBytes: readBytes,
		succeed:   true,
	}
	_, _, err := gc.onResponseWaitImpl(context.TODO(), req, resp)
	re.Error(err)
	re.True(errs.ErrClientResourceGroupThrottled.Equal(err))

	re.Greater(readDemand(), demandBefore,
		"demand should be recorded for responses rejected by the limiter")
}
13 changes: 13 additions & 0 deletions client/resource_group/controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
var (
// ResourceGroupStatusGauge comments placeholder
ResourceGroupStatusGauge *prometheus.GaugeVec
// DemandRUPerSecGauge is the EMA of demanded RU/s per resource group, including
// requests rejected by the token bucket (pre-throttling demand).
DemandRUPerSecGauge *prometheus.GaugeVec
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is throttled, will the demand be recorded?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — that's exactly the design intent of this metric. Demand is accumulated at request entry (onRequestWaitImpl etc.) before acquireTokens is called, and on throttle error only consumption is rolled back via sub(gc.mu.consumption, delta); demandRUTotal is never subtracted. So throttled requests still count toward demand, which is the whole point of exposing this separately from the existing consumption-based avgRUPerSec. I'll also tighten the Help text / doc comment to make this invariant explicit ("including requests rejected by the token bucket").

// SuccessfulRequestDuration comments placeholder
SuccessfulRequestDuration *prometheus.HistogramVec
// FailedLimitReserveDuration comments placeholder
Expand Down Expand Up @@ -69,6 +72,15 @@ func initMetrics(constLabels prometheus.Labels) {
ConstLabels: constLabels,
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

DemandRUPerSecGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "resource_group",
Name: "demand_ru_per_sec",
Help: "EMA of demanded RU/s per resource group, including requests rejected by the token bucket (pre-throttling demand).",
ConstLabels: constLabels,
}, []string{newResourceGroupNameLabel})

SuccessfulRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Expand Down Expand Up @@ -162,6 +174,7 @@ func initMetrics(constLabels prometheus.Labels) {
func InitAndRegisterMetrics(constLabels prometheus.Labels) {
initMetrics(constLabels)
prometheus.MustRegister(ResourceGroupStatusGauge)
prometheus.MustRegister(DemandRUPerSecGauge)
prometheus.MustRegister(SuccessfulRequestDuration)
prometheus.MustRegister(FailedRequestCounter)
prometheus.MustRegister(FailedLimitReserveDuration)
Expand Down
Loading