Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
eadad73
client: resource manager service discovery (#391)
disksing Jul 28, 2025
52f62c0
Merge branch 'master' of https://github.com/tikv/pd into rm-client-di…
okJiang Oct 9, 2025
543be8d
Merge branch 'master' of github.com:tikv/pd into rm-client-discovery
okJiang Nov 25, 2025
d3c9b27
support TestServerPrimaryChange
okJiang Nov 25, 2025
012ffc9
enable register and member test
okJiang Nov 27, 2025
584a0d4
fallback to call pd server if can't find resource manager
okJiang Dec 18, 2025
914d163
save
okJiang Dec 18, 2025
e0336c4
revert some test
okJiang Dec 19, 2025
adbac15
Merge branch 'master' of github.com:tikv/pd into rm-client-discovery
okJiang Dec 19, 2025
d9ebeea
add test and remove option
okJiang Dec 24, 2025
9411339
client: update resource manager serivce url after error (#436)
disksing Nov 21, 2025
5d2fde2
add comment
okJiang Dec 24, 2025
480a01d
update log level
okJiang Dec 24, 2025
17f134e
fix lint
okJiang Dec 24, 2025
52ef981
deploy tso server
okJiang Dec 25, 2025
0806426
introduce provider
okJiang Dec 25, 2025
d01cc9c
fix lint
okJiang Dec 25, 2025
3842062
fix data race
okJiang Dec 25, 2025
3e2a379
add wait group to ResourceGroupsController for graceful shutdown
okJiang Dec 25, 2025
5ed0fba
revert kvproto
okJiang Dec 26, 2025
4f503dc
remove provider
okJiang Dec 26, 2025
fbd583b
fix lint
okJiang Dec 29, 2025
73280e0
fix ut
okJiang Dec 29, 2025
16e6e2b
fix ut
okJiang Dec 29, 2025
7d73688
fix comment
okJiang Dec 29, 2025
2476a1f
introduce retry pkg
okJiang Dec 29, 2025
036da6c
fix lint
okJiang Dec 29, 2025
c7d8747
fix comment: rename
okJiang Jan 5, 2026
faa6908
fix lint
okJiang Jan 5, 2026
5e3cdf1
add switch test
okJiang Jan 5, 2026
933dc3f
Merge branch 'rm-client-discovery' of github.com:okJiang/pd into rm-c…
okJiang Jan 5, 2026
ccf8aca
fix lint
okJiang Jan 5, 2026
94fa471
fix lint
okJiang Jan 5, 2026
1cea5b6
fix comment and enhance unit test
okJiang Jan 5, 2026
9687b2b
add more check
okJiang Jan 5, 2026
6b931f4
fix comment
okJiang Jan 5, 2026
7a05484
fix typo
okJiang Jan 5, 2026
f2c0e74
add error handling for resource manager service URL updates
okJiang Jan 5, 2026
d32d809
fix comment: rename
okJiang Jan 5, 2026
1ed5031
fix typo
okJiang Jan 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,25 @@ var _ Client = (*client)(nil)
// serviceModeKeeper is for service mode switching.
type serviceModeKeeper struct {
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient *tso.Cli
tsoSvcDiscovery sd.ServiceDiscovery
routerClient *router.Cli
serviceMode pdpb.ServiceMode
tsoClient *tso.Cli
tsoSvcDiscovery sd.ServiceDiscovery
routerClient *router.Cli
resourceManagerDiscovery *sd.ResourceManagerDiscovery
}

func (k *serviceModeKeeper) close() {
k.Lock()
defer k.Unlock()
switch k.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
if k.tsoSvcDiscovery != nil {
k.tsoSvcDiscovery.Close()
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
k.tsoClient.Close()
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
k.tsoSvcDiscovery = nil
}
if k.resourceManagerDiscovery != nil {
k.resourceManagerDiscovery.Close()
k.resourceManagerDiscovery = nil
}
k.tsoClient.Close()
}

type client struct {
Expand Down Expand Up @@ -430,6 +432,19 @@ func (c *client) GetServiceDiscovery() sd.ServiceDiscovery {
return c.inner.serviceDiscovery
}

// GetResourceManagerServiceURL returns the discovered standalone resource manager
// service URL. It is an optional extension method (not part of the Client
// interface) mainly intended for integration tests.
//
// When it returns an empty string, the client will fall back to PD-provided
// resource manager service.
func (c *client) GetResourceManagerServiceURL() string {
if ds := c.inner.getResourceManagerDiscovery(); ds != nil {
return ds.GetServiceURL()
}
return ""
}

// GetTSOServiceDiscovery returns the TSO service discovery object. Only used for testing.
func (c *client) GetTSOServiceDiscovery() sd.ServiceDiscovery {
return c.inner.tsoSvcDiscovery
Expand All @@ -451,7 +466,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
if !ok {
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
}
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE && enable {
if c.inner.getTSOProvider() != tsoProviderPD && enable {
return errors.New("[pd] tso follower proxy is only supported when PD provides TSO")
}
c.inner.option.SetEnableTSOFollowerProxy(enable)
Expand Down Expand Up @@ -599,19 +614,17 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD doesn't support GetMinTS API.
serviceMode := c.inner.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
if c.inner.serviceMode == pdpb.ServiceMode_UNKNOWN_SVC_MODE {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode")
case pdpb.ServiceMode_PD_SVC_MODE:
}

// Handle compatibility issue in case of PD doesn't support GetMinTS API.
if c.inner.getTSOProvider() == tsoProviderPD {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to handle ServiceMode_UNKNOWN_SVC_MODE?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return error like before. I reverted this line 7d73688

// If the service mode is switched to API during GetTS() call, which happens during migration,
// returning the default timeline should be fine.
return c.GetTS(ctx)
case pdpb.ServiceMode_API_SVC_MODE:
default:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
}

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
// Call GetMinTS API to get the minimal TS from the API leader.
Expand Down
104 changes: 76 additions & 28 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
Expand Down Expand Up @@ -79,7 +81,6 @@ func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
}
c.wg.Add(1)
go c.routerClientInitializer()

return nil
}

Expand Down Expand Up @@ -141,40 +142,22 @@ func (c *innerClient) disableRouterClient() {
func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) {
c.Lock()
defer c.Unlock()

if c.option.UseTSOServerProxy {
// If we are using TSO server proxy, we always use PD_SVC_MODE.
newMode = pdpb.ServiceMode_PD_SVC_MODE
}
if newMode == c.serviceMode {
return
}
log.Info("[pd] changing TSO provider",
zap.String("old", convertToString(c.serviceMode)),
zap.String("new", convertToString(newMode)))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.resetResourceManagerDiscoveryLocked(newMode)
c.serviceMode = newMode
log.Info("[pd] TSO provider changed",
zap.String("old", convertToString(oldMode)),
zap.String("new", convertToString(newMode)))
}

func convertToString(mode pdpb.ServiceMode) string {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
return "pd"
case pdpb.ServiceMode_API_SVC_MODE:
return "tso server"
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
return "unknown"
default:
return "invalid"
}
log.Info("[pd] service mode changed", zap.String("new-mode", newMode.String()))
}

// Reset a new TSO client.
func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
// `UseTSOServerProxy` is intended to force using PD as the TSO provider,
// but should not block other components (e.g. RM) from switching service mode.
if c.option.UseTSOServerProxy {
mode = pdpb.ServiceMode_PD_SVC_MODE
}
// Re-create a new TSO client.
var (
newTSOCli *tso.Cli
Expand All @@ -184,6 +167,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = tso.NewClient(c.ctx, c.option,
c.serviceDiscovery, &tso.PDStreamBuilderFactory{})
log.Info("[pd] tso provider changed to pd")
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = sd.NewTSOServiceDiscovery(
c.ctx, c, c.serviceDiscovery,
Expand All @@ -198,6 +182,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
zap.Error(err))
return
}
log.Info("[pd] tso provider changed to tso server")
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
log.Warn("[pd] intend to switch to unknown service mode, just return")
return
Expand All @@ -219,6 +204,29 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
}
}

func (c *innerClient) resetResourceManagerDiscoveryLocked(mode pdpb.ServiceMode) {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
if c.resourceManagerDiscovery != nil {
c.resourceManagerDiscovery.Close()
c.resourceManagerDiscovery = nil
}
case pdpb.ServiceMode_API_SVC_MODE:
c.resourceManagerDiscovery = sd.NewResourceManagerDiscovery(
c.ctx, c.serviceDiscovery.GetClusterID(), c, c.tlsCfg, c.option, c.scheduleUpdateTokenConnection)
c.resourceManagerDiscovery.Init()
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
log.Warn("[pd] intend to switch to unknown service mode, just return")
return
}
}

func (c *innerClient) getResourceManagerDiscovery() *sd.ResourceManagerDiscovery {
c.RLock()
defer c.RUnlock()
return c.resourceManagerDiscovery
}

func (c *innerClient) scheduleUpdateTokenConnection(string) error {
select {
case c.updateTokenConnectionCh <- struct{}{}:
Expand All @@ -227,10 +235,20 @@ func (c *innerClient) scheduleUpdateTokenConnection(string) error {
return nil
}

func (c *innerClient) getServiceMode() pdpb.ServiceMode {
type tsoProvider int

const (
tsoProviderPD tsoProvider = iota
tsoProviderTSOServer
)

func (c *innerClient) getTSOProvider() tsoProvider {
c.RLock()
defer c.RUnlock()
return c.serviceMode
if c.tsoSvcDiscovery != nil {
return tsoProviderTSOServer
}
return tsoProviderPD
}

func (c *innerClient) getTSOClient() *tso.Cli {
Expand Down Expand Up @@ -297,6 +315,36 @@ func (c *innerClient) gRPCErrorHandler(err error) {
}
}

func shouldUpdateRMURL(err error) bool {
if err == nil {
return false
}
if errs.IsLeaderChange(err) {
return true
}
if s, ok := status.FromError(err); ok {
// If the rm instance stops, we can reset the connection and
// use pd instance url instead of it.
if errs.IsNetworkError(s.Code()) || s.Code() == codes.Canceled {
return true
}
}
return false
}

func (c *innerClient) resourceManagerErrorHandler(err error) {
if !shouldUpdateRMURL(err) {
return
}

c.RLock()
defer c.RUnlock()
log.Warn("[resource-manager] resource manager error", zap.Error(err))
if c.resourceManagerDiscovery != nil {
c.resourceManagerDiscovery.ScheduleUpdateServiceURL()
}
}

func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
cc, err := c.serviceDiscovery.GetOrCreateGRPCConn(c.serviceDiscovery.GetServingURL())
if err != nil {
Expand Down
56 changes: 56 additions & 0 deletions client/pkg/retry/interval_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"time"

"github.com/pingcap/errors"
)

var (
// QueryRetryMaxTimes is the max retry times for querying microservice.
queryRetryMaxTimes = 10
// queryRetryInterval is the retry interval for querying microservice.
queryRetryInterval = 500 * time.Millisecond
)

// WithConfig retries the given function with a fixed interval.
func WithConfig(
ctx context.Context, f func() error,
) error {
return Retry(ctx, queryRetryMaxTimes, queryRetryInterval, f)
}

// Retry retries the given function with a fixed interval.
func Retry(
ctx context.Context, maxTimes int, interval time.Duration, f func() error,
) error {
var err error
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range maxTimes {
if err = f(); err == nil {
return nil
}
select {
case <-ctx.Done():
return err
case <-ticker.C:
}
}
return errors.WithStack(err)
}
18 changes: 14 additions & 4 deletions client/resource_group/controller/global_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ type ResourceGroupsController struct {

run struct {
responseDeadline *time.Timer
inDegradedMode bool
inDegradedMode atomic.Bool
// currentRequests is used to record the request and resource group.
// Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`.
currentRequests []*rmpb.TokenBucketRequest
Expand All @@ -175,6 +175,8 @@ type ResourceGroupsController struct {
safeRuConfig atomic.Pointer[RUConfig]

degradedRUSettings *rmpb.GroupRequestUnitSettings

wg sync.WaitGroup
}

// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
Expand Down Expand Up @@ -248,7 +250,9 @@ const (
// Start starts ResourceGroupController service.
func (c *ResourceGroupsController) Start(ctx context.Context) {
c.loopCtx, c.loopCancel = context.WithCancel(ctx)
c.wg.Add(1)
go func() {
defer c.wg.Done()
if c.ruConfig.DegradedModeWaitDuration > 0 {
c.run.responseDeadline = time.NewTimer(c.ruConfig.DegradedModeWaitDuration)
c.run.responseDeadline.Stop()
Expand Down Expand Up @@ -335,7 +339,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
metrics.ResourceGroupStatusGauge.Reset()
return
case <-c.responseDeadlineCh:
c.run.inDegradedMode = true
c.run.inDegradedMode.Store(true)
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
log.Warn("[resource group controller] enter degraded mode")
case resp := <-c.tokenResponseChan:
Expand All @@ -348,7 +352,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
if c.run.inDegradedMode {
if c.IsDegraded() {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
}
case resp, ok := <-watchMetaChannel:
Expand Down Expand Up @@ -449,6 +453,7 @@ func (c *ResourceGroupsController) Stop() error {
return errors.Errorf("resource groups controller does not start")
}
c.loopCancel()
c.wg.Wait()
return nil
}

Expand Down Expand Up @@ -607,7 +612,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
}
c.responseDeadlineCh = nil
}
c.run.inDegradedMode = false
c.run.inDegradedMode.Store(false)
for _, res := range resp {
name := res.GetResourceGroupName()
gc, ok := c.loadGroupController(name)
Expand Down Expand Up @@ -760,3 +765,8 @@ func (c *ResourceGroupsController) ReportConsumption(resourceGroupName string, c

gc.addRUConsumption(consumption)
}

// IsDegraded returns whether the controller is in degraded mode.
func (c *ResourceGroupsController) IsDegraded() bool {
return c.run.inDegradedMode.Load()
}
Loading