Skip to content

Commit f01d119

Browse files
committed
resource_control: add PredictedReadBytes hint for RC paging pre-charge
Introduce an optional predictedReadBytesProvider interface on RequestInfo. When a caller (e.g. TiDB maintaining a per-logical-scan EMA across paging RPCs) supplies a non-zero PredictedReadBytes, BeforeKVRequest/AfterKVRequest use that value as the byte basis for the paging pre-charge instead of PagingSizeBytes. PagingSizeBytes remains the fallback and worst-case cap. This lets TiDB replace the current fixed 4 MiB pre-charge (which matches the paging byte budget but typically overshoots actual scanned bytes and stalls concurrent workers at Phase 1) with a learned estimate, without changing kvproto or TiKV behavior. The hint is added as an optional interface (not a method on RequestInfo) so existing RequestInfo implementations compile unchanged; they continue to fall back to PagingSizeBytes. Ref: per-logical-scan EMA pre-deduction design Signed-off-by: Yuhao Zhang <yhzhang00@outlook.com>
1 parent fda2146 commit f01d119

File tree

3 files changed

+112
-12
lines changed

3 files changed

+112
-12
lines changed

client/resource_group/controller/group_controller_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,70 @@ func TestPagingSizeBytesPreCharge(t *testing.T) {
257257
"Without paging, Phase 1 should only charge baseCost")
258258
}
259259

260+
func TestPredictedReadBytesOverridesPagingSizeBytes(t *testing.T) {
261+
re := require.New(t)
262+
cfg := DefaultRUConfig()
263+
kvCalc := newKVCalculator(cfg)
264+
265+
// When PredictedReadBytes (the EMA hint) is > 0, it should override
266+
// PagingSizeBytes as the pre-charge basis. PagingSizeBytes is kept as a
267+
// safety cap and worst-case fallback; the hint gives a tighter estimate.
268+
pagingSizeBytes := uint64(4 * 1024 * 1024) // 4 MB worst-case cap
269+
predictedReadBytes := uint64(256 * 1024) // 256 KB learned estimate
270+
req := &TestRequestInfo{
271+
isWrite: false,
272+
pagingSizeBytes: pagingSizeBytes,
273+
predictedReadBytes: predictedReadBytes,
274+
}
275+
276+
phase1 := &rmpb.Consumption{}
277+
kvCalc.BeforeKVRequest(phase1, req)
278+
279+
baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion
280+
hintCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes)
281+
re.InDelta(baseCost+hintCost, phase1.RRU, 1e-6,
282+
"Phase 1 should pre-charge based on PredictedReadBytes, not PagingSizeBytes")
283+
284+
// Phase 2 should subtract the same hint-based basis, preserving the
285+
// invariant that Phase 1 + Phase 2 == baseCost + actualCost.
286+
actualReadBytes := uint64(300 * 1024) // close to the prediction
287+
resp := &TestResponseInfo{
288+
readBytes: actualReadBytes,
289+
kvCPU: 0,
290+
succeed: true,
291+
}
292+
phase2 := &rmpb.Consumption{}
293+
kvCalc.AfterKVRequest(phase2, req, resp)
294+
295+
actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes)
296+
expectedPhase2RRU := actualReadCost - hintCost
297+
re.InDelta(expectedPhase2RRU, phase2.RRU, 1e-6,
298+
"Phase 2 should settle using the same hint basis as Phase 1")
299+
300+
totalRRU := phase1.RRU + phase2.RRU
301+
re.InDelta(baseCost+actualReadCost, totalRRU, 1e-6,
302+
"Total RRU across Phase 1+2 should still equal baseCost + actualCost")
303+
304+
// When the hint is zero, fall back to PagingSizeBytes (old behavior).
305+
reqFallback := &TestRequestInfo{
306+
isWrite: false,
307+
pagingSizeBytes: pagingSizeBytes,
308+
// predictedReadBytes left at 0
309+
}
310+
phase1Fallback := &rmpb.Consumption{}
311+
kvCalc.BeforeKVRequest(phase1Fallback, reqFallback)
312+
fallbackBytesCost := float64(cfg.ReadBytesCost) * float64(pagingSizeBytes)
313+
re.InDelta(baseCost+fallbackBytesCost, phase1Fallback.RRU, 1e-6,
314+
"With no hint, pre-charge should fall back to PagingSizeBytes")
315+
316+
// When both are zero, no byte-based pre-charge is applied.
317+
reqNone := &TestRequestInfo{isWrite: false}
318+
phase1None := &rmpb.Consumption{}
319+
kvCalc.BeforeKVRequest(phase1None, reqNone)
320+
re.InDelta(baseCost, phase1None.RRU, 1e-6,
321+
"Without hint or paging, Phase 1 should only charge baseCost")
322+
}
323+
260324
func TestPagingPreChargeTokenRefund(t *testing.T) {
261325
re := require.New(t)
262326
gc := createTestGroupCostController(re)

client/resource_group/controller/model.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,32 @@ type RequestInfo interface {
5757
PagingSizeBytes() uint64
5858
}
5959

60+
// predictedReadBytesProvider is an optional interface that a RequestInfo
61+
// implementation may satisfy to supply a learned estimate (e.g., from a
62+
// per-logical-scan EMA maintained in TiDB) of how many bytes the request
63+
// will read. When present and > 0, it overrides PagingSizeBytes as the
64+
// byte basis for RC paging pre-charge.
65+
//
66+
// Defined as an optional interface (not a method on RequestInfo) so older
67+
// RequestInfo implementations that have not been updated still compile and
68+
// behave as before (falling back to PagingSizeBytes).
69+
type predictedReadBytesProvider interface {
70+
PredictedReadBytes() uint64
71+
}
72+
73+
// estimatedReadBytes returns the byte basis used for RC paging pre-charge.
74+
// It prefers a learned PredictedReadBytes hint when the RequestInfo
75+
// implements the optional provider and returns a non-zero value; otherwise
76+
// it falls back to PagingSizeBytes (the paging byte budget / worst-case cap).
77+
func estimatedReadBytes(req RequestInfo) uint64 {
78+
if p, ok := req.(predictedReadBytesProvider); ok {
79+
if hint := p.PredictedReadBytes(); hint > 0 {
80+
return hint
81+
}
82+
}
83+
return req.PagingSizeBytes()
84+
}
85+
6086
// ResponseInfo is the interface of the response information provider. A response should be
6187
// able to tell how many bytes it read and KV CPU cost in milliseconds.
6288
type ResponseInfo interface {
@@ -107,9 +133,11 @@ func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Reque
107133
consumption.RRU += float64(kc.ReadBaseCost) + float64(kc.ReadPerBatchBaseCost)*defaultAvgBatchProportion
108134
// RC Paging pre-charge: if the request has a byte budget, pre-charge
109135
// the estimated read bytes RU so that concurrent workers are throttled
110-
// at Phase 1 instead of all hitting Phase 2 at once.
111-
if pagingSizeBytes := req.PagingSizeBytes(); pagingSizeBytes > 0 {
112-
consumption.RRU += float64(kc.ReadBytesCost) * float64(pagingSizeBytes)
136+
// at Phase 1 instead of all hitting Phase 2 at once. Prefer a learned
137+
// PredictedReadBytes hint when the caller supplies one; otherwise fall
138+
// back to PagingSizeBytes (the paging byte budget / worst-case cap).
139+
if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 {
140+
consumption.RRU += float64(kc.ReadBytesCost) * float64(bytesForEst)
113141
}
114142
}
115143
if req.AccessLocationType() == AccessCrossZone {
@@ -147,9 +175,10 @@ func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req Reques
147175
kc.calculateCPUCost(consumption, res)
148176
// RC Paging settlement: subtract the pre-charged bytes RU added in
149177
// BeforeKVRequest so the net total (Phase 1 + Phase 2) equals
150-
// baseCost + actualCost.
151-
if pagingSizeBytes := req.PagingSizeBytes(); pagingSizeBytes > 0 {
152-
consumption.RRU -= float64(kc.ReadBytesCost) * float64(pagingSizeBytes)
178+
// baseCost + actualCost. Use the same basis (hint or PagingSizeBytes)
179+
// that BeforeKVRequest used.
180+
if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 {
181+
consumption.RRU -= float64(kc.ReadBytesCost) * float64(bytesForEst)
153182
}
154183
} else if !res.Succeed() {
155184
// If the write request is not successfully returned, we need to pay back the WRU cost.

client/resource_group/controller/testutil.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ import "time"
2222

2323
// TestRequestInfo is used to test the request info interface.
2424
type TestRequestInfo struct {
25-
isWrite bool
26-
writeBytes uint64
27-
numReplicas int64
28-
storeID uint64
29-
accessType AccessLocationType
30-
pagingSizeBytes uint64
25+
isWrite bool
26+
writeBytes uint64
27+
numReplicas int64
28+
storeID uint64
29+
accessType AccessLocationType
30+
pagingSizeBytes uint64
31+
predictedReadBytes uint64
3132
}
3233

3334
// NewTestRequestInfo creates a new TestRequestInfo.
@@ -76,6 +77,12 @@ func (tri *TestRequestInfo) PagingSizeBytes() uint64 {
7677
return tri.pagingSizeBytes
7778
}
7879

80+
// PredictedReadBytes implements the optional predictedReadBytesProvider
81+
// interface so tests can exercise the EMA-based hint path.
82+
func (tri *TestRequestInfo) PredictedReadBytes() uint64 {
83+
return tri.predictedReadBytes
84+
}
85+
7986
// TestResponseInfo is used to test the response info interface.
8087
type TestResponseInfo struct {
8188
readBytes uint64

0 commit comments

Comments
 (0)