6 changes: 6 additions & 0 deletions pkg/schedule/labeler/labeler.go
@@ -44,6 +44,11 @@ type RegionLabeler struct {

// NewRegionLabeler creates a Labeler instance.
func NewRegionLabeler(ctx context.Context, storage endpoint.RuleStorage, gcInterval time.Duration) (*RegionLabeler, error) {
start := time.Now()
defer func() {
newRegionLabelerDuration.Observe(time.Since(start).Seconds())
}()

l := &RegionLabeler{
storage: storage,
labelRules: make(map[string]*LabelRule),
@@ -54,6 +59,7 @@ func NewRegionLabeler(ctx context.Context, storage endpoint.RuleStorage, gcInter
if err := l.loadRules(); err != nil {
return nil, err
}
log.Info("new region labeler created", zap.Int("label-rules-count", len(l.labelRules)))
go l.doGC(gcInterval)
return l, nil
}
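
The pattern above (capture `start` on entry, observe the elapsed seconds in a `defer`) records the duration on every return path, including early error returns. A minimal, self-contained sketch of the same idea; the metric and constructor names are illustrative, not from this PR:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var constructorDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name:    "example_constructor_duration_seconds",
	Help:    "Time spent in the example constructor.",
	Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4s, as in the PR
})

// newExample mirrors the NewRegionLabeler pattern: the deferred Observe runs
// on every return, so failed constructions are timed as well.
func newExample() error {
	start := time.Now()
	defer func() {
		constructorDuration.Observe(time.Since(start).Seconds())
	}()
	// ... construction work would go here ...
	return nil
}

func main() {
	prometheus.MustRegister(constructorDuration)
	_ = newExample()
}
```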
28 changes: 20 additions & 8 deletions pkg/schedule/labeler/metrics.go
@@ -16,15 +16,27 @@ package labeler

import "github.com/prometheus/client_golang/prometheus"

// LabelerEventCounter is a counter of the scheduler labeler system.
var LabelerEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "labeler_event_counter",
Help: "Counter of the scheduler label.",
}, []string{"type", "event"})
var (
// LabelerEventCounter is a counter of the scheduler labeler system.
LabelerEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "labeler_event_counter",
Help: "Counter of the scheduler label.",
}, []string{"type", "event"})

newRegionLabelerDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "new_region_labeler_duration_seconds",
Help: "Bucketed histogram of processing time (s) of creating new region labeler.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4s
})
)

func init() {
prometheus.MustRegister(LabelerEventCounter)
prometheus.MustRegister(newRegionLabelerDuration)
}
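
For reference, `prometheus.ExponentialBuckets(0.001, 2, 13)` yields 13 doubling upper bounds starting at 1ms, so the largest boundary is 0.001 * 2^12 = 4.096s, which matches the `// 1ms ~ 4s` comment. A quick sketch to print them:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Prints the 13 upper bounds: 0.001, 0.002, 0.004, ..., 4.096 (seconds).
	fmt.Println(prometheus.ExponentialBuckets(0.001, 2, 13))
}
```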
53 changes: 50 additions & 3 deletions server/cluster/cluster.go
@@ -327,6 +327,15 @@ func (c *RaftCluster) InitCluster(

// Start starts a cluster.
func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
start := time.Now()
defer func() {
startType := "non-bootstrap"
if bootstrap {
startType = "bootstrap"
}
raftClusterStartDuration.WithLabelValues(startType).Observe(time.Since(start).Seconds())
}()

c.Lock()
defer c.Unlock()

@@ -335,15 +344,22 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
return nil
}
c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled()
initClusterStart := time.Now()
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
log.Warn("failed to initialize cluster", errs.ZapError(err), zap.Duration("cost", time.Since(initClusterStart)))
return err
}
initClusterDuration := time.Since(initClusterStart)
log.Info("initialize cluster completed", zap.Duration("cost", initClusterDuration))
// We should not manage the TSO service when bootstrap tries to start the raft cluster.
// It is only controlled by leader election.
// Ref: https://github.com/tikv/pd/issues/8836
if !bootstrap {
checkTSOStart := time.Now()
c.checkTSOService()
checkTSODuration := time.Since(checkTSOStart)
log.Info("check TSO service completed", zap.Duration("cost", checkTSODuration))
}
defer func() {
if !bootstrap && err != nil {
@@ -358,25 +374,36 @@ }
}
failpoint.Return(err)
})
loadClusterInfoStart := time.Now()
cluster, err := c.LoadClusterInfo()
if err != nil {
log.Warn("failed to load cluster info", errs.ZapError(err), zap.Duration("cost", time.Since(loadClusterInfoStart)))
return err
}
if cluster == nil {
log.Warn("cluster is not bootstrapped")
loadClusterInfoDuration := time.Since(loadClusterInfoStart)
log.Warn("cluster is not bootstrapped", zap.Duration("cost", loadClusterInfoDuration))
return nil
}
if c.opt.IsPlacementRulesEnabled() {
ruleInitStart := time.Now()
err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel(), false)
if err != nil {
log.Warn("failed to initialize placement rules", errs.ZapError(err), zap.Duration("cost", time.Since(ruleInitStart)))
return err
}
log.Info("initialize placement rules completed", zap.Duration("cost", time.Since(ruleInitStart)))
}

loadClusterInfoDuration := time.Since(loadClusterInfoStart)
log.Info("load cluster info completed", zap.Duration("cost", loadClusterInfoDuration))
labelerStart := time.Now()
c.regionLabeler, err = labeler.NewRegionLabeler(c.ctx, c.storage, regionLabelGCInterval)
labelerDuration := time.Since(labelerStart)
if err != nil {
log.Warn("region labeler creation failed", zap.Error(err), zap.Duration("cost", labelerDuration))
return err
}
log.Info("region labeler created", zap.Duration("cost", labelerDuration))

// create affinity manager with region labeler for key range validation and rebuild
c.affinityManager, err = affinity.NewManager(c.ctx, c.storage, c, c.GetOpts(), c.regionLabeler)
@@ -385,27 +412,45 @@ }
}

if !c.IsServiceIndependent(constant.SchedulingServiceName) {
observeSlowStoreStart := time.Now()
for _, store := range c.GetStores() {
storeID := store.GetID()
c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow())
}
log.Info("observe slow store status completed", zap.Duration("cost", time.Since(observeSlowStoreStart)))
}
replicationModeStart := time.Now()
c.replicationMode, err = replication.NewReplicationModeManager(s.GetConfig().ReplicationMode, c.storage, cluster, s)
if err != nil {
log.Warn("failed to create replication mode manager", errs.ZapError(err), zap.Duration("cost", time.Since(replicationModeStart)))
return err
}
replicationModeDuration := time.Since(replicationModeStart)
log.Info("create replication mode manager completed", zap.Duration("cost", replicationModeDuration))
loadExternalTSStart := time.Now()
c.loadExternalTS()
log.Info("load external timestamp completed", zap.Duration("cost", time.Since(loadExternalTSStart)))
loadMinResolvedTSStart := time.Now()
c.loadMinResolvedTS()
log.Info("load min resolved ts completed", zap.Duration("cost", time.Since(loadMinResolvedTSStart)))

if c.isKeyspaceGroupEnabled {
// Bootstrap the keyspace group manager after the other parts have started successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when raft cluster creation fails.
log.Info("start to bootstrap keyspace group manager")
bootstrapKeyspaceStart := time.Now()
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
log.Warn("failed to bootstrap keyspace group manager", errs.ZapError(err), zap.Duration("cost", time.Since(bootstrapKeyspaceStart)))
return err
}
log.Info("bootstrap keyspace group manager completed", zap.Duration("cost", time.Since(bootstrapKeyspaceStart)))
}
checkSchedulingStart := time.Now()
c.checkSchedulingService()
checkSchedulingDuration := time.Since(checkSchedulingStart)
log.Info("check scheduling service completed", zap.Duration("cost", checkSchedulingDuration))
backgroundJobsStart := time.Now()
c.wg.Add(11)
go c.runServiceCheckJob()
go c.runMetricsCollectionJob()
@@ -418,12 +463,14 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
go c.startGCTuner()
go c.startProgressGC()
go c.runStorageSizeCollector(s.GetMeteringWriter(), c.regionLabeler, s.GetKeyspaceManager())

log.Info("start background jobs completed", zap.Duration("cost", time.Since(backgroundJobsStart)))
runnersStart := time.Now()
c.running = true
c.heartbeatRunner.Start(c.ctx)
c.miscRunner.Start(c.ctx)
c.logRunner.Start(c.ctx)
c.syncRegionRunner.Start(c.ctx)
log.Info("start runners completed", zap.Duration("cost", time.Since(runnersStart)))
return nil
}
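
Note that `Start` distinguishes bootstrap from non-bootstrap runs with a label on a single `HistogramVec` rather than two separate metrics, so both paths share one bucket layout and remain directly comparable. A minimal sketch of that labeled-histogram pattern, with illustrative names rather than the PR's actual types:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var startDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "example_start_duration_seconds",
	Help:    "Start duration, split by start type.",
	Buckets: prometheus.ExponentialBuckets(0.001, 2, 13),
}, []string{"type"})

func startCluster(bootstrap bool) {
	begin := time.Now()
	defer func() {
		startType := "non-bootstrap"
		if bootstrap {
			startType = "bootstrap"
		}
		// One time series per start type, both sharing the same buckets.
		startDuration.WithLabelValues(startType).Observe(time.Since(begin).Seconds())
	}()
	// ... start-up work would go here ...
}

func main() {
	prometheus.MustRegister(startDuration)
	startCluster(true)
}
```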

10 changes: 10 additions & 0 deletions server/cluster/metrics.go
@@ -71,6 +71,15 @@ var (
Name: "store_trigger_network_slow_evict",
Help: "The count of store trigger network slow evict",
}, []string{"store"})

raftClusterStartDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "cluster",
Name: "raftcluster_start_duration_seconds",
Help: "Bucketed histogram of processing time (s) of raft cluster start.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4s
}, []string{"type"})
)

func init() {
@@ -81,4 +90,5 @@ func init() {
prometheus.MustRegister(storeSyncConfigEvent)
prometheus.MustRegister(updateStoreStatsGauge)
prometheus.MustRegister(storeTriggerNetworkSlowEvict)
prometheus.MustRegister(raftClusterStartDuration)
}
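
A sketch of how the new histogram could be exercised in a unit test via `prometheus/testutil`; the test name and assertion are assumptions, not part of this PR:

```go
package cluster

import (
	"testing"
	"time"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestRaftClusterStartDurationObserved(t *testing.T) {
	// Record one observation the way Start() would.
	raftClusterStartDuration.WithLabelValues("bootstrap").Observe(time.Millisecond.Seconds())

	// CollectAndCount reports how many series the collector currently exposes.
	if got := testutil.CollectAndCount(raftClusterStartDuration); got < 1 {
		t.Fatalf("expected at least 1 series, got %d", got)
	}
}
```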
47 changes: 38 additions & 9 deletions server/server.go
@@ -1748,6 +1748,9 @@ func (s *Server) campaignLeader() {
}
return
}
// Start timing from when the leader is successfully elected
leaderReadyStart := time.Now()
log.Info("leader election succeeded, start leader ready process")

// Start keeping the leadership alive and enable the TSO service.
// TSO service is strictly enabled/disabled by PD leader lease for 2 reasons:
@@ -1761,48 +1764,72 @@ func (s *Server) campaignLeader() {
})

// Maintain the PD leadership; after this, TSO can be served.
log.Info("start to keep leader lease")
keepLeaderStart := time.Now()
s.member.GetLeadership().Keep(ctx)
log.Info("campaign PD leader ok", zap.String("campaign-leader-name", s.Name()))
keepLeaderDuration := time.Since(keepLeaderStart)
log.Info("keep leader lease completed", zap.Duration("cost", keepLeaderDuration), zap.String("campaign-leader-name", s.Name()))

reloadConfigStart := time.Now()
if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
log.Warn("failed to reload configuration", errs.ZapError(err), zap.Duration("cost", time.Since(reloadConfigStart)))
return
}
reloadConfigDuration := time.Since(reloadConfigStart)
log.Info("reload config from KV completed", zap.Duration("cost", reloadConfigDuration))

loadTTLStart := time.Now()
if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
log.Error("failed to load persistOptions from etcd", errs.ZapError(err))
log.Warn("failed to load persistOptions from etcd", errs.ZapError(err), zap.Duration("cost", time.Since(loadTTLStart)))
return
}
loadTTLDuration := time.Since(loadTTLStart)
log.Info("load persist options from etcd completed", zap.Duration("cost", loadTTLDuration))

encryptionStart := time.Now()
if err := s.encryptionKeyManager.SetLeadership(s.member.GetLeadership()); err != nil {
log.Error("failed to initialize encryption", errs.ZapError(err))
log.Warn("failed to initialize encryption", errs.ZapError(err), zap.Duration("cost", time.Since(encryptionStart)))
return
}
encryptionDuration := time.Since(encryptionStart)
log.Info("initialize encryption completed", zap.Duration("cost", encryptionDuration))

callbacksStart := time.Now()
log.Info("triggering the leader callback functions")
for _, cb := range s.leaderCallbacks {
if err := cb(ctx); err != nil {
log.Error("failed to execute leader callback function", errs.ZapError(err))
log.Warn("failed to execute leader callback function", errs.ZapError(err), zap.Duration("cost", time.Since(callbacksStart)))
return
}
}
callbacksDuration := time.Since(callbacksStart)
log.Info("trigger leader callback functions completed", zap.Duration("cost", callbacksDuration))

// Try to create raft cluster.
createRaftClusterStart := time.Now()
if err := s.createRaftCluster(); err != nil {
log.Error("failed to create raft cluster", errs.ZapError(err))
log.Warn("failed to create raft cluster", errs.ZapError(err), zap.Duration("cost", time.Since(createRaftClusterStart)))
return
}
createRaftClusterDuration := time.Since(createRaftClusterStart)
log.Info("create raft cluster completed", zap.Duration("cost", createRaftClusterDuration))
defer s.stopRaftCluster()
failpoint.Inject("rebaseErr", func() {
failpoint.Return()
})
rebaseStart := time.Now()
if err := s.idAllocator.Rebase(); err != nil {
log.Error("failed to sync id from etcd", errs.ZapError(err))
log.Warn("failed to sync id from etcd", errs.ZapError(err), zap.Duration("cost", time.Since(rebaseStart)))
return
}
rebaseDuration := time.Since(rebaseStart)
log.Info("sync id from etcd completed", zap.Duration("cost", rebaseDuration))
// PromoteSelf to accept the remaining services, such as GetStore and GetRegion.
enableLeaderStart := time.Now()
s.member.PromoteSelf()
enableLeaderDuration := time.Since(enableLeaderStart)
member.ServiceMemberGauge.WithLabelValues(PD).Set(1)
totalDuration := time.Since(leaderReadyStart)
defer resetLeaderOnce.Do(func() {
// As soon as the leadership keepalive is canceled, other members have a chance
// to become the new leader.
@@ -1812,8 +1839,10 @@ func (s *Server) campaignLeader() {
})

CheckPDVersionWithClusterVersion(s.persistOptions)
log.Info("PD leader is ready to serve", zap.String("leader-name", s.Name()))

log.Info("PD leader is ready to serve",
zap.String("leader-name", s.Name()),
zap.Duration("total-cost", totalDuration),
zap.Duration("cost", enableLeaderDuration))
leaderTicker := time.NewTicker(mcs.LeaderTickInterval)
defer leaderTicker.Stop()
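
The `start := time.Now()` / `time.Since` / `log.Info` triple repeats roughly a dozen times in `campaignLeader`. A hypothetical helper (not in this PR, purely a refactoring sketch) could collapse each triple into a single deferred call:

```go
package server

import (
	"time"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// logStepCost is a hypothetical helper, not part of this PR. Called as
//
//	defer logStepCost("reload config from KV", time.Now())
//
// at the top of a step, it replaces each explicit start/Since/log.Info
// triple with one line while producing the same "cost" log field.
func logStepCost(step string, start time.Time) {
	log.Info(step+" completed", zap.Duration("cost", time.Since(start)))
}
```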
