Skip to content
Merged
104 changes: 101 additions & 3 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -10855,7 +10855,7 @@
"grid": {},
"gridPos": {
"h": 9,
"w": 12,
"w": 8,
"x": 0,
"y": 123
},
Expand Down Expand Up @@ -10970,8 +10970,8 @@
"grid": {},
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"w": 8,
"x": 8,
"y": 123
},
"id": 1204,
Expand Down Expand Up @@ -11098,6 +11098,104 @@
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The rate of TSO streams forwarded to the independent TSO service.",
"editable": true,
"error": false,
"fill": 1,
"grid": {},
"gridPos": {
"h": 9,
"w": 8,
"x": 16,
"y": 123
},
"id": 1631,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sideWidth": 300,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null as zero",
"paceLength": 10,
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(pd_server_tso_forward_stream_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance)",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
"refId": "A",
"step": 2
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Forward TSO streams count",
"tooltip": {
"msResolution": false,
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "ops",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
Expand Down
3 changes: 2 additions & 1 deletion pkg/tso/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (a *Allocator) allocatorUpdater() {
continue
}
if err := a.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp", append(a.logFields, errs.ZapError(err))...)
log.Warn("failed to update allocator's timestamp, resetting the TSO allocator with leadership resignation",
append(a.logFields, errs.ZapError(err))...)
a.Reset(true)
// To wait for the allocator to be re-initialized next time.
continue
Expand Down
15 changes: 14 additions & 1 deletion pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,13 @@ func (t *timestampOracle) syncTimestamp() error {
t.metrics.syncSaveDuration.Observe(time.Since(start).Seconds())

t.metrics.syncOKEvent.Inc()
// "last" is the etcd value, "last-saved" is the in-memory window, "save" is the
// newly persisted window, and "next" is the physical time loaded into memory
log.Info("sync and save timestamp",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Time("last", last), zap.Time("last-saved", lastSavedTime),
zap.Time("save", save), zap.Time("next", next))
zap.Time("save", save), zap.Time("next", next),
zap.String("member-name", t.member.Name()))
// save into memory
t.setTSOPhysical(next)
return nil
Expand Down Expand Up @@ -292,6 +295,11 @@ func (t *timestampOracle) resetUserTimestamp(tso uint64, ignoreSmaller, skipUppe
}
t.lastSavedTime.Store(save)
t.metrics.resetSaveDuration.Observe(time.Since(start).Seconds())
log.Info("persisted tso window to etcd (user-reset)",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Time("save", save), zap.Time("next", nextPhysical),
zap.String("member-name", t.member.Name()),
)
}
// save into memory only if nextPhysical or nextLogical is greater.
t.tsoMux.physical = nextPhysical
Expand Down Expand Up @@ -399,6 +407,11 @@ func (t *timestampOracle) updateTimestamp(purpose updatePurpose) (bool, error) {
}
t.lastSavedTime.Store(save)
t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds())
log.Debug("persisted tso window to etcd (update)",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Time("save", save), zap.Time("next", next),
zap.String("member-name", t.member.Name()),
)
}
// If it's an IntervalUpdate, we don't need to check logical overflow, just update physical time directly.
// Otherwise, the caller encountered a logical overflow, so it will allocate more physical time to generate additional timestamps concurrently.
Expand Down
18 changes: 17 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ const (
syncRegionTaskRunner = "sync-region-async"
)

const (
tsoDynamicSwitchingStateUnknown int32 = iota
tsoDynamicSwitchingStateEnabled
tsoDynamicSwitchingStateDisabled
)

// Server is the interface for cluster.
type Server interface {
GetAllocator() id.Allocator
Expand Down Expand Up @@ -166,6 +172,7 @@ type RaftCluster struct {

running bool
isKeyspaceGroupEnabled bool
tsoDynamicSwitchingState atomic.Int32
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
Expand Down Expand Up @@ -462,6 +469,10 @@ func (c *RaftCluster) checkSchedulingService() {
func (c *RaftCluster) checkTSOService() {
if c.isKeyspaceGroupEnabled {
if c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
prev := c.tsoDynamicSwitchingState.Swap(tsoDynamicSwitchingStateEnabled)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use CAS?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkTSOService() is effectively serialized (only called from Start() and from the single runServiceCheckJob() goroutine), and I don't feel this path is intended to become concurrent in the near future. Maybe keep Swap here is the simplest yet working way.

if prev == tsoDynamicSwitchingStateDisabled {
log.Info("TSO dynamic switching is enabled, resuming TSO service checks")
}
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobsIfNeeded(); err != nil {
Expand All @@ -479,6 +490,11 @@ func (c *RaftCluster) checkTSOService() {
c.SetServiceIndependent(constant.TSOServiceName)
}
}
} else {
prev := c.tsoDynamicSwitchingState.Swap(tsoDynamicSwitchingStateDisabled)
if prev != tsoDynamicSwitchingStateDisabled {
log.Info("TSO dynamic switching is disabled by config, skipping TSO service checks")
}
}
return
}
Expand Down Expand Up @@ -546,7 +562,7 @@ func (c *RaftCluster) stopTSOJobsIfNeeded() {
if !c.tsoAllocator.IsInitialize() {
return
}
log.Info("closing the TSO allocator")
log.Info("closing the embedded TSO allocator")
c.tsoAllocator.Reset(false)
failpoint.Inject("updateAfterResetTSO", func() {
if err := c.tsoAllocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
Expand Down
1 change: 1 addition & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
defer done()
}
if s.IsServiceIndependent(constant.TSOServiceName) {
tsoForwardStreamCounter.Inc()
return s.forwardToTSOService(stream)
}

Expand Down
8 changes: 8 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ var (
Name: "tso_proxy_forward_timeout_total",
Help: "Counter of timeouts when tso proxy forwarding tso requests to tso service.",
})
tsoForwardStreamCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "server",
Name: "tso_forward_stream_total",
Help: "Counter of TSO streams forwarded to the independent TSO service.",
})

tsoHandleDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -221,6 +228,7 @@ func init() {
prometheus.MustRegister(tsoProxyHandleDuration)
prometheus.MustRegister(tsoProxyBatchSize)
prometheus.MustRegister(tsoProxyForwardTimeoutCounter)
prometheus.MustRegister(tsoForwardStreamCounter)
prometheus.MustRegister(tsoHandleDuration)
prometheus.MustRegister(tsoBatchSize)
prometheus.MustRegister(queryRegionDuration)
Expand Down
Loading