Skip to content

Commit b21a183

Browse files
authored
tso, server: add debug logs for TSO sync, closure, and forwarding paths (#10439)
ref #10329 Signed-off-by: Yuhao Zhang <yhzhang00@outlook.com>
1 parent dca466b commit b21a183

File tree

6 files changed

+143
-6
lines changed

6 files changed

+143
-6
lines changed

metrics/grafana/pd.json

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10967,7 +10967,7 @@
1096710967
"grid": {},
1096810968
"gridPos": {
1096910969
"h": 9,
10970-
"w": 12,
10970+
"w": 8,
1097110971
"x": 0,
1097210972
"y": 123
1097310973
},
@@ -11082,8 +11082,8 @@
1108211082
"grid": {},
1108311083
"gridPos": {
1108411084
"h": 9,
11085-
"w": 12,
11086-
"x": 12,
11085+
"w": 8,
11086+
"x": 8,
1108711087
"y": 123
1108811088
},
1108911089
"id": 1204,
@@ -11210,6 +11210,104 @@
1121011210
"alignLevel": null
1121111211
}
1121211212
},
11213+
{
11214+
"aliasColors": {},
11215+
"bars": false,
11216+
"dashLength": 10,
11217+
"dashes": false,
11218+
"datasource": "${DS_TEST-CLUSTER}",
11219+
"description": "The rate of TSO streams forwarded to the independent TSO service.",
11220+
"editable": true,
11221+
"error": false,
11222+
"fill": 1,
11223+
"grid": {},
11224+
"gridPos": {
11225+
"h": 9,
11226+
"w": 8,
11227+
"x": 16,
11228+
"y": 123
11229+
},
11230+
"id": 1631,
11231+
"legend": {
11232+
"alignAsTable": true,
11233+
"avg": false,
11234+
"current": true,
11235+
"hideEmpty": true,
11236+
"hideZero": true,
11237+
"max": true,
11238+
"min": false,
11239+
"rightSide": true,
11240+
"show": true,
11241+
"sideWidth": 300,
11242+
"total": false,
11243+
"values": true
11244+
},
11245+
"lines": true,
11246+
"linewidth": 1,
11247+
"links": [],
11248+
"nullPointMode": "null as zero",
11249+
"paceLength": 10,
11250+
"percentage": false,
11251+
"pointradius": 5,
11252+
"points": false,
11253+
"renderer": "flot",
11254+
"seriesOverrides": [],
11255+
"spaceLength": 10,
11256+
"stack": false,
11257+
"steppedLine": false,
11258+
"targets": [
11259+
{
11260+
"expr": "sum(rate(pd_server_tso_forward_stream_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance)",
11261+
"format": "time_series",
11262+
"interval": "",
11263+
"intervalFactor": 2,
11264+
"legendFormat": "{{instance}}",
11265+
"refId": "A",
11266+
"step": 2
11267+
}
11268+
],
11269+
"thresholds": [],
11270+
"timeFrom": null,
11271+
"timeRegions": [],
11272+
"timeShift": null,
11273+
"title": "Forward TSO streams count",
11274+
"tooltip": {
11275+
"msResolution": false,
11276+
"shared": true,
11277+
"sort": 0,
11278+
"value_type": "individual"
11279+
},
11280+
"type": "graph",
11281+
"xaxis": {
11282+
"buckets": null,
11283+
"mode": "time",
11284+
"name": null,
11285+
"show": true,
11286+
"values": []
11287+
},
11288+
"yaxes": [
11289+
{
11290+
"format": "ops",
11291+
"label": null,
11292+
"logBase": 1,
11293+
"max": null,
11294+
"min": null,
11295+
"show": true
11296+
},
11297+
{
11298+
"format": "short",
11299+
"label": null,
11300+
"logBase": 1,
11301+
"max": null,
11302+
"min": null,
11303+
"show": true
11304+
}
11305+
],
11306+
"yaxis": {
11307+
"align": false,
11308+
"alignLevel": null
11309+
}
11310+
},
1121311311
{
1121411312
"aliasColors": {},
1121511313
"bars": false,

pkg/tso/allocator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ func (a *Allocator) allocatorUpdater() {
131131
continue
132132
}
133133
if err := a.UpdateTSO(); err != nil {
134-
log.Warn("failed to update allocator's timestamp", append(a.logFields, errs.ZapError(err))...)
134+
log.Warn("failed to update allocator's timestamp, resetting the TSO allocator with leadership resignation",
135+
append(a.logFields, errs.ZapError(err))...)
135136
a.Reset(true)
136137
// To wait for the allocator to be re-initialized next time.
137138
continue

pkg/tso/tso.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,13 @@ func (t *timestampOracle) syncTimestamp() error {
223223
t.metrics.syncSaveDuration.Observe(time.Since(start).Seconds())
224224

225225
t.metrics.syncOKEvent.Inc()
226+
// "last" is the etcd value, "last-saved" is the in-memory window, "save" is the
227+
// newly persisted window, and "next" is the physical time loaded into memory
226228
log.Info("sync and save timestamp",
227229
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
228230
zap.Time("last", last), zap.Time("last-saved", lastSavedTime),
229-
zap.Time("save", save), zap.Time("next", next))
231+
zap.Time("save", save), zap.Time("next", next),
232+
zap.String("member-name", t.member.Name()))
230233
// save into memory
231234
t.setTSOPhysical(next)
232235
return nil
@@ -292,6 +295,11 @@ func (t *timestampOracle) resetUserTimestamp(tso uint64, ignoreSmaller, skipUppe
292295
}
293296
t.lastSavedTime.Store(save)
294297
t.metrics.resetSaveDuration.Observe(time.Since(start).Seconds())
298+
log.Info("persisted tso window to etcd (user-reset)",
299+
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
300+
zap.Time("save", save), zap.Time("next", nextPhysical),
301+
zap.String("member-name", t.member.Name()),
302+
)
295303
}
296304
// save into memory only if nextPhysical or nextLogical is greater.
297305
t.tsoMux.physical = nextPhysical
@@ -399,6 +407,11 @@ func (t *timestampOracle) updateTimestamp(purpose updatePurpose) (bool, error) {
399407
}
400408
t.lastSavedTime.Store(save)
401409
t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds())
410+
log.Debug("persisted tso window to etcd (update)",
411+
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
412+
zap.Time("save", save), zap.Time("next", next),
413+
zap.String("member-name", t.member.Name()),
414+
)
402415
}
403416
// If it's an IntervalUpdate, we don't need to check logical overflow, just update physical time directly.
404417
// Otherwise, the caller met logical overflow, so it will allocate physical time to alloc more timestamp in concurrent.

server/cluster/cluster.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ const (
120120
syncRegionTaskRunner = "sync-region-async"
121121
)
122122

123+
const (
124+
tsoDynamicSwitchingStateUnknown int32 = iota
125+
tsoDynamicSwitchingStateEnabled
126+
tsoDynamicSwitchingStateDisabled
127+
)
128+
123129
// Server is the interface for cluster.
124130
type Server interface {
125131
GetAllocator() id.Allocator
@@ -161,6 +167,7 @@ type RaftCluster struct {
161167

162168
running bool
163169
isKeyspaceGroupEnabled bool
170+
tsoDynamicSwitchingState atomic.Int32
164171
meta *metapb.Cluster
165172
storage storage.Storage
166173
minResolvedTS atomic.Value // Store as uint64
@@ -457,6 +464,10 @@ func (c *RaftCluster) checkSchedulingService() {
457464
func (c *RaftCluster) checkTSOService() {
458465
if c.isKeyspaceGroupEnabled {
459466
if c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
467+
prev := c.tsoDynamicSwitchingState.Swap(tsoDynamicSwitchingStateEnabled)
468+
if prev == tsoDynamicSwitchingStateDisabled {
469+
log.Info("TSO dynamic switching is enabled, resuming TSO service checks")
470+
}
460471
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
461472
if err != nil || len(servers) == 0 {
462473
if err := c.startTSOJobsIfNeeded(); err != nil {
@@ -474,6 +485,11 @@ func (c *RaftCluster) checkTSOService() {
474485
c.SetServiceIndependent(constant.TSOServiceName)
475486
}
476487
}
488+
} else {
489+
prev := c.tsoDynamicSwitchingState.Swap(tsoDynamicSwitchingStateDisabled)
490+
if prev != tsoDynamicSwitchingStateDisabled {
491+
log.Info("TSO dynamic switching is disabled by config, skipping TSO service checks")
492+
}
477493
}
478494
return
479495
}
@@ -541,7 +557,7 @@ func (c *RaftCluster) stopTSOJobsIfNeeded() {
541557
if !c.tsoAllocator.IsInitialize() {
542558
return
543559
}
544-
log.Info("closing the TSO allocator")
560+
log.Info("closing the embedded TSO allocator")
545561
c.tsoAllocator.Reset(false)
546562
failpoint.Inject("updateAfterResetTSO", func() {
547563
if err := c.tsoAllocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {

server/grpc_service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
485485
defer done()
486486
}
487487
if s.IsServiceIndependent(constant.TSOServiceName) {
488+
tsoForwardStreamCounter.Inc()
488489
return s.forwardToTSOService(stream)
489490
}
490491

server/metrics.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ var (
9595
Name: "tso_proxy_forward_timeout_total",
9696
Help: "Counter of timeouts when tso proxy forwarding tso requests to tso service.",
9797
})
98+
tsoForwardStreamCounter = prometheus.NewCounter(
99+
prometheus.CounterOpts{
100+
Namespace: "pd",
101+
Subsystem: "server",
102+
Name: "tso_forward_stream_total",
103+
Help: "Counter of TSO streams forwarded to the independent TSO service.",
104+
})
98105

99106
tsoHandleDuration = prometheus.NewHistogram(
100107
prometheus.HistogramOpts{
@@ -221,6 +228,7 @@ func init() {
221228
prometheus.MustRegister(tsoProxyHandleDuration)
222229
prometheus.MustRegister(tsoProxyBatchSize)
223230
prometheus.MustRegister(tsoProxyForwardTimeoutCounter)
231+
prometheus.MustRegister(tsoForwardStreamCounter)
224232
prometheus.MustRegister(tsoHandleDuration)
225233
prometheus.MustRegister(tsoBatchSize)
226234
prometheus.MustRegister(queryRegionDuration)

0 commit comments

Comments
 (0)