@@ -18,6 +18,7 @@ import (
1818 "context"
1919 "encoding/json"
2020 "math"
21+ "os"
2122 "sort"
2223 "strings"
2324 "time"
@@ -28,6 +29,7 @@ import (
2829 rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
2930 "github.com/pingcap/log"
3031 "github.com/prometheus/client_golang/prometheus"
32+ "github.com/prometheus/client_golang/prometheus/push"
3133 bs "github.com/tikv/pd/pkg/basicserver"
3234 "github.com/tikv/pd/pkg/errs"
3335 "github.com/tikv/pd/pkg/storage/endpoint"
@@ -48,6 +50,8 @@ const (
4850
4951 reservedDefaultGroupName = "default"
5052 middlePriority = 8
53+
54+ pushMetricsTimeout = 10 * time .Second
5155)
5256
5357// Manager is the manager of resource group.
@@ -179,7 +183,7 @@ func (m *Manager) Init(ctx context.Context) error {
179183 }
180184
181185 // Start the background metrics flusher.
182- go m .backgroundMetricsFlush (ctx )
186+ go m .backgroundMetricsFlush (ctx , m . controllerConfig . PushMetricsAddress , m . controllerConfig . PushMetricsInterval . Duration )
183187 go func () {
184188 defer logutil .LogPanic ()
185189 m .persistLoop (ctx )
@@ -357,7 +361,7 @@ func (m *Manager) persistResourceGroupRunningState() {
357361}
358362
359363// Receive the consumption and flush it to the metrics.
360- func (m * Manager ) backgroundMetricsFlush (ctx context.Context ) {
364+ func (m * Manager ) backgroundMetricsFlush (ctx context.Context , pushMetricsAddr string , pushMetricsInterval time. Duration ) {
361365 defer logutil .LogPanic ()
362366 cleanUpTicker := time .NewTicker (metricsCleanupInterval )
363367 defer cleanUpTicker .Stop ()
@@ -366,6 +370,13 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
366370 recordMaxTicker := time .NewTicker (tickPerSecond )
367371 defer recordMaxTicker .Stop ()
368372 maxPerSecTrackers := make (map [string ]* maxPerSecCostTracker )
373+
374+ pushMetricsTickerC := make (<- chan time.Time )
375+ if pushMetricsAddr != "" && pushMetricsInterval .Seconds () > 0 {
376+ pushMetricsTicker := time .NewTicker (pushMetricsInterval )
377+ pushMetricsTickerC = pushMetricsTicker .C
378+ defer pushMetricsTicker .Stop ()
379+ }
369380 for {
370381 select {
371382 case <- ctx .Done ():
@@ -495,6 +506,25 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
495506 t .FlushMetrics ()
496507 }
497508 }
509+
510+ case <- pushMetricsTickerC :
511+ podName := os .Getenv ("HOSTNAME" )
512+ if podName == "" {
513+ podName = "default"
514+ }
515+ pushCtx , cancel := context .WithTimeout (ctx , pushMetricsTimeout )
516+ start := time .Now ()
517+ err := push .New (pushMetricsAddr , "resource_group_svc" ).
518+ Grouping ("pod" , podName ).
519+ Collector (readRequestUnitCost ).
520+ Collector (writeRequestUnitCost ).
521+ Collector (sqlLayerRequestUnitCost ).
522+ PushContext (pushCtx )
523+ cancel ()
524+ if err != nil {
525+ log .Error ("push metrics to Prometheus failed" , zap .Error (err ))
526+ }
527+ pushRUMetricsDuration .Observe (time .Since (start ).Seconds ())
498528 }
499529 }
500530}
0 commit comments