
Commit b0a3c90

server, schedule: Add observability for PD API leader-ready and cluster startup paths. (#10523)
ref #10516

Add observability for PD API leader-ready and cluster startup paths.

- add startup duration metrics for raft cluster and region labeler creation
- add leader-ready phase logs in server and cluster startup flow
- keep current master keyspace-group, runner, and PromoteSelf logic while porting the observability changes

Signed-off-by: bufferflies <1045931706@qq.com>
Signed-off-by: tongjian <1045931706@qq.com>
1 parent b21a183 commit b0a3c90

File tree

- pkg/schedule/labeler/labeler.go
- pkg/schedule/labeler/metrics.go
- server/cluster/cluster.go
- server/cluster/metrics.go
- server/server.go

5 files changed: +124 −20 lines

pkg/schedule/labeler/labeler.go

Lines changed: 6 additions & 0 deletions
@@ -44,6 +44,11 @@ type RegionLabeler struct {
 
 // NewRegionLabeler creates a Labeler instance.
 func NewRegionLabeler(ctx context.Context, storage endpoint.RuleStorage, gcInterval time.Duration) (*RegionLabeler, error) {
+	start := time.Now()
+	defer func() {
+		newRegionLabelerDuration.Observe(time.Since(start).Seconds())
+	}()
+
 	l := &RegionLabeler{
 		storage:    storage,
 		labelRules: make(map[string]*LabelRule),
@@ -54,6 +59,7 @@ func NewRegionLabeler(ctx context.Context, storage endpoint.RuleStorage, gcInter
 	if err := l.loadRules(); err != nil {
 		return nil, err
 	}
+	log.Info("new region labeler created", zap.Int("label-rules-count", len(l.labelRules)))
 	go l.doGC(gcInterval)
 	return l, nil
 }
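
Note on the pattern: the constructor change uses Go's defer-at-entry idiom. It captures time.Now() when the function starts and observes the elapsed time in a deferred closure, so the histogram is updated on every return path, including the early error return from loadRules. A minimal self-contained sketch of the same pattern follows; the metric and function names here are illustrative stand-ins, not PD code.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// constructionDuration is an illustrative histogram; the commit's real
// equivalent is newRegionLabelerDuration in pkg/schedule/labeler/metrics.go.
var constructionDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name:    "example_construction_duration_seconds",
	Help:    "Time spent constructing the example component.",
	Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4s
})

// newComponent times its own construction; the deferred closure runs on
// every return path, success or error.
func newComponent() error {
	start := time.Now()
	defer func() {
		constructionDuration.Observe(time.Since(start).Seconds())
	}()
	time.Sleep(10 * time.Millisecond) // stands in for the real setup work
	return nil
}

func main() {
	prometheus.MustRegister(constructionDuration)
	_ = newComponent()
}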

pkg/schedule/labeler/metrics.go

Lines changed: 20 additions & 8 deletions
@@ -16,15 +16,27 @@ package labeler
 
 import "github.com/prometheus/client_golang/prometheus"
 
-// LabelerEventCounter is a counter of the scheduler labeler system.
-var LabelerEventCounter = prometheus.NewCounterVec(
-	prometheus.CounterOpts{
-		Namespace: "pd",
-		Subsystem: "schedule",
-		Name:      "labeler_event_counter",
-		Help:      "Counter of the scheduler label.",
-	}, []string{"type", "event"})
+var (
+	// LabelerEventCounter is a counter of the scheduler labeler system.
+	LabelerEventCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "pd",
+			Subsystem: "schedule",
+			Name:      "labeler_event_counter",
+			Help:      "Counter of the scheduler label.",
+		}, []string{"type", "event"})
+
+	newRegionLabelerDuration = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "pd",
+			Subsystem: "schedule",
+			Name:      "new_region_labeler_duration_seconds",
+			Help:      "Bucketed histogram of processing time (s) of creating new region labeler.",
+			Buckets:   prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4s
+		})
+)
 
 func init() {
 	prometheus.MustRegister(LabelerEventCounter)
+	prometheus.MustRegister(newRegionLabelerDuration)
 }
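
Both new histograms use prometheus.ExponentialBuckets(0.001, 2, 13): 13 upper bounds doubling from 1ms, so the largest finite bucket is 0.001 × 2^12 = 4.096s, which is what the "// 1ms ~ 4s" comment refers to; slower observations fall into the implicit +Inf bucket. A quick sketch that prints the bounds:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Prints 0.001, 0.002, 0.004, ... up to 4.096 (seconds).
	for i, bound := range prometheus.ExponentialBuckets(0.001, 2, 13) {
		fmt.Printf("bucket %2d: <= %gs\n", i, bound)
	}
}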

server/cluster/cluster.go

Lines changed: 50 additions & 3 deletions
@@ -334,6 +334,15 @@ func (c *RaftCluster) InitCluster(
 
 // Start starts a cluster.
 func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
+	start := time.Now()
+	defer func() {
+		startType := "non-bootstrap"
+		if bootstrap {
+			startType = "bootstrap"
+		}
+		raftClusterStartDuration.WithLabelValues(startType).Observe(time.Since(start).Seconds())
+	}()
+
 	c.Lock()
 	defer c.Unlock()
 
@@ -342,15 +351,22 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
 		return nil
 	}
 	c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled()
+	initClusterStart := time.Now()
 	err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
 	if err != nil {
+		log.Warn("failed to initialize cluster", errs.ZapError(err), zap.Duration("cost", time.Since(initClusterStart)))
 		return err
 	}
+	initClusterDuration := time.Since(initClusterStart)
+	log.Info("initialize cluster completed", zap.Duration("cost", initClusterDuration))
 	// We should not manage tso service when bootstrap try to start raft cluster.
 	// It only is controlled by leader election.
 	// Ref: https://github.com/tikv/pd/issues/8836
 	if !bootstrap {
+		checkTSOStart := time.Now()
 		c.checkTSOService()
+		checkTSODuration := time.Since(checkTSOStart)
+		log.Info("check TSO service completed", zap.Duration("cost", checkTSODuration))
 	}
 	defer func() {
 		if !bootstrap && err != nil {
@@ -365,25 +381,36 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
 		}
 		failpoint.Return(err)
 	})
+	loadClusterInfoStart := time.Now()
 	cluster, err := c.LoadClusterInfo()
 	if err != nil {
+		log.Warn("failed to load cluster info", errs.ZapError(err), zap.Duration("cost", time.Since(loadClusterInfoStart)))
 		return err
 	}
 	if cluster == nil {
-		log.Warn("cluster is not bootstrapped")
+		loadClusterInfoDuration := time.Since(loadClusterInfoStart)
+		log.Warn("cluster is not bootstrapped", zap.Duration("cost", loadClusterInfoDuration))
 		return nil
 	}
 	if c.opt.IsPlacementRulesEnabled() {
+		ruleInitStart := time.Now()
 		err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel(), false)
 		if err != nil {
+			log.Warn("failed to initialize placement rules", errs.ZapError(err), zap.Duration("cost", time.Since(ruleInitStart)))
 			return err
 		}
+		log.Info("initialize placement rules completed", zap.Duration("cost", time.Since(ruleInitStart)))
 	}
-
+	loadClusterInfoDuration := time.Since(loadClusterInfoStart)
+	log.Info("load cluster info completed", zap.Duration("cost", loadClusterInfoDuration))
+	labelerStart := time.Now()
 	c.regionLabeler, err = labeler.NewRegionLabeler(c.ctx, c.storage, regionLabelGCInterval)
+	labelerDuration := time.Since(labelerStart)
 	if err != nil {
+		log.Warn("region labeler creation failed", zap.Error(err), zap.Duration("cost", labelerDuration))
 		return err
 	}
+	log.Info("region labeler created", zap.Duration("cost", labelerDuration))
 
 	// create affinity manager with region labeler for key range validation and rebuild
 	c.affinityManager, err = affinity.NewManager(c.ctx, c.storage, c, c.GetOpts(), c.regionLabeler)
@@ -392,27 +419,45 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
 	}
 
 	if !c.IsServiceIndependent(constant.SchedulingServiceName) {
+		observeSlowStoreStart := time.Now()
 		for _, store := range c.GetStores() {
 			storeID := store.GetID()
 			c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow())
 		}
+		log.Info("observe slow store status completed", zap.Duration("cost", time.Since(observeSlowStoreStart)))
 	}
+	replicationModeStart := time.Now()
 	c.replicationMode, err = replication.NewReplicationModeManager(s.GetConfig().ReplicationMode, c.storage, cluster, s)
 	if err != nil {
+		log.Warn("failed to create replication mode manager", errs.ZapError(err), zap.Duration("cost", time.Since(replicationModeStart)))
 		return err
 	}
+	replicationModeDuration := time.Since(replicationModeStart)
+	log.Info("create replication mode manager completed", zap.Duration("cost", replicationModeDuration))
+	loadExternalTSStart := time.Now()
 	c.loadExternalTS()
+	log.Info("load external timestamp completed", zap.Duration("cost", time.Since(loadExternalTSStart)))
+	loadMinResolvedTSStart := time.Now()
 	c.loadMinResolvedTS()
+	log.Info("load min resolved ts completed", zap.Duration("cost", time.Since(loadMinResolvedTSStart)))
 
 	if c.isKeyspaceGroupEnabled {
 		// bootstrap keyspace group manager after starting other parts successfully.
 		// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
+		log.Info("start to bootstrap keyspace group manager")
+		bootstrapKeyspaceStart := time.Now()
 		err = c.keyspaceGroupManager.Bootstrap(c.ctx)
 		if err != nil {
+			log.Warn("failed to bootstrap keyspace group manager", errs.ZapError(err), zap.Duration("cost", time.Since(bootstrapKeyspaceStart)))
 			return err
 		}
+		log.Info("bootstrap keyspace group manager completed", zap.Duration("cost", time.Since(bootstrapKeyspaceStart)))
 	}
+	checkSchedulingStart := time.Now()
 	c.checkSchedulingService()
+	checkSchedulingDuration := time.Since(checkSchedulingStart)
+	log.Info("check scheduling service completed", zap.Duration("cost", checkSchedulingDuration))
+	backgroundJobsStart := time.Now()
 	c.wg.Add(11)
 	go c.runServiceCheckJob()
 	go c.runMetricsCollectionJob()
@@ -425,12 +470,14 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
 	go c.startGCTuner()
 	go c.startProgressGC()
 	go c.runStorageSizeCollector(s.GetMeteringWriter(), c.regionLabeler, s.GetKeyspaceManager())
-
+	log.Info("start background jobs completed", zap.Duration("cost", time.Since(backgroundJobsStart)))
+	runnersStart := time.Now()
 	c.running = true
 	c.heartbeatRunner.Start(c.ctx)
 	c.miscRunner.Start(c.ctx)
 	c.logRunner.Start(c.ctx)
 	c.syncRegionRunner.Start(c.ctx)
+	log.Info("start runners completed", zap.Duration("cost", time.Since(runnersStart)))
 	return nil
 }
 
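
Unlike the labeler's plain histogram, raftClusterStartDuration (defined in server/cluster/metrics.go below) is a HistogramVec keyed by a type label, and Start resolves the label value inside the deferred closure so it reflects the bootstrap argument at observation time. A reduced, self-contained sketch of that shape; the names are illustrative, not PD code:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// startDuration mirrors the shape of raftClusterStartDuration: one
// histogram series per start type.
var startDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "example_start_duration_seconds",
	Help:    "Time spent starting the example cluster.",
	Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4s
}, []string{"type"})

func start(bootstrap bool) error {
	begin := time.Now()
	defer func() {
		// Resolve the label in the deferred closure, as RaftCluster.Start does.
		startType := "non-bootstrap"
		if bootstrap {
			startType = "bootstrap"
		}
		startDuration.WithLabelValues(startType).Observe(time.Since(begin).Seconds())
	}()
	// ... per-phase work with start/cost logging would go here ...
	return nil
}

func main() {
	prometheus.MustRegister(startDuration)
	_ = start(true)
}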

server/cluster/metrics.go

Lines changed: 10 additions & 0 deletions
@@ -71,6 +71,15 @@ var (
 			Name: "store_trigger_network_slow_evict",
 			Help: "The count of store trigger network slow evict",
 		}, []string{"store"})
+
+	raftClusterStartDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "pd",
+			Subsystem: "cluster",
+			Name:      "raftcluster_start_duration_seconds",
+			Help:      "Bucketed histogram of processing time (s) of raft cluster start.",
+			Buckets:   prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4s
+		}, []string{"type"})
 )
 
 func init() {
@@ -81,4 +90,5 @@ func init() {
 	prometheus.MustRegister(storeSyncConfigEvent)
 	prometheus.MustRegister(updateStoreStatsGauge)
 	prometheus.MustRegister(storeTriggerNetworkSlowEvict)
+	prometheus.MustRegister(raftClusterStartDuration)
 }
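
To sanity-check that the new vec registers and records as expected, client_golang's testutil package can count the collected series. A hedged sketch of such a test, not part of this commit (the test name is invented, and it assumes it lives in the same cluster package so it can reach the unexported variable):

package cluster

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestRaftClusterStartDurationSeries is illustrative only: after one
// observation per label value, the vec should expose two series.
func TestRaftClusterStartDurationSeries(t *testing.T) {
	raftClusterStartDuration.WithLabelValues("bootstrap").Observe(0.1)
	raftClusterStartDuration.WithLabelValues("non-bootstrap").Observe(0.2)
	if got := testutil.CollectAndCount(raftClusterStartDuration); got != 2 {
		t.Fatalf("expected 2 histogram series, got %d", got)
	}
}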

server/server.go

Lines changed: 38 additions & 9 deletions
@@ -1748,6 +1748,9 @@ func (s *Server) campaignLeader() {
 		}
 		return
 	}
+	// Start timing from when leader is successfully elected
+	leaderReadyStart := time.Now()
+	log.Info("leader election succeeded, start leader ready process")
 
 	// Start keepalive the leadership and enable TSO service.
 	// TSO service is strictly enabled/disabled by PD leader lease for 2 reasons:
@@ -1761,48 +1764,72 @@ func (s *Server) campaignLeader() {
 	})
 
 	// maintain the PD leadership, after this, TSO can be service.
+	log.Info("start to keep leader lease")
+	keepLeaderStart := time.Now()
 	s.member.GetLeadership().Keep(ctx)
-	log.Info("campaign PD leader ok", zap.String("campaign-leader-name", s.Name()))
+	keepLeaderDuration := time.Since(keepLeaderStart)
+	log.Info("keep leader lease completed", zap.Duration("cost", keepLeaderDuration), zap.String("campaign-leader-name", s.Name()))
 
+	reloadConfigStart := time.Now()
 	if err := s.reloadConfigFromKV(); err != nil {
-		log.Error("failed to reload configuration", errs.ZapError(err))
+		log.Warn("failed to reload configuration", errs.ZapError(err), zap.Duration("cost", time.Since(reloadConfigStart)))
 		return
 	}
+	reloadConfigDuration := time.Since(reloadConfigStart)
+	log.Info("reload config from KV completed", zap.Duration("cost", reloadConfigDuration))
 
+	loadTTLStart := time.Now()
 	if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
-		log.Error("failed to load persistOptions from etcd", errs.ZapError(err))
+		log.Warn("failed to load persistOptions from etcd", errs.ZapError(err), zap.Duration("cost", time.Since(loadTTLStart)))
 		return
 	}
+	loadTTLDuration := time.Since(loadTTLStart)
+	log.Info("load persist options from etcd completed", zap.Duration("cost", loadTTLDuration))
 
+	encryptionStart := time.Now()
 	if err := s.encryptionKeyManager.SetLeadership(s.member.GetLeadership()); err != nil {
-		log.Error("failed to initialize encryption", errs.ZapError(err))
+		log.Warn("failed to initialize encryption", errs.ZapError(err), zap.Duration("cost", time.Since(encryptionStart)))
 		return
 	}
+	encryptionDuration := time.Since(encryptionStart)
+	log.Info("initialize encryption completed", zap.Duration("cost", encryptionDuration))
 
+	callbacksStart := time.Now()
 	log.Info("triggering the leader callback functions")
 	for _, cb := range s.leaderCallbacks {
 		if err := cb(ctx); err != nil {
-			log.Error("failed to execute leader callback function", errs.ZapError(err))
+			log.Warn("failed to execute leader callback function", errs.ZapError(err), zap.Duration("cost", time.Since(callbacksStart)))
 			return
 		}
 	}
+	callbacksDuration := time.Since(callbacksStart)
+	log.Info("trigger leader callback functions completed", zap.Duration("cost", callbacksDuration))
 
 	// Try to create raft cluster.
+	createRaftClusterStart := time.Now()
 	if err := s.createRaftCluster(); err != nil {
-		log.Error("failed to create raft cluster", errs.ZapError(err))
+		log.Warn("failed to create raft cluster", errs.ZapError(err), zap.Duration("cost", time.Since(createRaftClusterStart)))
 		return
 	}
+	createRaftClusterDuration := time.Since(createRaftClusterStart)
+	log.Info("create raft cluster completed", zap.Duration("cost", createRaftClusterDuration))
 	defer s.stopRaftCluster()
 	failpoint.Inject("rebaseErr", func() {
 		failpoint.Return()
 	})
+	rebaseStart := time.Now()
 	if err := s.idAllocator.Rebase(); err != nil {
-		log.Error("failed to sync id from etcd", errs.ZapError(err))
+		log.Warn("failed to sync id from etcd", errs.ZapError(err), zap.Duration("cost", time.Since(rebaseStart)))
 		return
 	}
+	rebaseDuration := time.Since(rebaseStart)
+	log.Info("sync id from etcd completed", zap.Duration("cost", rebaseDuration))
 	// PromoteSelf to accept the remaining service, such as GetStore, GetRegion.
+	enableLeaderStart := time.Now()
 	s.member.PromoteSelf()
+	enableLeaderDuration := time.Since(enableLeaderStart)
 	member.ServiceMemberGauge.WithLabelValues(PD).Set(1)
+	totalDuration := time.Since(leaderReadyStart)
 	defer resetLeaderOnce.Do(func() {
 		// as soon as cancel the leadership keepalive, then other member have chance
 		// to be new leader.
@@ -1812,8 +1839,10 @@ func (s *Server) campaignLeader() {
 	})
 
 	CheckPDVersionWithClusterVersion(s.persistOptions)
-	log.Info("PD leader is ready to serve", zap.String("leader-name", s.Name()))
-
+	log.Info("PD leader is ready to serve",
+		zap.String("leader-name", s.Name()),
+		zap.Duration("total-cost", totalDuration),
+		zap.Duration("cost", enableLeaderDuration))
 	leaderTicker := time.NewTicker(mcs.LeaderTickInterval)
 	defer leaderTicker.Stop()
 
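The server-side changes repeat one shape many times: record a start time, run the phase, then log "<phase> completed" with a cost field (the Warn paths carry the cost as well). If that repetition ever becomes a maintenance burden, it could be factored into a small helper; the sketch below is a hypothetical refactor using plain zap, not something this commit does:

package main

import (
	"time"

	"go.uber.org/zap"
)

var log = zap.NewExample()

// phase is a hypothetical helper: it returns a done func that logs the
// phase's cost in the same "<name> completed" shape as the commit's logs.
func phase(name string) func() {
	start := time.Now()
	return func() {
		log.Info(name+" completed", zap.Duration("cost", time.Since(start)))
	}
}

func main() {
	done := phase("reload config from KV")
	time.Sleep(5 * time.Millisecond) // stands in for s.reloadConfigFromKV()
	done()
}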