Skip to content

Commit 757acdf

Browse files
authored
client: resource manager service discovery (#9785)
ref #9737 Signed-off-by: okjiang <819421878@qq.com>
1 parent 2145768 commit 757acdf

File tree

10 files changed

+894
-125
lines changed

10 files changed

+894
-125
lines changed

client/client.go

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -150,23 +150,25 @@ var _ Client = (*client)(nil)
150150
// serviceModeKeeper is for service mode switching.
151151
type serviceModeKeeper struct {
152152
sync.RWMutex
153-
serviceMode pdpb.ServiceMode
154-
tsoClient *tso.Cli
155-
tsoSvcDiscovery sd.ServiceDiscovery
156-
routerClient *router.Cli
153+
serviceMode pdpb.ServiceMode
154+
tsoClient *tso.Cli
155+
tsoSvcDiscovery sd.ServiceDiscovery
156+
routerClient *router.Cli
157+
resourceManagerDiscovery *sd.ResourceManagerDiscovery
157158
}
158159

159160
func (k *serviceModeKeeper) close() {
160161
k.Lock()
161162
defer k.Unlock()
162-
switch k.serviceMode {
163-
case pdpb.ServiceMode_API_SVC_MODE:
163+
if k.tsoSvcDiscovery != nil {
164164
k.tsoSvcDiscovery.Close()
165-
fallthrough
166-
case pdpb.ServiceMode_PD_SVC_MODE:
167-
k.tsoClient.Close()
168-
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
165+
k.tsoSvcDiscovery = nil
169166
}
167+
if k.resourceManagerDiscovery != nil {
168+
k.resourceManagerDiscovery.Close()
169+
k.resourceManagerDiscovery = nil
170+
}
171+
k.tsoClient.Close()
170172
}
171173

172174
type client struct {
@@ -430,6 +432,19 @@ func (c *client) GetServiceDiscovery() sd.ServiceDiscovery {
430432
return c.inner.serviceDiscovery
431433
}
432434

435+
// GetResourceManagerServiceURL returns the discovered standalone resource manager
436+
// service URL. It is an optional extension method (not part of the Client
437+
// interface) mainly intended for integration tests.
438+
//
439+
// When it returns an empty string, the client will fall back to PD-provided
440+
// resource manager service.
441+
func (c *client) GetResourceManagerServiceURL() string {
442+
if ds := c.inner.getResourceManagerDiscovery(); ds != nil {
443+
return ds.GetServiceURL()
444+
}
445+
return ""
446+
}
447+
433448
// GetTSOServiceDiscovery returns the TSO service discovery object. Only used for testing.
434449
func (c *client) GetTSOServiceDiscovery() sd.ServiceDiscovery {
435450
return c.inner.tsoSvcDiscovery
@@ -451,7 +466,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
451466
if !ok {
452467
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
453468
}
454-
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE && enable {
469+
if c.inner.getTSOProvider() != tsoProviderPD && enable {
455470
return errors.New("[pd] tso follower proxy is only supported when PD provides TSO")
456471
}
457472
c.inner.option.SetEnableTSOFollowerProxy(enable)
@@ -599,19 +614,17 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi
599614

600615
// GetMinTS implements the TSOClient interface.
601616
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
602-
// Handle compatibility issue in case of PD doesn't support GetMinTS API.
603-
serviceMode := c.inner.getServiceMode()
604-
switch serviceMode {
605-
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
617+
if c.inner.serviceMode == pdpb.ServiceMode_UNKNOWN_SVC_MODE {
606618
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode")
607-
case pdpb.ServiceMode_PD_SVC_MODE:
619+
}
620+
621+
// Handle the compatibility issue in case PD doesn't support the GetMinTS API.
622+
if c.inner.getTSOProvider() == tsoProviderPD {
608623
// If the service mode is switched to API during GetTS() call, which happens during migration,
609624
// returning the default timeline should be fine.
610625
return c.GetTS(ctx)
611-
case pdpb.ServiceMode_API_SVC_MODE:
612-
default:
613-
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
614626
}
627+
615628
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
616629
defer cancel()
617630
// Call GetMinTS API to get the minimal TS from the API leader.

client/inner_client.go

Lines changed: 76 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222

2323
"go.uber.org/zap"
2424
"google.golang.org/grpc"
25+
"google.golang.org/grpc/codes"
26+
"google.golang.org/grpc/status"
2527

2628
"github.com/pingcap/errors"
2729
"github.com/pingcap/kvproto/pkg/keyspacepb"
@@ -79,7 +81,6 @@ func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
7981
}
8082
c.wg.Add(1)
8183
go c.routerClientInitializer()
82-
8384
return nil
8485
}
8586

@@ -141,40 +142,22 @@ func (c *innerClient) disableRouterClient() {
141142
func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) {
142143
c.Lock()
143144
defer c.Unlock()
144-
145-
if c.option.UseTSOServerProxy {
146-
// If we are using TSO server proxy, we always use PD_SVC_MODE.
147-
newMode = pdpb.ServiceMode_PD_SVC_MODE
148-
}
149145
if newMode == c.serviceMode {
150146
return
151147
}
152-
log.Info("[pd] changing TSO provider",
153-
zap.String("old", convertToString(c.serviceMode)),
154-
zap.String("new", convertToString(newMode)))
155148
c.resetTSOClientLocked(newMode)
156-
oldMode := c.serviceMode
149+
c.resetResourceManagerDiscoveryLocked(newMode)
157150
c.serviceMode = newMode
158-
log.Info("[pd] TSO provider changed",
159-
zap.String("old", convertToString(oldMode)),
160-
zap.String("new", convertToString(newMode)))
161-
}
162-
163-
func convertToString(mode pdpb.ServiceMode) string {
164-
switch mode {
165-
case pdpb.ServiceMode_PD_SVC_MODE:
166-
return "pd"
167-
case pdpb.ServiceMode_API_SVC_MODE:
168-
return "tso server"
169-
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
170-
return "unknown"
171-
default:
172-
return "invalid"
173-
}
151+
log.Info("[pd] service mode changed", zap.String("new-mode", newMode.String()))
174152
}
175153

176154
// Reset a new TSO client.
177155
func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
156+
// `UseTSOServerProxy` is intended to force using PD as the TSO provider,
157+
// but should not block other components (e.g. RM) from switching service mode.
158+
if c.option.UseTSOServerProxy {
159+
mode = pdpb.ServiceMode_PD_SVC_MODE
160+
}
178161
// Re-create a new TSO client.
179162
var (
180163
newTSOCli *tso.Cli
@@ -184,6 +167,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
184167
case pdpb.ServiceMode_PD_SVC_MODE:
185168
newTSOCli = tso.NewClient(c.ctx, c.option,
186169
c.serviceDiscovery, &tso.PDStreamBuilderFactory{})
170+
log.Info("[pd] tso provider changed to pd")
187171
case pdpb.ServiceMode_API_SVC_MODE:
188172
newTSOSvcDiscovery = sd.NewTSOServiceDiscovery(
189173
c.ctx, c, c.serviceDiscovery,
@@ -198,6 +182,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
198182
zap.Error(err))
199183
return
200184
}
185+
log.Info("[pd] tso provider changed to tso server")
201186
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
202187
log.Warn("[pd] intend to switch to unknown service mode, just return")
203188
return
@@ -219,6 +204,29 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
219204
}
220205
}
221206

207+
func (c *innerClient) resetResourceManagerDiscoveryLocked(mode pdpb.ServiceMode) {
208+
switch mode {
209+
case pdpb.ServiceMode_PD_SVC_MODE:
210+
if c.resourceManagerDiscovery != nil {
211+
c.resourceManagerDiscovery.Close()
212+
c.resourceManagerDiscovery = nil
213+
}
214+
case pdpb.ServiceMode_API_SVC_MODE:
215+
c.resourceManagerDiscovery = sd.NewResourceManagerDiscovery(
216+
c.ctx, c.serviceDiscovery.GetClusterID(), c, c.tlsCfg, c.option, c.scheduleUpdateTokenConnection)
217+
c.resourceManagerDiscovery.Init()
218+
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
219+
log.Warn("[pd] intend to switch to unknown service mode, just return")
220+
return
221+
}
222+
}
223+
224+
func (c *innerClient) getResourceManagerDiscovery() *sd.ResourceManagerDiscovery {
225+
c.RLock()
226+
defer c.RUnlock()
227+
return c.resourceManagerDiscovery
228+
}
229+
222230
func (c *innerClient) scheduleUpdateTokenConnection(string) error {
223231
select {
224232
case c.updateTokenConnectionCh <- struct{}{}:
@@ -227,10 +235,20 @@ func (c *innerClient) scheduleUpdateTokenConnection(string) error {
227235
return nil
228236
}
229237

230-
func (c *innerClient) getServiceMode() pdpb.ServiceMode {
238+
type tsoProvider int
239+
240+
const (
241+
tsoProviderPD tsoProvider = iota
242+
tsoProviderTSOServer
243+
)
244+
245+
func (c *innerClient) getTSOProvider() tsoProvider {
231246
c.RLock()
232247
defer c.RUnlock()
233-
return c.serviceMode
248+
if c.tsoSvcDiscovery != nil {
249+
return tsoProviderTSOServer
250+
}
251+
return tsoProviderPD
234252
}
235253

236254
func (c *innerClient) getTSOClient() *tso.Cli {
@@ -297,6 +315,36 @@ func (c *innerClient) gRPCErrorHandler(err error) {
297315
}
298316
}
299317

318+
func shouldUpdateRMURL(err error) bool {
319+
if err == nil {
320+
return false
321+
}
322+
if errs.IsLeaderChange(err) {
323+
return true
324+
}
325+
if s, ok := status.FromError(err); ok {
326+
// If the rm instance stops, we can reset the connection and
327+
// use pd instance url instead of it.
328+
if errs.IsNetworkError(s.Code()) || s.Code() == codes.Canceled {
329+
return true
330+
}
331+
}
332+
return false
333+
}
334+
335+
func (c *innerClient) resourceManagerErrorHandler(err error) {
336+
if !shouldUpdateRMURL(err) {
337+
return
338+
}
339+
340+
c.RLock()
341+
defer c.RUnlock()
342+
log.Warn("[resource-manager] resource manager error", zap.Error(err))
343+
if c.resourceManagerDiscovery != nil {
344+
c.resourceManagerDiscovery.ScheduleUpdateServiceURL()
345+
}
346+
}
347+
300348
func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
301349
cc, err := c.serviceDiscovery.GetOrCreateGRPCConn(c.serviceDiscovery.GetServingURL())
302350
if err != nil {

client/pkg/retry/interval_retry.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2025 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package retry
16+
17+
import (
18+
"context"
19+
"time"
20+
21+
"github.com/pingcap/errors"
22+
)
23+
24+
var (
25+
// queryRetryMaxTimes is the max retry times for querying microservice.
26+
queryRetryMaxTimes = 10
27+
// queryRetryInterval is the retry interval for querying microservice.
28+
queryRetryInterval = 500 * time.Millisecond
29+
)
30+
31+
// WithConfig retries the given function with a fixed interval.
32+
func WithConfig(
33+
ctx context.Context, f func() error,
34+
) error {
35+
return Retry(ctx, queryRetryMaxTimes, queryRetryInterval, f)
36+
}
37+
38+
// Retry retries the given function with a fixed interval.
39+
func Retry(
40+
ctx context.Context, maxTimes int, interval time.Duration, f func() error,
41+
) error {
42+
var err error
43+
ticker := time.NewTicker(interval)
44+
defer ticker.Stop()
45+
for range maxTimes {
46+
if err = f(); err == nil {
47+
return nil
48+
}
49+
select {
50+
case <-ctx.Done():
51+
return err
52+
case <-ticker.C:
53+
}
54+
}
55+
return errors.WithStack(err)
56+
}

client/resource_group/controller/global_controller.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ type ResourceGroupsController struct {
163163

164164
run struct {
165165
responseDeadline *time.Timer
166-
inDegradedMode bool
166+
inDegradedMode atomic.Bool
167167
// currentRequests is used to record the request and resource group.
168168
// Currently, we don't do multiple `AcquireTokenBuckets` at the same time, so there are no concurrency problems with `currentRequests`.
169169
currentRequests []*rmpb.TokenBucketRequest
@@ -175,6 +175,8 @@ type ResourceGroupsController struct {
175175
safeRuConfig atomic.Pointer[RUConfig]
176176

177177
degradedRUSettings *rmpb.GroupRequestUnitSettings
178+
179+
wg sync.WaitGroup
178180
}
179181

180182
// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
@@ -248,7 +250,9 @@ const (
248250
// Start starts ResourceGroupController service.
249251
func (c *ResourceGroupsController) Start(ctx context.Context) {
250252
c.loopCtx, c.loopCancel = context.WithCancel(ctx)
253+
c.wg.Add(1)
251254
go func() {
255+
defer c.wg.Done()
252256
if c.ruConfig.DegradedModeWaitDuration > 0 {
253257
c.run.responseDeadline = time.NewTimer(c.ruConfig.DegradedModeWaitDuration)
254258
c.run.responseDeadline.Stop()
@@ -335,7 +339,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
335339
metrics.ResourceGroupStatusGauge.Reset()
336340
return
337341
case <-c.responseDeadlineCh:
338-
c.run.inDegradedMode = true
342+
c.run.inDegradedMode.Store(true)
339343
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
340344
log.Warn("[resource group controller] enter degraded mode")
341345
case resp := <-c.tokenResponseChan:
@@ -348,7 +352,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
348352
c.executeOnAllGroups((*groupCostController).updateRunState)
349353
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
350354
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
351-
if c.run.inDegradedMode {
355+
if c.IsDegraded() {
352356
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
353357
}
354358
case resp, ok := <-watchMetaChannel:
@@ -449,6 +453,7 @@ func (c *ResourceGroupsController) Stop() error {
449453
return errors.Errorf("resource groups controller does not start")
450454
}
451455
c.loopCancel()
456+
c.wg.Wait()
452457
return nil
453458
}
454459

@@ -607,7 +612,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
607612
}
608613
c.responseDeadlineCh = nil
609614
}
610-
c.run.inDegradedMode = false
615+
c.run.inDegradedMode.Store(false)
611616
for _, res := range resp {
612617
name := res.GetResourceGroupName()
613618
gc, ok := c.loadGroupController(name)
@@ -760,3 +765,8 @@ func (c *ResourceGroupsController) ReportConsumption(resourceGroupName string, c
760765

761766
gc.addRUConsumption(consumption)
762767
}
768+
769+
// IsDegraded returns whether the controller is in degraded mode.
770+
func (c *ResourceGroupsController) IsDegraded() bool {
771+
return c.run.inDegradedMode.Load()
772+
}

0 commit comments

Comments
 (0)