Skip to content

Commit f8ab6e5

Browse files
authored
feat(metering): add DFS stats collector for write requests and bytes (#9783)
ref #9707 Add `dfs-stats` collector to track DFS write requests and written bytes per keyspace/component. Signed-off-by: JmPotato <github@ipotato.me>
1 parent e86b044 commit f8ab6e5

File tree

6 files changed

+344
-4
lines changed

6 files changed

+344
-4
lines changed

pkg/core/store_stats.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,26 @@ import (
2222
"github.com/tikv/pd/pkg/utils/typeutil"
2323
)
2424

25+
// DFSStats is used to calculate the DFS stats info for each scope.
26+
type DFSStats struct {
27+
WrittenBytes uint64
28+
WriteRequests uint64
29+
}
30+
2531
type storeStats struct {
2632
mu syncutil.RWMutex
2733
rawStats *pdpb.StoreStats
2834

2935
// avgAvailable is used to make available smooth, aka no sudden changes.
30-
avgAvailable *movingaverage.HMA
36+
avgAvailable *movingaverage.HMA
37+
scopedDFSStats map[pdpb.DfsStatScope]*DFSStats
3138
}
3239

3340
func newStoreStats() *storeStats {
3441
return &storeStats{
35-
rawStats: &pdpb.StoreStats{},
36-
avgAvailable: movingaverage.NewHMA(60), // take 10 minutes sample under 10s heartbeat rate
42+
rawStats: &pdpb.StoreStats{},
43+
avgAvailable: movingaverage.NewHMA(60), // take 10 minutes sample under 10s heartbeat rate
44+
scopedDFSStats: make(map[pdpb.DfsStatScope]*DFSStats),
3745
}
3846
}
3947

@@ -46,6 +54,38 @@ func (ss *storeStats) updateRawStats(rawStats *pdpb.StoreStats) {
4654
return
4755
}
4856
ss.avgAvailable.Add(float64(rawStats.GetAvailable()))
57+
58+
dfsStatItems := rawStats.GetDfs()
59+
if len(dfsStatItems) == 0 {
60+
return
61+
}
62+
for _, dfsStat := range dfsStatItems {
63+
scope := dfsStat.GetScope()
64+
if scope == nil {
65+
continue
66+
}
67+
writtenBytes := dfsStat.GetWrittenBytes()
68+
writeRequests := dfsStat.GetWriteRequests()
69+
stat, ok := ss.scopedDFSStats[*scope]
70+
if ok {
71+
stat.WrittenBytes += writtenBytes
72+
stat.WriteRequests += writeRequests
73+
} else {
74+
ss.scopedDFSStats[*scope] = &DFSStats{
75+
WrittenBytes: writtenBytes,
76+
WriteRequests: writeRequests,
77+
}
78+
}
79+
}
80+
}
81+
82+
// TakeScopedDFSStats takes the DFS stats info for each scope. It will return the DFS stats info and clear the internal map.
83+
func (ss *storeStats) TakeScopedDFSStats() map[pdpb.DfsStatScope]*DFSStats {
84+
ss.mu.Lock()
85+
defer ss.mu.Unlock()
86+
dfsStats := ss.scopedDFSStats
87+
ss.scopedDFSStats = make(map[pdpb.DfsStatScope]*DFSStats)
88+
return dfsStats
4989
}
5090

5191
// GetStoreStats returns the statistics information of the store.

pkg/core/store_stats_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,66 @@ func TestStoreStats(t *testing.T) {
4848
re.Greater(store.GetAvgAvailable(), uint64(150*units.GiB))
4949
re.Less(store.GetAvgAvailable(), uint64(160*units.GiB))
5050
}
51+
52+
func TestDFSStats(t *testing.T) {
53+
re := require.New(t)
54+
meta := &metapb.Store{Id: 1, State: metapb.StoreState_Up}
55+
storeStats := &pdpb.StoreStats{
56+
Capacity: uint64(200 * units.GiB),
57+
UsedSize: uint64(50 * units.GiB),
58+
Available: uint64(150 * units.GiB),
59+
}
60+
store := NewStoreInfo(meta, SetStoreStats(storeStats))
61+
scopedDFSStats := store.TakeScopedDFSStats()
62+
re.Empty(scopedDFSStats)
63+
64+
storeStats.Dfs = []*pdpb.DfsStatItem{
65+
{
66+
Scope: &pdpb.DfsStatScope{
67+
KeyspaceId: 1,
68+
Component: "test-component",
69+
},
70+
WrittenBytes: 100,
71+
WriteRequests: 100,
72+
},
73+
{
74+
Scope: &pdpb.DfsStatScope{
75+
KeyspaceId: 2,
76+
Component: "test-component",
77+
},
78+
WrittenBytes: 200,
79+
WriteRequests: 200,
80+
},
81+
}
82+
store = store.Clone(SetStoreStats(storeStats))
83+
scopedDFSStats = store.TakeScopedDFSStats()
84+
re.Len(scopedDFSStats, 2)
85+
re.Equal(storeStats.Dfs[0].WrittenBytes, scopedDFSStats[*storeStats.Dfs[0].Scope].WrittenBytes)
86+
re.Equal(storeStats.Dfs[0].WriteRequests, scopedDFSStats[*storeStats.Dfs[0].Scope].WriteRequests)
87+
re.Equal(storeStats.Dfs[1].WrittenBytes, scopedDFSStats[*storeStats.Dfs[1].Scope].WrittenBytes)
88+
re.Equal(storeStats.Dfs[1].WriteRequests, scopedDFSStats[*storeStats.Dfs[1].Scope].WriteRequests)
89+
90+
storeStats.Dfs = []*pdpb.DfsStatItem{
91+
{
92+
Scope: &pdpb.DfsStatScope{
93+
KeyspaceId: 1,
94+
Component: "test-component",
95+
},
96+
WrittenBytes: 100,
97+
WriteRequests: 100,
98+
},
99+
{
100+
Scope: &pdpb.DfsStatScope{
101+
KeyspaceId: 1,
102+
Component: "test-component",
103+
},
104+
WrittenBytes: 200,
105+
WriteRequests: 200,
106+
},
107+
}
108+
store = store.Clone(SetStoreStats(storeStats))
109+
scopedDFSStats = store.TakeScopedDFSStats()
110+
re.Len(scopedDFSStats, 1)
111+
re.Equal(storeStats.Dfs[0].WrittenBytes+storeStats.Dfs[1].WrittenBytes, scopedDFSStats[*storeStats.Dfs[0].Scope].WrittenBytes)
112+
re.Equal(storeStats.Dfs[0].WriteRequests+storeStats.Dfs[1].WriteRequests, scopedDFSStats[*storeStats.Dfs[0].Scope].WriteRequests)
113+
}

pkg/metering/utils.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const (
2626
UnitRU = "RU"
2727
// UnitBytes is the unit of the metering bytes.
2828
UnitBytes = "Bytes"
29+
// UnitRequests is the unit of the metering requests.
30+
UnitRequests = "Requests"
2931

3032
// DataVersionField is the version field of the metering data.
3133
DataVersionField = "version"
@@ -44,3 +46,8 @@ func NewRUValue(value float64) common.MeteringValue {
4446
func NewBytesValue(value uint64) common.MeteringValue {
4547
return common.MeteringValue{Value: value, Unit: UnitBytes}
4648
}
49+
50+
// NewRequestsValue creates a new metering requests value.
51+
func NewRequestsValue(value uint64) common.MeteringValue {
52+
return common.MeteringValue{Value: value, Unit: UnitRequests}
53+
}

server/cluster/cluster.go

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ const (
111111
regionLabelGCInterval = time.Hour
112112
// storageSizeCollectorInterval is the interval to run storage size collector.
113113
storageSizeCollectorInterval = time.Minute
114+
// dfsStatsCollectorInterval is the interval to run the DFS stats collector.
115+
dfsStatsCollectorInterval = time.Minute
114116

115117
// minSnapshotDurationSec is the minimum duration that a store can tolerate.
116118
// It should enlarge the limiter if the snapshot's duration is less than this value.
@@ -403,7 +405,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
403405
}
404406
}
405407
c.checkSchedulingService()
406-
c.wg.Add(11)
408+
c.wg.Add(12)
407409
go c.runServiceCheckJob()
408410
go c.runMetricsCollectionJob()
409411
go c.runNodeStateCheckJob()
@@ -415,6 +417,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
415417
go c.startGCTuner()
416418
go c.startProgressGC()
417419
go c.runStorageSizeCollector(s.GetMeteringWriter(), c.regionLabeler, s.GetKeyspaceManager())
420+
go c.runDFSStatsCollector(s.GetMeteringWriter(), s.GetKeyspaceManager())
418421

419422
c.running = true
420423
c.heartbeatRunner.Start(c.ctx)
@@ -2626,3 +2629,83 @@ func (c *RaftCluster) collectStorageSize(
26262629
zap.Int("count", len(storageSizeInfoList)))
26272630
return storageSizeInfoList
26282631
}
2632+
2633+
// runDFSStatsCollector runs the DFS (Distributed File System) stats collector for the metering.
2634+
func (c *RaftCluster) runDFSStatsCollector(
2635+
writer *metering.Writer,
2636+
keyspaceManager *keyspace.Manager,
2637+
) {
2638+
defer logutil.LogPanic()
2639+
defer c.wg.Done()
2640+
2641+
if writer == nil {
2642+
log.Info("no metering writer provided, the dfs stats collector will not be started")
2643+
return
2644+
}
2645+
log.Info("running the dfs stats collector")
2646+
// Init and register the collector before starting the loop.
2647+
collector := newDfsStatsCollector()
2648+
writer.RegisterCollector(collector)
2649+
// Start the ticker to collect the DFS stats data periodically.
2650+
ticker := time.NewTicker(dfsStatsCollectorInterval)
2651+
defer ticker.Stop()
2652+
2653+
for {
2654+
select {
2655+
case <-c.ctx.Done():
2656+
log.Info("dfs stats collector has been stopped")
2657+
return
2658+
case <-ticker.C:
2659+
keyspaceDFSStats := c.collectDfsStats(keyspaceManager)
2660+
collector.Collect(keyspaceDFSStats)
2661+
}
2662+
}
2663+
}
2664+
2665+
type keyspaceDFSStatsKey struct {
2666+
keyspaceName string
2667+
component string
2668+
}
2669+
2670+
type keyspaceDFSStatsMap map[keyspaceDFSStatsKey]*core.DFSStats
2671+
2672+
func (c *RaftCluster) collectDfsStats(keyspaceManager *keyspace.Manager) keyspaceDFSStatsMap {
2673+
var (
2674+
keyspaceDFSStats = make(keyspaceDFSStatsMap, 0)
2675+
keyspaceName string
2676+
err error
2677+
)
2678+
for _, store := range c.GetStores() {
2679+
scopedDFSStats := store.TakeScopedDFSStats()
2680+
if len(scopedDFSStats) == 0 {
2681+
continue
2682+
}
2683+
// Merge into the keyspace DFS stats.
2684+
for scope, stats := range scopedDFSStats {
2685+
// Set the keyspace name to empty string for global scope.
2686+
if scope.GetIsGlobal() {
2687+
keyspaceName = ""
2688+
} else {
2689+
keyspaceName, err = keyspaceManager.GetKeyspaceNameByID(scope.GetKeyspaceId())
2690+
if err != nil {
2691+
continue
2692+
}
2693+
}
2694+
key := keyspaceDFSStatsKey{
2695+
keyspaceName: keyspaceName,
2696+
component: scope.GetComponent(),
2697+
}
2698+
dfsStats, ok := keyspaceDFSStats[key]
2699+
if ok {
2700+
dfsStats.WrittenBytes += stats.WrittenBytes
2701+
dfsStats.WriteRequests += stats.WriteRequests
2702+
} else {
2703+
keyspaceDFSStats[key] = &core.DFSStats{
2704+
WrittenBytes: stats.WrittenBytes,
2705+
WriteRequests: stats.WriteRequests,
2706+
}
2707+
}
2708+
}
2709+
}
2710+
return keyspaceDFSStats
2711+
}

server/cluster/metering.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/pingcap/metering_sdk/common"
2323

24+
"github.com/tikv/pd/pkg/core"
2425
"github.com/tikv/pd/pkg/metering"
2526
)
2627

@@ -93,3 +94,57 @@ func (c *storageSizeCollector) Aggregate() []map[string]any {
9394
}
9495
return records
9596
}
97+
98+
const (
99+
dfsStatsCollectorCategory = "dfs-stats"
100+
dfsStatsMeteringVersion = "1"
101+
102+
meteringDataDfsWrittenBytes = "written_bytes"
103+
meteringDataDfsWriteRequests = "write_requests"
104+
)
105+
106+
var _ metering.Collector = (*dfsStatsCollector)(nil)
107+
108+
type dfsStatsCollector struct {
109+
sync.RWMutex
110+
keyspaceDfsStats map[keyspaceDFSStatsKey]*core.DFSStats
111+
}
112+
113+
func newDfsStatsCollector() *dfsStatsCollector {
114+
return &dfsStatsCollector{
115+
keyspaceDfsStats: make(keyspaceDFSStatsMap),
116+
}
117+
}
118+
119+
// Category returns the category of the collector.
120+
func (*dfsStatsCollector) Category() string { return dfsStatsCollectorCategory }
121+
122+
// Collect collects the DFS stats data.
123+
func (c *dfsStatsCollector) Collect(data any) {
124+
c.Lock()
125+
defer c.Unlock()
126+
keyspaceDFSStats := data.(keyspaceDFSStatsMap)
127+
for key, info := range keyspaceDFSStats {
128+
c.keyspaceDfsStats[key] = info
129+
}
130+
}
131+
132+
// Aggregate aggregates the DFS stats data.
133+
func (c *dfsStatsCollector) Aggregate() []map[string]any {
134+
c.Lock()
135+
keyspaceDfsStats := c.keyspaceDfsStats
136+
c.keyspaceDfsStats = make(keyspaceDFSStatsMap)
137+
c.Unlock()
138+
records := make([]map[string]any, 0, len(keyspaceDfsStats))
139+
for key, dfsStats := range keyspaceDfsStats {
140+
records = append(records, map[string]any{
141+
metering.DataVersionField: dfsStatsMeteringVersion,
142+
// keyspaceName is the logical cluster ID in the metering data. Empty string for global scope.
143+
metering.DataClusterIDField: key.keyspaceName,
144+
metering.DataSourceNameField: key.component,
145+
meteringDataDfsWrittenBytes: metering.NewBytesValue(dfsStats.WrittenBytes),
146+
meteringDataDfsWriteRequests: metering.NewRequestsValue(dfsStats.WriteRequests),
147+
})
148+
}
149+
return records
150+
}

0 commit comments

Comments
 (0)