Skip to content

Commit 467deb4

Browse files
authored
resource group: only persist states of active resource groups (tikv#407)
Signed-off-by: disksing <i@disksing.com>
1 parent 71e8185 commit 467deb4

File tree

1 file changed

+75
-54
lines changed

1 file changed

+75
-54
lines changed

pkg/mcs/resourcemanager/server/manager.go

Lines changed: 75 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ type Manager struct {
7171
isTiFlash bool
7272
}
7373
// record update time of each resource group
74-
consumptionRecord map[consumptionRecordKey]time.Time
74+
consumptionRecordMu syncutil.RWMutex
75+
consumptionRecord map[consumptionRecordKey]time.Time
7576
}
7677

7778
type consumptionRecordKey struct {
@@ -343,22 +344,55 @@ func (m *Manager) persistLoop(ctx context.Context) {
343344
}
344345

345346
func (m *Manager) persistResourceGroupRunningState() {
347+
for _, group := range m.getActiveResourceGroups() {
348+
if err := group.persistStates(m.storage); err != nil {
349+
log.Error("persist resource group state failed", zap.Error(err))
350+
}
351+
}
352+
}
353+
354+
func (m *Manager) updateConsumptionRecord(name string, ruType string) {
355+
m.consumptionRecordMu.Lock()
356+
defer m.consumptionRecordMu.Unlock()
357+
m.consumptionRecord[consumptionRecordKey{name: name, ruType: ruType}] = time.Now()
358+
}
359+
360+
func (m *Manager) getActiveResourceGroups() []*ResourceGroup {
361+
m.consumptionRecordMu.RLock()
362+
groupNames := make([]string, 0, len(m.consumptionRecord))
363+
for k := range m.consumptionRecord {
364+
if k.name == reservedDefaultGroupName {
365+
continue
366+
}
367+
groupNames = append(groupNames, k.name)
368+
}
369+
m.consumptionRecordMu.RUnlock()
370+
346371
m.RLock()
347-
keys := make([]string, 0, len(m.groups))
348-
for k := range m.groups {
349-
keys = append(keys, k)
372+
defer m.RUnlock()
373+
groups := make([]*ResourceGroup, 0, len(groupNames))
374+
for _, name := range groupNames {
375+
if group, ok := m.groups[name]; ok {
376+
groups = append(groups, group)
377+
}
350378
}
351-
m.RUnlock()
352-
for idx := range keys {
353-
m.RLock()
354-
group, ok := m.groups[keys[idx]]
355-
if ok {
356-
if err := group.persistStates(m.storage); err != nil {
357-
log.Error("persist resource group state failed", zap.Error(err))
358-
}
379+
return groups
380+
}
381+
382+
func (m *Manager) getInactiveResourceGroups() []consumptionRecordKey {
383+
m.consumptionRecordMu.Lock()
384+
defer m.consumptionRecordMu.Unlock()
385+
var keys []consumptionRecordKey
386+
for k, lastTime := range m.consumptionRecord {
387+
if k.name == reservedDefaultGroupName {
388+
continue
389+
}
390+
if time.Since(lastTime) > metricsCleanupTimeout {
391+
delete(m.consumptionRecord, k)
392+
keys = append(keys, k)
359393
}
360-
m.RUnlock()
361394
}
395+
return keys
362396
}
363397

364398
// Receive the consumption and flush it to the metrics.
@@ -386,10 +420,7 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context, pushMetricsAddr st
386420
case <-ctx.Done():
387421
return
388422
case consumptionInfo := <-m.consumptionDispatcher:
389-
consumption := consumptionInfo.Consumption
390-
if consumption == nil {
391-
continue
392-
}
423+
name := consumptionInfo.resourceGroupName
393424
ruLabelType := defaultTypeLabel
394425
if consumptionInfo.isBackground {
395426
ruLabelType = backgroundTypeLabel
@@ -398,8 +429,14 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context, pushMetricsAddr st
398429
ruLabelType = tiflashTypeLabel
399430
}
400431

432+
m.updateConsumptionRecord(name, ruLabelType)
433+
434+
consumption := consumptionInfo.Consumption
435+
if consumption == nil {
436+
continue
437+
}
438+
401439
var (
402-
name = consumptionInfo.resourceGroupName
403440
rruMetrics = readRequestUnitCost.WithLabelValues(name, name, ruLabelType)
404441
wruMetrics = writeRequestUnitCost.WithLabelValues(name, name, ruLabelType)
405442
sqlLayerRuMetrics = sqlLayerRequestUnitCost.WithLabelValues(name, name)
@@ -454,50 +491,34 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context, pushMetricsAddr st
454491
writeRequestCountMetrics.Add(consumption.KvWriteRpcCount)
455492
}
456493

457-
m.consumptionRecord[consumptionRecordKey{name: name, ruType: ruLabelType}] = time.Now()
458-
459494
// TODO: maybe we need to distinguish background ru.
460495
if rg := m.GetMutableResourceGroup(name); rg != nil {
461496
rg.UpdateRUConsumption(consumptionInfo.Consumption)
462497
}
463498
case <-cleanUpTicker.C:
464499
// Clean up the metrics that have not been updated for a long time.
465-
for r, lastTime := range m.consumptionRecord {
466-
if time.Since(lastTime) > metricsCleanupTimeout {
467-
readRequestUnitCost.DeleteLabelValues(r.name, r.name, r.ruType)
468-
writeRequestUnitCost.DeleteLabelValues(r.name, r.name, r.ruType)
469-
sqlLayerRequestUnitCost.DeleteLabelValues(r.name, r.name, r.ruType)
470-
readByteCost.DeleteLabelValues(r.name, r.name, r.ruType)
471-
writeByteCost.DeleteLabelValues(r.name, r.name, r.ruType)
472-
kvCPUCost.DeleteLabelValues(r.name, r.name, r.ruType)
473-
sqlCPUCost.DeleteLabelValues(r.name, r.name, r.ruType)
474-
requestCount.DeleteLabelValues(r.name, r.name, readTypeLabel)
475-
requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel)
476-
availableRUCounter.DeleteLabelValues(r.name, r.name)
477-
delete(m.consumptionRecord, r)
478-
delete(maxPerSecTrackers, r.name)
479-
delete(rcuTrackers, r.name)
480-
readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
481-
writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
482-
resourceGroupConfigGauge.DeletePartialMatch(prometheus.Labels{newResourceGroupNameLabel: r.name})
483-
requestUnitSumPerSec.DeleteLabelValues(r.name)
484-
requestUnitConsumeRate.DeleteLabelValues(r.name)
485-
}
500+
for _, r := range m.getInactiveResourceGroups() {
501+
readRequestUnitCost.DeleteLabelValues(r.name, r.name, r.ruType)
502+
writeRequestUnitCost.DeleteLabelValues(r.name, r.name, r.ruType)
503+
sqlLayerRequestUnitCost.DeleteLabelValues(r.name, r.name, r.ruType)
504+
readByteCost.DeleteLabelValues(r.name, r.name, r.ruType)
505+
writeByteCost.DeleteLabelValues(r.name, r.name, r.ruType)
506+
kvCPUCost.DeleteLabelValues(r.name, r.name, r.ruType)
507+
sqlCPUCost.DeleteLabelValues(r.name, r.name, r.ruType)
508+
requestCount.DeleteLabelValues(r.name, r.name, readTypeLabel)
509+
requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel)
510+
availableRUCounter.DeleteLabelValues(r.name, r.name)
511+
delete(maxPerSecTrackers, r.name)
512+
delete(rcuTrackers, r.name)
513+
readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
514+
writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
515+
resourceGroupConfigGauge.DeletePartialMatch(prometheus.Labels{newResourceGroupNameLabel: r.name})
516+
requestUnitSumPerSec.DeleteLabelValues(r.name)
517+
requestUnitConsumeRate.DeleteLabelValues(r.name)
486518
}
519+
487520
case <-availableRUTicker.C:
488-
m.RLock()
489-
groups := make([]*ResourceGroup, 0, len(m.consumptionRecord))
490-
for r := range m.consumptionRecord {
491-
if r.name == reservedDefaultGroupName {
492-
continue
493-
}
494-
group, ok := m.groups[r.name]
495-
if ok {
496-
groups = append(groups, group)
497-
}
498-
}
499-
m.RUnlock()
500-
// prevent many groups and hold the lock long time.
521+
groups := m.getActiveResourceGroups()
501522
for _, group := range groups {
502523
ru := group.getRUToken()
503524
if ru < 0 {

0 commit comments

Comments
 (0)