Skip to content
6 changes: 6 additions & 0 deletions pkg/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type RegionLabeler struct {

// NewRegionLabeler creates a Labeler instance.
func NewRegionLabeler(ctx context.Context, storage endpoint.RuleStorage, gcInterval time.Duration) (*RegionLabeler, error) {
start := time.Now()
defer func() {
newRegionLabelerDuration.Observe(time.Since(start).Seconds())
}()

l := &RegionLabeler{
storage: storage,
labelRules: make(map[string]*LabelRule),
Expand All @@ -54,6 +59,7 @@ func NewRegionLabeler(ctx context.Context, storage endpoint.RuleStorage, gcInter
if err := l.loadRules(); err != nil {
return nil, err
}
log.Info("new region labeler created", zap.Int("label-rules-count", len(l.labelRules)))
go l.doGC(gcInterval)
return l, nil
}
Expand Down
28 changes: 20 additions & 8 deletions pkg/schedule/labeler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,27 @@ package labeler

import "github.com/prometheus/client_golang/prometheus"

// LabelerEventCounter is a counter of the scheduler labeler system.
var LabelerEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "labeler_event_counter",
Help: "Counter of the scheduler label.",
}, []string{"type", "event"})
var (
	// LabelerEventCounter is a counter of the scheduler labeler system.
	LabelerEventCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "pd",
			Subsystem: "schedule",
			Name:      "labeler_event_counter",
			Help:      "Counter of the scheduler label.",
		}, []string{"type", "event"})

	// newRegionLabelerDuration records how long NewRegionLabeler takes,
	// which is dominated by loading label rules from storage.
	newRegionLabelerDuration = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "pd",
			Subsystem: "schedule",
			Name:      "new_region_labeler_duration_seconds",
			Help:      "Bucketed histogram of processing time (s) of creating new region labeler.",
			Buckets:   prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4s
		})
)

// init registers the labeler metrics with the default Prometheus registry.
func init() {
	// MustRegister is variadic, so both collectors are registered in one call.
	prometheus.MustRegister(
		LabelerEventCounter,
		newRegionLabelerDuration,
	)
}
58 changes: 57 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ func (c *RaftCluster) InitCluster(

// Start starts a cluster.
func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
start := time.Now()
defer func() {
raftClusterStartDuration.Observe(time.Since(start).Seconds())
}()

c.Lock()
defer c.Unlock()

Expand All @@ -335,15 +340,24 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
return nil
}
c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled()
log.Info("[leader-ready] start to initialize cluster")
initClusterStart := time.Now()
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
log.Error("[leader-ready] failed to initialize cluster", errs.ZapError(err), zap.Duration("cost", time.Since(initClusterStart)))
return err
}
initClusterDuration := time.Since(initClusterStart)
log.Info("[leader-ready] initialize cluster completed", zap.Duration("cost", initClusterDuration))
// We should not manage tso service when bootstrap try to start raft cluster.
// It only is controlled by leader election.
// Ref: https://github.com/tikv/pd/issues/8836
if !bootstrap {
log.Info("[leader-ready] start to check TSO service")
checkTSOStart := time.Now()
c.checkTSOService()
checkTSODuration := time.Since(checkTSOStart)
log.Info("[leader-ready] check TSO service completed", zap.Duration("cost", checkTSODuration))
}
defer func() {
if !bootstrap && err != nil {
Expand All @@ -358,12 +372,16 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
}
failpoint.Return(err)
})
log.Info("[leader-ready] start to load cluster info")
loadClusterInfoStart := time.Now()
cluster, err := c.LoadClusterInfo()
if err != nil {
log.Error("[leader-ready] failed to load cluster info", errs.ZapError(err), zap.Duration("cost", time.Since(loadClusterInfoStart)))
return err
}
loadClusterInfoDuration := time.Since(loadClusterInfoStart)
if cluster == nil {
log.Warn("cluster is not bootstrapped")
log.Warn("[leader-ready] cluster is not bootstrapped", zap.Duration("cost", loadClusterInfoDuration))
return nil
}
if c.opt.IsPlacementRulesEnabled() {
Expand All @@ -372,11 +390,17 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
return err
}
}
log.Info("[leader-ready] load cluster info completed", zap.Duration("cost", loadClusterInfoDuration))

log.Info("[leader-ready] creating region labeler")
labelerStart := time.Now()
c.regionLabeler, err = labeler.NewRegionLabeler(c.ctx, c.storage, regionLabelGCInterval)
labelerDuration := time.Since(labelerStart)
if err != nil {
log.Error("[leader-ready] region labeler creation failed", zap.Error(err), zap.Duration("cost", labelerDuration))
return err
}
log.Info("[leader-ready] region labeler created", zap.Duration("cost", labelerDuration))

// create affinity manager with region labeler for key range validation and rebuild
c.affinityManager, err = affinity.NewManager(c.ctx, c.storage, c, c.GetOpts(), c.regionLabeler)
Expand All @@ -385,27 +409,53 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
}

if !c.IsServiceIndependent(constant.SchedulingServiceName) {
log.Info("[leader-ready] start to observe slow store status")
observeSlowStoreStart := time.Now()
for _, store := range c.GetStores() {
storeID := store.GetID()
c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow())
}
observeSlowStoreDuration := time.Since(observeSlowStoreStart)
log.Info("[leader-ready] observe slow store status completed", zap.Duration("cost", observeSlowStoreDuration))
}
log.Info("[leader-ready] start to create replication mode manager")
replicationModeStart := time.Now()
c.replicationMode, err = replication.NewReplicationModeManager(s.GetConfig().ReplicationMode, c.storage, cluster, s)
if err != nil {
log.Error("[leader-ready] failed to create replication mode manager", errs.ZapError(err), zap.Duration("cost", time.Since(replicationModeStart)))
return err
}
replicationModeDuration := time.Since(replicationModeStart)
log.Info("[leader-ready] create replication mode manager completed", zap.Duration("cost", replicationModeDuration))
log.Info("[leader-ready] start to load external timestamp")
loadExternalTSStart := time.Now()
c.loadExternalTS()
log.Info("[leader-ready] load external timestamp completed", zap.Duration("cost", time.Since(loadExternalTSStart)))
log.Info("[leader-ready] start to load min resolved ts")
loadMinResolvedTSStart := time.Now()
c.loadMinResolvedTS()
log.Info("[leader-ready] load min resolved ts completed", zap.Duration("cost", time.Since(loadMinResolvedTSStart)))

if c.isKeyspaceGroupEnabled {
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
log.Info("[leader-ready] start to bootstrap keyspace group manager")
bootstrapKeyspaceStart := time.Now()
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
log.Error("[leader-ready] failed to bootstrap keyspace group manager", errs.ZapError(err), zap.Duration("cost", time.Since(bootstrapKeyspaceStart)))
return err
}
bootstrapKeyspaceDuration := time.Since(bootstrapKeyspaceStart)
log.Info("[leader-ready] bootstrap keyspace group manager completed", zap.Duration("cost", bootstrapKeyspaceDuration))
}
log.Info("[leader-ready] start to check scheduling service")
checkSchedulingStart := time.Now()
c.checkSchedulingService()
checkSchedulingDuration := time.Since(checkSchedulingStart)
log.Info("[leader-ready] check scheduling service completed", zap.Duration("cost", checkSchedulingDuration))
log.Info("[leader-ready] start to start background jobs")
backgroundJobsStart := time.Now()
c.wg.Add(11)
go c.runServiceCheckJob()
go c.runMetricsCollectionJob()
Expand All @@ -418,12 +468,18 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
go c.startGCTuner()
go c.startProgressGC()
go c.runStorageSizeCollector(s.GetMeteringWriter(), c.regionLabeler, s.GetKeyspaceManager())
backgroundJobsDuration := time.Since(backgroundJobsStart)
log.Info("[leader-ready] start background jobs completed", zap.Duration("cost", backgroundJobsDuration))

log.Info("[leader-ready] start to start runners")
runnersStart := time.Now()
c.running = true
c.heartbeatRunner.Start(c.ctx)
c.miscRunner.Start(c.ctx)
c.logRunner.Start(c.ctx)
c.syncRegionRunner.Start(c.ctx)
runnersDuration := time.Since(runnersStart)
log.Info("[leader-ready] start runners completed", zap.Duration("cost", runnersDuration))
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions server/cluster/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ var (
Name: "store_trigger_network_slow_evict",
Help: "The count of store trigger network slow evict",
}, []string{"store"})

raftClusterStartDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "cluster",
Name: "raftcluster_start_duration_seconds",
Help: "Bucketed histogram of processing time (s) of raft cluster start.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4s
})
)

func init() {
Expand All @@ -81,4 +90,5 @@ func init() {
prometheus.MustRegister(storeSyncConfigEvent)
prometheus.MustRegister(updateStoreStatsGauge)
prometheus.MustRegister(storeTriggerNetworkSlowEvict)
prometheus.MustRegister(raftClusterStartDuration)
}
50 changes: 42 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,9 @@ func (s *Server) campaignLeader() {
}
return
}
// Start timing from when leader is successfully elected
leaderReadyStart := time.Now()
log.Info("[leader-ready] leader election succeeded, start leader ready process")

// Start keepalive the leadership and enable TSO service.
// TSO service is strictly enabled/disabled by PD leader lease for 2 reasons:
Expand All @@ -1761,48 +1764,74 @@ func (s *Server) campaignLeader() {
})

// maintain the PD leadership, after this, TSO can be service.
log.Info("[leader-ready] start to keep leader lease")
keepLeaderStart := time.Now()
s.member.GetLeadership().Keep(ctx)
keepLeaderDuration := time.Since(keepLeaderStart)
log.Info("[leader-ready] keep leader lease completed", zap.Duration("cost", keepLeaderDuration))
log.Info("campaign PD leader ok", zap.String("campaign-leader-name", s.Name()))

reloadConfigStart := time.Now()
if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
log.Error("[leader-ready] failed to reload configuration", errs.ZapError(err), zap.Duration("cost", time.Since(reloadConfigStart)))
return
}
reloadConfigDuration := time.Since(reloadConfigStart)
log.Info("[leader-ready] reload config from KV completed", zap.Duration("cost", reloadConfigDuration))

loadTTLStart := time.Now()
if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
log.Error("failed to load persistOptions from etcd", errs.ZapError(err))
log.Error("[leader-ready] failed to load persistOptions from etcd", errs.ZapError(err), zap.Duration("cost", time.Since(loadTTLStart)))
return
}
loadTTLDuration := time.Since(loadTTLStart)
log.Info("[leader-ready] load persist options from etcd completed", zap.Duration("cost", loadTTLDuration))

encryptionStart := time.Now()
if err := s.encryptionKeyManager.SetLeadership(s.member.GetLeadership()); err != nil {
log.Error("failed to initialize encryption", errs.ZapError(err))
log.Error("[leader-ready] failed to initialize encryption", errs.ZapError(err), zap.Duration("cost", time.Since(encryptionStart)))
return
}
encryptionDuration := time.Since(encryptionStart)
log.Info("[leader-ready] initialize encryption completed", zap.Duration("cost", encryptionDuration))

log.Info("triggering the leader callback functions")
callbacksStart := time.Now()
log.Info("[leader-ready] triggering the leader callback functions")
for _, cb := range s.leaderCallbacks {
if err := cb(ctx); err != nil {
log.Error("failed to execute leader callback function", errs.ZapError(err))
log.Error("[leader-ready] failed to execute leader callback function", errs.ZapError(err), zap.Duration("cost", time.Since(callbacksStart)))
return
}
}
callbacksDuration := time.Since(callbacksStart)
log.Info("[leader-ready] trigger leader callback functions completed", zap.Duration("cost", callbacksDuration))

// Try to create raft cluster.
createRaftClusterStart := time.Now()
if err := s.createRaftCluster(); err != nil {
log.Error("failed to create raft cluster", errs.ZapError(err))
log.Error("[leader-ready] failed to create raft cluster", errs.ZapError(err), zap.Duration("cost", time.Since(createRaftClusterStart)))
return
}
createRaftClusterDuration := time.Since(createRaftClusterStart)
log.Info("[leader-ready] create raft cluster completed", zap.Duration("cost", createRaftClusterDuration))
defer s.stopRaftCluster()
failpoint.Inject("rebaseErr", func() {
failpoint.Return()
})
rebaseStart := time.Now()
if err := s.idAllocator.Rebase(); err != nil {
log.Error("failed to sync id from etcd", errs.ZapError(err))
log.Error("[leader-ready] failed to sync id from etcd", errs.ZapError(err), zap.Duration("cost", time.Since(rebaseStart)))
return
}
rebaseDuration := time.Since(rebaseStart)
log.Info("[leader-ready] sync id from etcd completed", zap.Duration("cost", rebaseDuration))
// PromoteSelf to accept the remaining service, such as GetStore, GetRegion.
log.Info("[leader-ready] start to promote leader")
enableLeaderStart := time.Now()
s.member.PromoteSelf()
enableLeaderDuration := time.Since(enableLeaderStart)
member.ServiceMemberGauge.WithLabelValues(PD).Set(1)
log.Info("[leader-ready] promote leader completed", zap.Duration("cost", enableLeaderDuration))
defer resetLeaderOnce.Do(func() {
// as soon as cancel the leadership keepalive, then other member have chance
// to be new leader.
Expand All @@ -1811,8 +1840,13 @@ func (s *Server) campaignLeader() {
member.ServiceMemberGauge.WithLabelValues(PD).Set(0)
})

log.Info("[leader-ready] start to check PD version with cluster version")
versionCheckStart := time.Now()
CheckPDVersionWithClusterVersion(s.persistOptions)
log.Info("PD leader is ready to serve", zap.String("leader-name", s.Name()))
versionCheckDuration := time.Since(versionCheckStart)
log.Info("[leader-ready] check PD version with cluster version completed", zap.Duration("cost", versionCheckDuration))
totalDuration := time.Since(leaderReadyStart)
log.Info("[leader-ready] PD leader is ready to serve", zap.String("leader-name", s.Name()), zap.Duration("total-cost", totalDuration))

leaderTicker := time.NewTicker(mcs.LeaderTickInterval)
defer leaderTicker.Stop()
Expand Down
Loading