Skip to content

Commit eadad73

Browse files
disksing authored and okJiang committed
client: resource manager service discovery (#391)
Signed-off-by: disksing <i@disksing.com>
1 parent 9450168 commit eadad73

File tree

5 files changed

+356
-51
lines changed

5 files changed

+356
-51
lines changed

client/client.go

Lines changed: 15 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -150,23 +150,25 @@ var _ Client = (*client)(nil)
150150
// serviceModeKeeper is for service mode switching.
151151
type serviceModeKeeper struct {
152152
sync.RWMutex
153-
serviceMode pdpb.ServiceMode
154-
tsoClient *tso.Cli
155-
tsoSvcDiscovery sd.ServiceDiscovery
156-
routerClient *router.Cli
153+
serviceMode pdpb.ServiceMode
154+
tsoClient *tso.Cli
155+
tsoSvcDiscovery sd.ServiceDiscovery
156+
routerClient *router.Cli
157+
resourceManagerDiscovery *sd.ResourceManagerDiscovery
157158
}
158159

159160
func (k *serviceModeKeeper) close() {
160161
k.Lock()
161162
defer k.Unlock()
162-
switch k.serviceMode {
163-
case pdpb.ServiceMode_API_SVC_MODE:
163+
if k.tsoSvcDiscovery != nil {
164164
k.tsoSvcDiscovery.Close()
165-
fallthrough
166-
case pdpb.ServiceMode_PD_SVC_MODE:
167-
k.tsoClient.Close()
168-
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
165+
k.tsoSvcDiscovery = nil
169166
}
167+
if k.resourceManagerDiscovery != nil {
168+
k.resourceManagerDiscovery.Close()
169+
k.resourceManagerDiscovery = nil
170+
}
171+
k.tsoClient.Close()
170172
}
171173

172174
type client struct {
@@ -451,7 +453,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
451453
if !ok {
452454
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
453455
}
454-
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE && enable {
456+
if c.inner.getTSOProvider() != tsoProviderPD && enable {
455457
return errors.New("[pd] tso follower proxy is only supported when PD provides TSO")
456458
}
457459
c.inner.option.SetEnableTSOFollowerProxy(enable)
@@ -598,18 +600,12 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi
598600
// GetMinTS implements the TSOClient interface.
599601
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
600602
// Handle compatibility issue in case of PD doesn't support GetMinTS API.
601-
serviceMode := c.inner.getServiceMode()
602-
switch serviceMode {
603-
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
604-
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode")
605-
case pdpb.ServiceMode_PD_SVC_MODE:
603+
if c.inner.getTSOProvider() == tsoProviderPD {
606604
// If the service mode is switched to API during GetTS() call, which happens during migration,
607605
// returning the default timeline should be fine.
608606
return c.GetTS(ctx)
609-
case pdpb.ServiceMode_API_SVC_MODE:
610-
default:
611-
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
612607
}
608+
613609
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
614610
defer cancel()
615611
// Call GetMinTS API to get the minimal TS from the API leader.

client/inner_client.go

Lines changed: 66 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -140,35 +140,33 @@ func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) {
140140
c.Lock()
141141
defer c.Unlock()
142142

143-
if c.option.UseTSOServerProxy {
144-
// If we are using TSO server proxy, we always use PD_SVC_MODE.
145-
newMode = pdpb.ServiceMode_PD_SVC_MODE
146-
}
147143
if newMode == c.serviceMode {
148144
return
149145
}
150-
log.Info("[pd] changing TSO provider",
151-
zap.String("old", convertToString(c.serviceMode)),
152-
zap.String("new", convertToString(newMode)))
153-
c.resetTSOClientLocked(newMode)
154-
oldMode := c.serviceMode
155-
c.serviceMode = newMode
156-
log.Info("[pd] TSO provider changed",
157-
zap.String("old", convertToString(oldMode)),
158-
zap.String("new", convertToString(newMode)))
159-
}
160146

161-
func convertToString(mode pdpb.ServiceMode) string {
162-
switch mode {
147+
switch newMode {
148+
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
149+
log.Warn("[pd] intend to switch to unknown service mode, use PD_SVC_MODE")
150+
fallthrough
163151
case pdpb.ServiceMode_PD_SVC_MODE:
164-
return "pd"
152+
if c.tsoSvcDiscovery != nil || c.tsoClient == nil {
153+
c.resetTSOClientLocked(newMode)
154+
}
155+
if c.resourceManagerDiscovery != nil {
156+
c.resetResourceManagerDiscoveryLocked(newMode)
157+
}
165158
case pdpb.ServiceMode_API_SVC_MODE:
166-
return "tso server"
167-
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
168-
return "unknown"
169-
default:
170-
return "invalid"
159+
// If we are using TSO server proxy, we always use PD_SVC_MODE.
160+
if c.tsoSvcDiscovery == nil && !c.option.UseTSOServerProxy {
161+
c.resetTSOClientLocked(newMode)
162+
}
163+
if c.resourceManagerDiscovery == nil && !c.option.UseResourceManagerProxy {
164+
c.resetResourceManagerDiscoveryLocked(newMode)
165+
}
171166
}
167+
168+
c.serviceMode = newMode
169+
log.Info("[pd] service mode changed", zap.String("new-mode", newMode.String()))
172170
}
173171

174172
// Reset a new TSO client.
@@ -182,6 +180,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
182180
case pdpb.ServiceMode_PD_SVC_MODE:
183181
newTSOCli = tso.NewClient(c.ctx, c.option,
184182
c.serviceDiscovery, &tso.PDStreamBuilderFactory{})
183+
log.Info("[pd] tso provider changed to pd")
185184
case pdpb.ServiceMode_API_SVC_MODE:
186185
newTSOSvcDiscovery = sd.NewTSOServiceDiscovery(
187186
c.ctx, c, c.serviceDiscovery,
@@ -196,6 +195,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
196195
zap.Error(err))
197196
return
198197
}
198+
log.Info("[pd] tso provider changed to tso server")
199199
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
200200
log.Warn("[pd] intend to switch to unknown service mode, just return")
201201
return
@@ -217,6 +217,38 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
217217
}
218218
}
219219

220+
func (c *innerClient) resetResourceManagerDiscoveryLocked(newMode pdpb.ServiceMode) {
221+
var newResourceManagerDiscovery *sd.ResourceManagerDiscovery
222+
if newMode == pdpb.ServiceMode_API_SVC_MODE && !c.option.UseResourceManagerProxy {
223+
newResourceManagerDiscovery = sd.NewResourceManagerDiscovery(
224+
c.ctx, c.serviceDiscovery.GetClusterID(), c, c.tlsCfg, c.option, c.scheduleUpdateTokenConnection)
225+
if err := newResourceManagerDiscovery.Init(); err != nil {
226+
log.Error("[pd] failed to initialize resource manager discovery. keep the current service mode",
227+
zap.Strings("svr-urls", c.svrUrls),
228+
zap.String("current-mode", c.serviceMode.String()),
229+
zap.Error(err))
230+
newResourceManagerDiscovery.Close()
231+
return
232+
}
233+
}
234+
if c.resourceManagerDiscovery != nil {
235+
c.resourceManagerDiscovery.Close()
236+
}
237+
c.resourceManagerDiscovery = newResourceManagerDiscovery
238+
if newMode == pdpb.ServiceMode_PD_SVC_MODE {
239+
log.Info("[pd] resource manager provider changed to pd")
240+
} else {
241+
log.Info("[pd] resource manager provider changed to resource manager server")
242+
}
243+
_ = c.scheduleUpdateTokenConnection("")
244+
}
245+
246+
func (c *innerClient) getResourceManagerDiscovery() *sd.ResourceManagerDiscovery {
247+
c.RLock()
248+
defer c.RUnlock()
249+
return c.resourceManagerDiscovery
250+
}
251+
220252
func (c *innerClient) scheduleUpdateTokenConnection(string) error {
221253
select {
222254
case c.updateTokenConnectionCh <- struct{}{}:
@@ -225,10 +257,20 @@ func (c *innerClient) scheduleUpdateTokenConnection(string) error {
225257
return nil
226258
}
227259

228-
func (c *innerClient) getServiceMode() pdpb.ServiceMode {
260+
type tsoProvider int
261+
262+
const (
263+
tsoProviderPD tsoProvider = iota
264+
tsoProviderTSOServer
265+
)
266+
267+
func (c *innerClient) getTSOProvider() tsoProvider {
229268
c.RLock()
230269
defer c.RUnlock()
231-
return c.serviceMode
270+
if c.tsoSvcDiscovery != nil {
271+
return tsoProviderTSOServer
272+
}
273+
return tsoProviderPD
232274
}
233275

234276
func (c *innerClient) getTSOClient() *tso.Cli {

client/opt/option.go

Lines changed: 20 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -61,14 +61,15 @@ const (
6161
// It provides the ability to change some PD client's options online from the outside.
6262
type Option struct {
6363
// Static options.
64-
GRPCDialOptions []grpc.DialOption
65-
Timeout time.Duration
66-
MaxRetryTimes int
67-
EnableForwarding bool
68-
UseTSOServerProxy bool
69-
MetricsLabels prometheus.Labels
70-
InitMetrics bool
71-
Backoffer *retry.Backoffer
64+
GRPCDialOptions []grpc.DialOption
65+
Timeout time.Duration
66+
MaxRetryTimes int
67+
EnableForwarding bool
68+
UseTSOServerProxy bool
69+
UseResourceManagerProxy bool
70+
MetricsLabels prometheus.Labels
71+
InitMetrics bool
72+
Backoffer *retry.Backoffer
7273

7374
// Dynamic options.
7475
dynamicOptions [dynamicOptionCount]atomic.Value
@@ -87,6 +88,7 @@ func NewOption() *Option {
8788
EnableFollowerHandleCh: make(chan struct{}, 1),
8889
EnableRouterClientCh: make(chan struct{}, 1),
8990
InitMetrics: true,
91+
UseResourceManagerProxy: true,
9092
}
9193

9294
co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
@@ -200,6 +202,16 @@ func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption {
200202
}
201203
}
202204

205+
// WithResourceManagerProxyOption configures the client to use resource manager proxy,
206+
// i.e., the client will send resource manager requests to the API leader (the resource manager
207+
// proxy) which will forward the requests to the resource manager servers.
208+
// Default is true, which means the client will use resource manager proxy.
209+
func WithResourceManagerProxyOption(useResourceManagerProxy bool) ClientOption {
210+
return func(op *Option) {
211+
op.UseResourceManagerProxy = useResourceManagerProxy
212+
}
213+
}
214+
203215
// WithMaxErrorRetry configures the client max retry times when connect meets error.
204216
func WithMaxErrorRetry(count int) ClientOption {
205217
return func(op *Option) {

client/resource_manager_client.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,9 @@ func WithRUStats(op *GetResourceGroupOp) {
8080

8181
// resourceManagerClient gets the ResourceManager client of current PD leader.
8282
func (c *innerClient) resourceManagerClient() (rmpb.ResourceManagerClient, error) {
83+
if ds := c.getResourceManagerDiscovery(); ds != nil {
84+
return rmpb.NewResourceManagerClient(ds.GetConn()), nil
85+
}
8386
cc, err := c.getOrCreateGRPCConn()
8487
if err != nil {
8588
return nil, err

0 commit comments

Comments
 (0)