Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
eadad73
client: resource manager service discovery (#391)
disksing Jul 28, 2025
52f62c0
Merge branch 'master' of https://github.com/tikv/pd into rm-client-di…
okJiang Oct 9, 2025
543be8d
Merge branch 'master' of github.com:tikv/pd into rm-client-discovery
okJiang Nov 25, 2025
d3c9b27
support TestServerPrimaryChange
okJiang Nov 25, 2025
012ffc9
enable register and member test
okJiang Nov 27, 2025
584a0d4
fallback to call pd server if can't find resource manager
okJiang Dec 18, 2025
914d163
save
okJiang Dec 18, 2025
e0336c4
revert some test
okJiang Dec 19, 2025
adbac15
Merge branch 'master' of github.com:tikv/pd into rm-client-discovery
okJiang Dec 19, 2025
d9ebeea
add test and remove option
okJiang Dec 24, 2025
9411339
client: update resource manager serivce url after error (#436)
disksing Nov 21, 2025
5d2fde2
add comment
okJiang Dec 24, 2025
480a01d
update log level
okJiang Dec 24, 2025
17f134e
fix lint
okJiang Dec 24, 2025
52ef981
deploy tso server
okJiang Dec 25, 2025
0806426
introduce provider
okJiang Dec 25, 2025
d01cc9c
fix lint
okJiang Dec 25, 2025
3842062
fix data race
okJiang Dec 25, 2025
3e2a379
add wait group to ResourceGroupsController for graceful shutdown
okJiang Dec 25, 2025
5ed0fba
revert kvproto
okJiang Dec 26, 2025
4f503dc
remove provider
okJiang Dec 26, 2025
fbd583b
fix lint
okJiang Dec 29, 2025
73280e0
fix ut
okJiang Dec 29, 2025
16e6e2b
fix ut
okJiang Dec 29, 2025
7d73688
fix comment
okJiang Dec 29, 2025
2476a1f
introduce retry pkg
okJiang Dec 29, 2025
036da6c
fix lint
okJiang Dec 29, 2025
c7d8747
fix comment: rename
okJiang Jan 5, 2026
faa6908
fix lint
okJiang Jan 5, 2026
5e3cdf1
add switch test
okJiang Jan 5, 2026
933dc3f
Merge branch 'rm-client-discovery' of github.com:okJiang/pd into rm-c…
okJiang Jan 5, 2026
ccf8aca
fix lint
okJiang Jan 5, 2026
94fa471
fix lint
okJiang Jan 5, 2026
1cea5b6
fix comment and enhance unit test
okJiang Jan 5, 2026
9687b2b
add more check
okJiang Jan 5, 2026
6b931f4
fix comment
okJiang Jan 5, 2026
7a05484
fix typo
okJiang Jan 5, 2026
f2c0e74
add error handling for resource manager service URL updates
okJiang Jan 5, 2026
d32d809
fix comment: rename
okJiang Jan 5, 2026
1ed5031
fix typo
okJiang Jan 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,25 @@ var _ Client = (*client)(nil)
// serviceModeKeeper is for service mode switching.
type serviceModeKeeper struct {
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient *tso.Cli
tsoSvcDiscovery sd.ServiceDiscovery
routerClient *router.Cli
serviceMode pdpb.ServiceMode
tsoClient *tso.Cli
tsoSvcDiscovery sd.ServiceDiscovery
routerClient *router.Cli
resourceManagerDiscovery *sd.ResourceManagerDiscovery
}

func (k *serviceModeKeeper) close() {
k.Lock()
defer k.Unlock()
switch k.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
if k.tsoSvcDiscovery != nil {
k.tsoSvcDiscovery.Close()
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
k.tsoClient.Close()
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
k.tsoSvcDiscovery = nil
}
if k.resourceManagerDiscovery != nil {
k.resourceManagerDiscovery.Close()
k.resourceManagerDiscovery = nil
}
k.tsoClient.Close()
}

type client struct {
Expand Down Expand Up @@ -451,7 +453,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
if !ok {
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
}
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE && enable {
if c.inner.getTSOProvider() != tsoProviderPD && enable {
return errors.New("[pd] tso follower proxy is only supported when PD provides TSO")
}
c.inner.option.SetEnableTSOFollowerProxy(enable)
Expand Down Expand Up @@ -600,18 +602,12 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi
// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD doesn't support GetMinTS API.
serviceMode := c.inner.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode")
case pdpb.ServiceMode_PD_SVC_MODE:
if c.inner.getTSOProvider() == tsoProviderPD {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to handle ServiceMode_UNKNOWN_SVC_MODE?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return error like before. I reverted this line 7d73688

// If the service mode is switched to API during GetTS() call, which happens during migration,
// returning the default timeline should be fine.
return c.GetTS(ctx)
case pdpb.ServiceMode_API_SVC_MODE:
default:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
}

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
// Call GetMinTS API to get the minimal TS from the API leader.
Expand Down
90 changes: 66 additions & 24 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,35 +142,33 @@ func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) {
c.Lock()
defer c.Unlock()

if c.option.UseTSOServerProxy {
// If we are using TSO server proxy, we always use PD_SVC_MODE.
newMode = pdpb.ServiceMode_PD_SVC_MODE
}
if newMode == c.serviceMode {
return
}
log.Info("[pd] changing TSO provider",
zap.String("old", convertToString(c.serviceMode)),
zap.String("new", convertToString(newMode)))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] TSO provider changed",
zap.String("old", convertToString(oldMode)),
zap.String("new", convertToString(newMode)))
}

func convertToString(mode pdpb.ServiceMode) string {
switch mode {
switch newMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
log.Warn("[pd] intend to switch to unknown service mode, use PD_SVC_MODE")
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
return "pd"
if c.tsoSvcDiscovery != nil || c.tsoClient == nil {
c.resetTSOClientLocked(newMode)
}
if c.resourceManagerDiscovery != nil {
c.resetResourceManagerDiscoveryLocked(newMode)
}
case pdpb.ServiceMode_API_SVC_MODE:
return "tso server"
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
return "unknown"
default:
return "invalid"
// If we are using TSO server proxy, we always use PD_SVC_MODE.
if c.tsoSvcDiscovery == nil && !c.option.UseTSOServerProxy {
c.resetTSOClientLocked(newMode)
}
if c.resourceManagerDiscovery == nil && !c.option.UseResourceManagerProxy {
c.resetResourceManagerDiscoveryLocked(newMode)
}
}

c.serviceMode = newMode
log.Info("[pd] service mode changed", zap.String("new-mode", newMode.String()))
}

// Reset a new TSO client.
Expand All @@ -184,6 +182,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = tso.NewClient(c.ctx, c.option,
c.serviceDiscovery, &tso.PDStreamBuilderFactory{})
log.Info("[pd] tso provider changed to pd")
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = sd.NewTSOServiceDiscovery(
c.ctx, c, c.serviceDiscovery,
Expand All @@ -198,6 +197,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
zap.Error(err))
return
}
log.Info("[pd] tso provider changed to tso server")
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
log.Warn("[pd] intend to switch to unknown service mode, just return")
return
Expand All @@ -219,6 +219,38 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
}
}

func (c *innerClient) resetResourceManagerDiscoveryLocked(newMode pdpb.ServiceMode) {
var newResourceManagerDiscovery *sd.ResourceManagerDiscovery
if newMode == pdpb.ServiceMode_API_SVC_MODE && !c.option.UseResourceManagerProxy {
newResourceManagerDiscovery = sd.NewResourceManagerDiscovery(
c.ctx, c.serviceDiscovery.GetClusterID(), c, c.tlsCfg, c.option, c.scheduleUpdateTokenConnection)
if err := newResourceManagerDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize resource manager discovery. keep the current service mode",
zap.Strings("svr-urls", c.svrUrls),
zap.String("current-mode", c.serviceMode.String()),
zap.Error(err))
newResourceManagerDiscovery.Close()
return
}
}
if c.resourceManagerDiscovery != nil {
c.resourceManagerDiscovery.Close()
}
c.resourceManagerDiscovery = newResourceManagerDiscovery
if newMode == pdpb.ServiceMode_PD_SVC_MODE {
log.Info("[pd] resource manager provider changed to pd")
} else {
log.Info("[pd] resource manager provider changed to resource manager server")
}
_ = c.scheduleUpdateTokenConnection("")
}

func (c *innerClient) getResourceManagerDiscovery() *sd.ResourceManagerDiscovery {
c.RLock()
defer c.RUnlock()
return c.resourceManagerDiscovery
}

func (c *innerClient) scheduleUpdateTokenConnection(string) error {
select {
case c.updateTokenConnectionCh <- struct{}{}:
Expand All @@ -227,10 +259,20 @@ func (c *innerClient) scheduleUpdateTokenConnection(string) error {
return nil
}

func (c *innerClient) getServiceMode() pdpb.ServiceMode {
type tsoProvider int

const (
tsoProviderPD tsoProvider = iota
tsoProviderTSOServer
)

func (c *innerClient) getTSOProvider() tsoProvider {
c.RLock()
defer c.RUnlock()
return c.serviceMode
if c.tsoSvcDiscovery != nil {
return tsoProviderTSOServer
}
return tsoProviderPD
}

func (c *innerClient) getTSOClient() *tso.Cli {
Expand Down
28 changes: 20 additions & 8 deletions client/opt/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ const (
// It provides the ability to change some PD client's options online from the outside.
type Option struct {
// Static options.
GRPCDialOptions []grpc.DialOption
Timeout time.Duration
MaxRetryTimes int
EnableForwarding bool
UseTSOServerProxy bool
MetricsLabels prometheus.Labels
InitMetrics bool
Backoffer *retry.Backoffer
GRPCDialOptions []grpc.DialOption
Timeout time.Duration
MaxRetryTimes int
EnableForwarding bool
UseTSOServerProxy bool
UseResourceManagerProxy bool
MetricsLabels prometheus.Labels
InitMetrics bool
Backoffer *retry.Backoffer

// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value
Expand All @@ -87,6 +88,7 @@ func NewOption() *Option {
EnableFollowerHandleCh: make(chan struct{}, 1),
EnableRouterClientCh: make(chan struct{}, 1),
InitMetrics: true,
UseResourceManagerProxy: true,
}

co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
Expand Down Expand Up @@ -200,6 +202,16 @@ func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption {
}
}

// WithResourceManagerProxyOption configures the client to use resource manager proxy,
// i.e., the client will send resource manager requests to the API leader (the resource manager
// proxy) which will forward the requests to the resource manager servers.
// Default is true, which means the client will use resource manager proxy.
func WithResourceManagerProxyOption(useResourceManagerProxy bool) ClientOption {
return func(op *Option) {
op.UseResourceManagerProxy = useResourceManagerProxy
}
}

// WithMaxErrorRetry configures the client max retry times when connect meets error.
func WithMaxErrorRetry(count int) ClientOption {
return func(op *Option) {
Expand Down
5 changes: 5 additions & 0 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func WithRUStats(op *GetResourceGroupOp) {

// resourceManagerClient gets the ResourceManager client of current PD leader.
func (c *innerClient) resourceManagerClient() (rmpb.ResourceManagerClient, error) {
if ds := c.getResourceManagerDiscovery(); ds != nil {
if cc := ds.GetConn(); cc != nil {
return rmpb.NewResourceManagerClient(cc), nil
}
}
cc, err := c.getOrCreateGRPCConn()
if err != nil {
return nil, err
Expand Down
Loading
Loading