Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 84 additions & 11 deletions client/resource_group/controller/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ type groupMetricsCollection struct {
tokenRequestCounter prometheus.Counter
runningKVRequestCounter prometheus.Gauge
consumeTokenHistogram prometheus.Observer

// Paging pre-charge observability: cached per-(RG, source) so the hot path
// avoids WithLabelValues on every KV request.
prechargeSourcePredicted prometheus.Counter
prechargeSourceFallback prometheus.Counter
prechargeBytesPredicted prometheus.Counter
prechargeBytesFallback prometheus.Counter
actualBytesPredicted prometheus.Counter
actualBytesFallback prometheus.Counter
predictionResidualBytes prometheus.Observer
}

func initMetrics(oldName, name string) *groupMetricsCollection {
Expand All @@ -122,6 +132,53 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name),
consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name),

prechargeSourcePredicted: metrics.PagingPrechargeSourceCounter.WithLabelValues(name, metrics.SourcePredicted),
prechargeSourceFallback: metrics.PagingPrechargeSourceCounter.WithLabelValues(name, metrics.SourceFallback),
prechargeBytesPredicted: metrics.PagingPrechargeBytesCounter.WithLabelValues(name, metrics.SourcePredicted),
prechargeBytesFallback: metrics.PagingPrechargeBytesCounter.WithLabelValues(name, metrics.SourceFallback),
actualBytesPredicted: metrics.PagingActualBytesCounter.WithLabelValues(name, metrics.SourcePredicted),
actualBytesFallback: metrics.PagingActualBytesCounter.WithLabelValues(name, metrics.SourceFallback),
predictionResidualBytes: metrics.PagingPredictionResidualBytes.WithLabelValues(name),
}
}

// estimatePrechargeSource classifies the RC paging pre-charge for req. It
// returns the source label to instrument under together with the byte basis
// associated with that label.
//
// Resolution order:
//  1. If req implements predictedReadBytesProvider and its PredictedReadBytes
//     hint is positive, the result is (metrics.SourcePredicted, hint).
//  2. Otherwise, if req.PagingSizeBytes() is positive, the result is
//     (metrics.SourceFallback, paging size).
//  3. Otherwise ("", 0) is returned: there is no paging pre-charge to observe.
//
// Intended to mirror estimatedReadBytes in model.go (not visible from this
// function; keep the two in sync when either changes).
func estimatePrechargeSource(req RequestInfo) (source string, bytesForEst uint64) {
	// A learned hint, when present and non-zero, always wins over the budget.
	if provider, hasHint := req.(predictedReadBytesProvider); hasHint {
		predicted := provider.PredictedReadBytes()
		if predicted > 0 {
			return metrics.SourcePredicted, predicted
		}
	}

	// The paging size is consulted only after the hint path has been ruled out.
	switch pagingBytes := req.PagingSizeBytes(); {
	case pagingBytes > 0:
		return metrics.SourceFallback, pagingBytes
	default:
		return "", 0
	}
}

// observePagingPrecharge records one paging pre-charge against the counters
// cached for the given source: the per-source occurrence counter is
// incremented by one and bytesForEst is added to the per-source byte counter.
//
// source is expected to be metrics.SourcePredicted or metrics.SourceFallback;
// any other value (including "") is ignored and nothing is recorded.
func (gmc *groupMetricsCollection) observePagingPrecharge(source string, bytesForEst uint64) {
	charged := float64(bytesForEst)

	if source == metrics.SourcePredicted {
		gmc.prechargeSourcePredicted.Inc()
		gmc.prechargeBytesPredicted.Add(charged)
		return
	}
	if source == metrics.SourceFallback {
		gmc.prechargeSourceFallback.Inc()
		gmc.prechargeBytesFallback.Add(charged)
	}
}

// observePagingActual records the bytes actually read for a request whose
// pre-charge was attributed to source.
//
// actual is added to the per-source actual-bytes counter. For
// metrics.SourcePredicted the signed residual (actual - predicted) is also
// observed; predicted is unused for metrics.SourceFallback. Any other source
// value is ignored and nothing is recorded.
func (gmc *groupMetricsCollection) observePagingActual(source string, predicted, actual uint64) {
	observed := float64(actual)

	if source == metrics.SourceFallback {
		gmc.actualBytesFallback.Add(observed)
		return
	}
	if source != metrics.SourcePredicted {
		return
	}

	gmc.actualBytesPredicted.Add(observed)
	// The residual is recorded only on the predicted path: for fallback the
	// "prediction" is merely the paging budget, so the difference carries no
	// signal. Both operands are converted to float64 before subtracting so an
	// over-prediction yields a negative value instead of wrapping.
	gmc.predictionResidualBytes.Observe(observed - float64(predicted))
}

Expand Down Expand Up @@ -551,6 +608,9 @@ func (gc *groupCostController) onRequestWaitImpl(
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, info)
}
if source, bytesForEst := estimatePrechargeSource(info); bytesForEst > 0 {
gc.metrics.observePagingPrecharge(source, bytesForEst)
}

gc.mu.Lock()
add(gc.mu.consumption, delta)
Expand Down Expand Up @@ -601,10 +661,15 @@ func (gc *groupCostController) onResponseImpl(
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
if source, bytesForEst := estimatePrechargeSource(req); bytesForEst > 0 {
gc.metrics.observePagingActual(source, bytesForEst, resp.ReadBytes())
}
if !gc.burstable.Load() {
counter := gc.run.requestUnitTokens
if v := getRUValueFromConsumption(delta); v > 0 {
counter.limiter.RemoveTokens(time.Now(), v)
} else if v < 0 {
counter.limiter.RefundTokens(time.Now(), -v)
}
}

Expand Down Expand Up @@ -632,21 +697,29 @@ func (gc *groupCostController) onResponseWaitImpl(
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
if source, bytesForEst := estimatePrechargeSource(req); bytesForEst > 0 {
gc.metrics.observePagingActual(source, bytesForEst, resp.ReadBytes())
}
var waitDuration time.Duration
if !gc.burstable.Load() {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
v := getRUValueFromConsumption(delta)
if v > 0 {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
return nil, waitDuration, err
}
return nil, waitDuration, err
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
} else if v < 0 {
gc.run.requestUnitTokens.limiter.RefundTokens(time.Now(), -v)
}
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

gc.mu.Lock()
Expand Down
Loading
Loading