Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions pkg/core/store_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,26 @@ import (
"github.com/tikv/pd/pkg/utils/typeutil"
)

// DFSStats accumulates the DFS (Distributed File System) write statistics
// reported by a store for a single scope.
type DFSStats struct {
	// WrittenBytes is the total number of bytes written in the scope.
	WrittenBytes uint64
	// WriteRequests is the total number of write requests issued in the scope.
	WriteRequests uint64
}

// storeStats holds the latest raw statistics reported by a store together
// with values derived from them.
//
// Fix: the scraped diff kept both the removed and the added copy of the
// avgAvailable field line; the merged struct declares it exactly once.
type storeStats struct {
	mu       syncutil.RWMutex
	rawStats *pdpb.StoreStats

	// avgAvailable is used to make available smooth, aka no sudden changes.
	avgAvailable *movingaverage.HMA
	// scopedDFSStats accumulates the DFS stats reported via heartbeats,
	// keyed by scope, until drained by TakeScopedDFSStats.
	scopedDFSStats map[pdpb.DfsStatScope]*DFSStats
}

// newStoreStats creates a storeStats with the availability moving average and
// the scoped DFS stats map initialized.
//
// Fix: the scraped diff kept both the pre-merge and post-merge initializer
// lines; the merged constructor initializes each field exactly once.
func newStoreStats() *storeStats {
	return &storeStats{
		rawStats:       &pdpb.StoreStats{},
		avgAvailable:   movingaverage.NewHMA(60), // take 10 minutes sample under 10s heartbeat rate
		scopedDFSStats: make(map[pdpb.DfsStatScope]*DFSStats),
	}
}

Expand All @@ -46,6 +54,38 @@ func (ss *storeStats) updateRawStats(rawStats *pdpb.StoreStats) {
return
}
ss.avgAvailable.Add(float64(rawStats.GetAvailable()))

dfsStatItems := rawStats.GetDfs()
if len(dfsStatItems) == 0 {
return
}
for _, dfsStat := range dfsStatItems {
scope := dfsStat.GetScope()
if scope == nil {
continue
}
writtenBytes := dfsStat.GetWrittenBytes()
writeRequests := dfsStat.GetWriteRequests()
stat, ok := ss.scopedDFSStats[*scope]
if ok {
stat.WrittenBytes += writtenBytes
stat.WriteRequests += writeRequests
} else {
ss.scopedDFSStats[*scope] = &DFSStats{
WrittenBytes: writtenBytes,
WriteRequests: writeRequests,
}
}
}
}

// TakeScopedDFSStats takes the DFS stats info for each scope. It returns the
// accumulated stats and resets the internal map, so each call observes only
// the deltas reported since the previous call.
func (ss *storeStats) TakeScopedDFSStats() map[pdpb.DfsStatScope]*DFSStats {
	ss.mu.Lock()
	defer ss.mu.Unlock()
	// Swap the accumulated map out and start a fresh one.
	taken := ss.scopedDFSStats
	ss.scopedDFSStats = make(map[pdpb.DfsStatScope]*DFSStats)
	return taken
}

// GetStoreStats returns the statistics information of the store.
Expand Down
63 changes: 63 additions & 0 deletions pkg/core/store_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,66 @@ func TestStoreStats(t *testing.T) {
re.Greater(store.GetAvgAvailable(), uint64(150*units.GiB))
re.Less(store.GetAvgAvailable(), uint64(160*units.GiB))
}

// TestDFSStats verifies that DFS stats reported in store heartbeats are
// accumulated per scope and drained (returned and cleared) by
// TakeScopedDFSStats.
func TestDFSStats(t *testing.T) {
	re := require.New(t)
	meta := &metapb.Store{Id: 1, State: metapb.StoreState_Up}
	storeStats := &pdpb.StoreStats{
		Capacity:  uint64(200 * units.GiB),
		UsedSize:  uint64(50 * units.GiB),
		Available: uint64(150 * units.GiB),
	}
	store := NewStoreInfo(meta, SetStoreStats(storeStats))
	// No DFS items reported yet, so nothing should have been accumulated.
	scopedDFSStats := store.TakeScopedDFSStats()
	re.Empty(scopedDFSStats)

	// Two items with distinct scopes (different keyspace IDs) should produce
	// two separate entries, each holding its own counters.
	storeStats.Dfs = []*pdpb.DfsStatItem{
		{
			Scope: &pdpb.DfsStatScope{
				KeyspaceId: 1,
				Component:  "test-component",
			},
			WrittenBytes:  100,
			WriteRequests: 100,
		},
		{
			Scope: &pdpb.DfsStatScope{
				KeyspaceId: 2,
				Component:  "test-component",
			},
			WrittenBytes:  200,
			WriteRequests: 200,
		},
	}
	store = store.Clone(SetStoreStats(storeStats))
	scopedDFSStats = store.TakeScopedDFSStats()
	re.Len(scopedDFSStats, 2)
	re.Equal(storeStats.Dfs[0].WrittenBytes, scopedDFSStats[*storeStats.Dfs[0].Scope].WrittenBytes)
	re.Equal(storeStats.Dfs[0].WriteRequests, scopedDFSStats[*storeStats.Dfs[0].Scope].WriteRequests)
	re.Equal(storeStats.Dfs[1].WrittenBytes, scopedDFSStats[*storeStats.Dfs[1].Scope].WrittenBytes)
	re.Equal(storeStats.Dfs[1].WriteRequests, scopedDFSStats[*storeStats.Dfs[1].Scope].WriteRequests)

	// Two items sharing the same scope should be merged into a single entry
	// whose counters are the sums of both items.
	storeStats.Dfs = []*pdpb.DfsStatItem{
		{
			Scope: &pdpb.DfsStatScope{
				KeyspaceId: 1,
				Component:  "test-component",
			},
			WrittenBytes:  100,
			WriteRequests: 100,
		},
		{
			Scope: &pdpb.DfsStatScope{
				KeyspaceId: 1,
				Component:  "test-component",
			},
			WrittenBytes:  200,
			WriteRequests: 200,
		},
	}
	store = store.Clone(SetStoreStats(storeStats))
	scopedDFSStats = store.TakeScopedDFSStats()
	re.Len(scopedDFSStats, 1)
	re.Equal(storeStats.Dfs[0].WrittenBytes+storeStats.Dfs[1].WrittenBytes, scopedDFSStats[*storeStats.Dfs[0].Scope].WrittenBytes)
	re.Equal(storeStats.Dfs[0].WriteRequests+storeStats.Dfs[1].WriteRequests, scopedDFSStats[*storeStats.Dfs[0].Scope].WriteRequests)
}
7 changes: 7 additions & 0 deletions pkg/metering/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
UnitRU = "RU"
// UnitBytes is the unit of the metering bytes.
UnitBytes = "Bytes"
// UnitRequests is the unit of the metering requests.
UnitRequests = "Requests"

// DataVersionField is the version field of the metering data.
DataVersionField = "version"
Expand All @@ -44,3 +46,8 @@ func NewRUValue(value float64) common.MeteringValue {
func NewBytesValue(value uint64) common.MeteringValue {
	v := common.MeteringValue{
		Value: value,
		Unit:  UnitBytes,
	}
	return v
}

// NewRequestsValue creates a new metering requests value.
func NewRequestsValue(value uint64) common.MeteringValue {
	v := common.MeteringValue{
		Value: value,
		Unit:  UnitRequests,
	}
	return v
}
85 changes: 84 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ const (
regionLabelGCInterval = time.Hour
// storageSizeCollectorInterval is the interval to run storage size collector.
storageSizeCollectorInterval = time.Minute
// dfsStatsCollectorInterval is the interval to run store stats collector.
dfsStatsCollectorInterval = time.Minute

// minSnapshotDurationSec is the minimum duration that a store can tolerate.
// It should enlarge the limiter if the snapshot's duration is less than this value.
Expand Down Expand Up @@ -403,7 +405,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
}
}
c.checkSchedulingService()
c.wg.Add(11)
c.wg.Add(12)
go c.runServiceCheckJob()
go c.runMetricsCollectionJob()
go c.runNodeStateCheckJob()
Expand All @@ -415,6 +417,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
go c.startGCTuner()
go c.startProgressGC()
go c.runStorageSizeCollector(s.GetMeteringWriter(), c.regionLabeler, s.GetKeyspaceManager())
go c.runDFSStatsCollector(s.GetMeteringWriter(), s.GetKeyspaceManager())

c.running = true
c.heartbeatRunner.Start(c.ctx)
Expand Down Expand Up @@ -2626,3 +2629,83 @@ func (c *RaftCluster) collectStorageSize(
zap.Int("count", len(storageSizeInfoList)))
return storageSizeInfoList
}

// runDFSStatsCollector runs the DFS (Distributed File System) stats collector for the metering.
func (c *RaftCluster) runDFSStatsCollector(
	writer *metering.Writer,
	keyspaceManager *keyspace.Manager,
) {
	defer logutil.LogPanic()
	defer c.wg.Done()

	// Without a metering writer there is nowhere to report to.
	if writer == nil {
		log.Info("no metering writer provided, the dfs stats collector will not be started")
		return
	}
	log.Info("running the dfs stats collector")
	// Register the collector up front so the writer can aggregate anything
	// collected from the very first tick.
	collector := newDfsStatsCollector()
	writer.RegisterCollector(collector)
	// Periodically drain the per-store DFS stats and feed them to the collector.
	ticker := time.NewTicker(dfsStatsCollectorInterval)
	defer ticker.Stop()

	for {
		select {
		case <-c.ctx.Done():
			log.Info("dfs stats collector has been stopped")
			return
		case <-ticker.C:
			collector.Collect(c.collectDfsStats(keyspaceManager))
		}
	}
}

// keyspaceDFSStatsKey identifies the (keyspace, component) pair a set of DFS
// stats belongs to. keyspaceName is the empty string for global-scope stats.
type keyspaceDFSStatsKey struct {
	keyspaceName string
	component    string
}

// keyspaceDFSStatsMap maps a (keyspace, component) key to its merged DFS stats.
type keyspaceDFSStatsMap map[keyspaceDFSStatsKey]*core.DFSStats

// collectDfsStats drains the scoped DFS stats from every store and merges
// them into a per-(keyspace, component) map. Stats with a global scope are
// attributed to the empty keyspace name.
//
// Fixes: drop the pointless size hint in make(map, 0); scope keyspaceName/err
// inside the loop instead of reusing function-wide variables across
// iterations; log (rather than silently drop) stats whose keyspace name
// cannot be resolved.
func (c *RaftCluster) collectDfsStats(keyspaceManager *keyspace.Manager) keyspaceDFSStatsMap {
	keyspaceDFSStats := make(keyspaceDFSStatsMap)
	for _, store := range c.GetStores() {
		// TakeScopedDFSStats drains the store's accumulated deltas; ranging
		// over an empty map is a no-op, so no explicit emptiness check needed.
		for scope, stats := range store.TakeScopedDFSStats() {
			// Set the keyspace name to empty string for global scope.
			var keyspaceName string
			if !scope.GetIsGlobal() {
				name, err := keyspaceManager.GetKeyspaceNameByID(scope.GetKeyspaceId())
				if err != nil {
					// The stats cannot be attributed to a keyspace; skip them
					// but make the dropped data visible in the log.
					log.Warn("failed to resolve keyspace name for dfs stats",
						zap.Uint32("keyspace-id", scope.GetKeyspaceId()),
						zap.Error(err))
					continue
				}
				keyspaceName = name
			}
			key := keyspaceDFSStatsKey{
				keyspaceName: keyspaceName,
				component:    scope.GetComponent(),
			}
			if dfsStats, ok := keyspaceDFSStats[key]; ok {
				dfsStats.WrittenBytes += stats.WrittenBytes
				dfsStats.WriteRequests += stats.WriteRequests
			} else {
				keyspaceDFSStats[key] = &core.DFSStats{
					WrittenBytes:  stats.WrittenBytes,
					WriteRequests: stats.WriteRequests,
				}
			}
		}
	}
	return keyspaceDFSStats
}
55 changes: 55 additions & 0 deletions server/cluster/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/metering_sdk/common"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/metering"
)

Expand Down Expand Up @@ -93,3 +94,57 @@ func (c *storageSizeCollector) Aggregate() []map[string]any {
}
return records
}

const (
	// dfsStatsCollectorCategory is the metering category under which the DFS
	// stats collector registers itself.
	dfsStatsCollectorCategory = "dfs-stats"
	// dfsStatsMeteringVersion is the version stamped into each DFS stats
	// metering record.
	dfsStatsMeteringVersion = "1"

	// meteringDataDfsWrittenBytes is the record field name for written bytes.
	meteringDataDfsWrittenBytes = "written_bytes"
	// meteringDataDfsWriteRequests is the record field name for write requests.
	meteringDataDfsWriteRequests = "write_requests"
)

// Compile-time check that dfsStatsCollector implements metering.Collector.
var _ metering.Collector = (*dfsStatsCollector)(nil)

// dfsStatsCollector buffers per-(keyspace, component) DFS stats between
// Collect calls until Aggregate drains them into metering records.
type dfsStatsCollector struct {
	sync.RWMutex
	// keyspaceDfsStats is the buffer guarded by the embedded mutex.
	keyspaceDfsStats map[keyspaceDFSStatsKey]*core.DFSStats
}

// newDfsStatsCollector creates a dfsStatsCollector with an empty stats buffer.
func newDfsStatsCollector() *dfsStatsCollector {
	return &dfsStatsCollector{
		keyspaceDfsStats: make(keyspaceDFSStatsMap),
	}
}

// Category returns the category of the collector.
func (*dfsStatsCollector) Category() string {
	return dfsStatsCollectorCategory
}

// Collect merges the collected DFS stats data into the internal buffer.
//
// Fixes: the incoming map carries deltas drained from the stores, so entries
// for an already-present key must be accumulated — plain assignment would
// discard deltas gathered between two Aggregate calls. The type assertion is
// also checked so an unexpected payload is ignored instead of panicking.
func (c *dfsStatsCollector) Collect(data any) {
	c.Lock()
	defer c.Unlock()
	keyspaceDFSStats, ok := data.(keyspaceDFSStatsMap)
	if !ok {
		return
	}
	for key, info := range keyspaceDFSStats {
		if existing, ok := c.keyspaceDfsStats[key]; ok {
			existing.WrittenBytes += info.WrittenBytes
			existing.WriteRequests += info.WriteRequests
		} else {
			// Copy instead of aliasing the caller's value.
			c.keyspaceDfsStats[key] = &core.DFSStats{
				WrittenBytes:  info.WrittenBytes,
				WriteRequests: info.WriteRequests,
			}
		}
	}
}

// Aggregate aggregates the DFS stats data.
func (c *dfsStatsCollector) Aggregate() []map[string]any {
	// Swap the buffer out under the lock, then build the records without
	// holding it.
	c.Lock()
	drained := c.keyspaceDfsStats
	c.keyspaceDfsStats = make(keyspaceDFSStatsMap)
	c.Unlock()

	records := make([]map[string]any, 0, len(drained))
	for key, stats := range drained {
		record := map[string]any{
			metering.DataVersionField: dfsStatsMeteringVersion,
			// keyspaceName is the logical cluster ID in the metering data. Empty string for global scope.
			metering.DataClusterIDField:  key.keyspaceName,
			metering.DataSourceNameField: key.component,
			meteringDataDfsWrittenBytes:  metering.NewBytesValue(stats.WrittenBytes),
			meteringDataDfsWriteRequests: metering.NewRequestsValue(stats.WriteRequests),
		}
		records = append(records, record)
	}
	return records
}
Loading