Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,15 @@ func (c *Cli) scheduleUpdateTSOConnectionCtxs() {
func (c *Cli) GetTSORequest(ctx context.Context) *Request {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the physical, logical, streamID, and other fields are reset, but the done channel is not reset. A req taken from the pool may have leftover data in its done channel. This isn’t an issue introduced by this PR, but it’s worth keeping an eye on.

req := c.tsoReqPool.Get().(*Request)
// Set needed fields in the request before using it.
req.mu.Lock()
Copy link
Copy Markdown
Member

@rleungx rleungx Jan 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it cause a performance regression?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's wait for a test result. I will do it.

Copy link
Copy Markdown
Member Author

@okJiang okJiang Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost a 2% performance regression. Can we accept it? cc @JmPotato

./bin/pd-tso-bench -client 1 -duration 10m -c 100

Master
Total:
count: 174720411, max: 10.5724ms, min: 0.0911ms, avg: 0.3338ms
<1ms: 174699770, >1ms: 19262, >2ms: 579, >5ms: 700, >10ms: 100, >30ms: 0, >50ms: 0, >100ms: 0, >200ms: 0, >400ms: 0, >800ms: 0, >1s: 0
count: 174720411, <1ms: 99.99%, >1ms: 0.01%, >2ms: 0.00%, >5ms: 0.00%, >10ms: 0.00%, >30ms: 0.00%, >50ms: 0.00%, >100ms: 0.00%, >200ms:
 0.00%, >400ms: 0.00%, >800ms: 0.00%, >1s: 0.00%
P0.5: 0.3348ms, P0.8: 0.3789ms, P0.9: 0.4039ms, P0.99: 0.4830ms

Pr
Total:
count: 171009435, max: 10.3831ms, min: 0.0940ms, avg: 0.3415ms
<1ms: 170987346, >1ms: 20519, >2ms: 871, >5ms: 599, >10ms: 100, >30ms: 0, >50ms: 0, >100ms: 0, >200ms: 0, >400ms: 0, >800ms: 0, >1s: 0
count: 171009435, <1ms: 99.99%, >1ms: 0.01%, >2ms: 0.00%, >5ms: 0.00%, >10ms: 0.00%, >30ms: 0.00%, >50ms: 0.00%, >100ms: 0.00%, >200ms:
 0.00%, >400ms: 0.00%, >800ms: 0.00%, >1s: 0.00%
P0.5: 0.3420ms, P0.8: 0.3875ms, P0.9: 0.4135ms, P0.99: 0.4959ms

Copy link
Copy Markdown
Member Author

@okJiang okJiang Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more test after applying this comment #10130 (comment); the conclusion remains unchanged.

Total:
count: 170459945, max: 10.4290ms, min: 0.0907ms, avg: 0.3432ms
<1ms: 170427514, >1ms: 28694, >2ms: 2732, >5ms: 953, >10ms: 52, >30ms: 0, >50ms: 0, >100ms: 0, >200ms: 0, >400ms: 0, >800ms: 0, >1s: 0
count: 170459945, <1ms: 99.98%, >1ms: 0.02%, >2ms: 0.00%, >5ms: 0.00%, >10ms: 0.00%, >30ms: 0.00%, >50ms: 0.00%, >100ms: 0.00%, >200ms:
 0.00%, >400ms: 0.00%, >800ms: 0.00%, >1s: 0.00%
P0.5: 0.3429ms, P0.8: 0.3899ms, P0.9: 0.4173ms, P0.99: 0.5068ms

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req.start = time.Now()
req.pool = c.tsoReqPool
req.requestCtx = ctx
req.clientCtx = c.ctx
req.physical = 0
req.logical = 0
req.streamID = ""
req.mu.Unlock()
return req
}

Expand Down
2 changes: 2 additions & 0 deletions client/clients/tso/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,11 @@ func (td *tsoDispatcher) processRequests(
func tsoRequestFinisher(physical, firstLogical int64, streamID string) batch.FinisherFunc[*Request] {
return func(idx int, tsoReq *Request, err error) {
// Retrieve the request context before the request is done to trace without race.
tsoReq.mu.Lock()
requestCtx := tsoReq.requestCtx
tsoReq.physical, tsoReq.logical = physical, firstLogical+int64(idx)
tsoReq.streamID = streamID
tsoReq.mu.Unlock()
tsoReq.TryDone(err)
trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
}
Expand Down
28 changes: 21 additions & 7 deletions client/clients/tso/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (

// Request is a TSO request.
type Request struct {
mu sync.RWMutex
requestCtx context.Context
clientCtx context.Context
done chan error
Expand All @@ -57,11 +58,15 @@ func (req *Request) IsFrom(pool *sync.Pool) bool {
if req == nil {
return false
}
req.mu.RLock()
defer req.mu.RUnlock()
return req.pool == pool
}

// TryDone tries to send the result to the channel, it will not block.
func (req *Request) TryDone(err error) {
req.mu.RLock()
defer req.mu.RUnlock()
select {
case req.done <- err:
default:
Expand All @@ -75,29 +80,38 @@ func (req *Request) Wait() (physical int64, logical int64, err error) {

// waitCtx waits for the TSO result with specified ctx, while not using req.requestCtx.
func (req *Request) waitCtx(ctx context.Context) (physical int64, logical int64, err error) {
req.mu.RLock()
startAt := req.start
clientCtx := req.clientCtx
pool := req.pool
requestCtxForTrace := req.requestCtx
req.mu.RUnlock()

// If tso command duration is observed very high, the reason could be it
// takes too long for Wait() be called.
start := time.Now()
metrics.CmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds())
metrics.CmdDurationTSOAsyncWait.Observe(start.Sub(startAt).Seconds())
select {
case err = <-req.done:
defer req.pool.Put(req)
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End()
defer pool.Put(req)
defer trace.StartRegion(requestCtxForTrace, "pdclient.tsoReqDone").End()
err = errors.WithStack(err)
now := time.Now()
if err != nil {
metrics.CmdFailedDurationTSOWait.Observe(now.Sub(start).Seconds())
metrics.CmdFailedDurationTSO.Observe(now.Sub(req.start).Seconds())
metrics.CmdFailedDurationTSO.Observe(now.Sub(startAt).Seconds())
return 0, 0, err
}
req.mu.RLock()
physical, logical = req.physical, req.logical
req.mu.RUnlock()
metrics.CmdDurationTSOWait.Observe(now.Sub(start).Seconds())
metrics.CmdDurationTSO.Observe(now.Sub(req.start).Seconds())
metrics.CmdDurationTSO.Observe(now.Sub(startAt).Seconds())
return
case <-ctx.Done():
return 0, 0, errors.WithStack(ctx.Err())
case <-req.clientCtx.Done():
return 0, 0, errors.WithStack(req.clientCtx.Err())
case <-clientCtx.Done():
return 0, 0, errors.WithStack(clientCtx.Err())
}
}

Expand Down