Skip to content

Commit 9411339

Browse files
disksingokJiang
authored andcommitted
client: update resource manager serivce url after error (#436)
Signed-off-by: disksing <i@disksing.com>
1 parent d9ebeea commit 9411339

File tree

3 files changed

+53
-59
lines changed

3 files changed

+53
-59
lines changed

client/inner_client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,13 @@ func (c *innerClient) gRPCErrorHandler(err error) {
339339
}
340340
}
341341

342+
func (c *innerClient) resourceManagerErrorHandler(err error) {
343+
log.Error("[resource-manager] resource manager error", zap.Error(err))
344+
if c.resourceManagerDiscovery != nil {
345+
c.resourceManagerDiscovery.ScheduleUpateServiceURL()
346+
}
347+
}
348+
342349
func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
343350
cc, err := c.serviceDiscovery.GetOrCreateGRPCConn(c.serviceDiscovery.GetServingURL())
344351
if err != nil {

client/resource_manager_client.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroup
111111
resp, err := cc.ListResourceGroups(ctx, req)
112112
if err != nil {
113113
c.inner.gRPCErrorHandler(err)
114+
c.inner.resourceManagerErrorHandler(err)
114115
return nil, errs.ErrClientListResourceGroup.FastGenByArgs(err.Error())
115116
}
116117
resErr := resp.GetError()
@@ -140,6 +141,7 @@ func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string,
140141
resp, err := cc.GetResourceGroup(ctx, req)
141142
if err != nil {
142143
c.inner.gRPCErrorHandler(err)
144+
c.inner.resourceManagerErrorHandler(err)
143145
return nil, &errs.ErrClientGetResourceGroup{ResourceGroupName: resourceGroupName, Cause: err.Error()}
144146
}
145147
resErr := resp.GetError()
@@ -185,6 +187,7 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG
185187
}
186188
if err != nil {
187189
c.inner.gRPCErrorHandler(err)
190+
c.inner.resourceManagerErrorHandler(err)
188191
return "", err
189192
}
190193
resErr := resp.GetError()
@@ -209,6 +212,7 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri
209212
resp, err := cc.DeleteResourceGroup(ctx, req)
210213
if err != nil {
211214
c.inner.gRPCErrorHandler(err)
215+
c.inner.resourceManagerErrorHandler(err)
212216
return "", err
213217
}
214218
resErr := resp.GetError()
@@ -391,6 +395,7 @@ func (c *innerClient) processTokenRequests(stream rmpb.ResourceManager_AcquireTo
391395
resp, err := stream.Recv()
392396
if err != nil {
393397
c.gRPCErrorHandler(err)
398+
c.resourceManagerErrorHandler(err)
394399
err = errors.WithStack(err)
395400
t.done <- err
396401
return err
@@ -423,6 +428,9 @@ func (c *innerClient) tryResourceManagerConnect(ctx context.Context, connection
423428
connection.stream = stream
424429
return nil
425430
}
431+
if err != nil {
432+
c.resourceManagerErrorHandler(err)
433+
}
426434
cancel()
427435
select {
428436
case <-ctx.Done():

client/servicediscovery/resource_manager_service_discovery.go

Lines changed: 38 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"go.uber.org/zap"
2626
"google.golang.org/grpc"
2727

28-
"github.com/pingcap/kvproto/pkg/meta_storagepb"
2928
"github.com/pingcap/kvproto/pkg/resource_manager"
3029
"github.com/pingcap/log"
3130

@@ -53,8 +52,9 @@ type ResourceManagerDiscovery struct {
5352
discoveryKey string
5453
tlsCfg *tls.Config
5554
// Client option.
56-
option *opt.Option
57-
onLeaderChanged func(string) error
55+
option *opt.Option
56+
onLeaderChanged func(string) error
57+
updateServiceURLCh chan struct{}
5858

5959
mu sync.RWMutex
6060
serviceURL string
@@ -65,14 +65,15 @@ type ResourceManagerDiscovery struct {
6565
func NewResourceManagerDiscovery(ctx context.Context, clusterID uint64, metaCli metastorage.Client, tlsCfg *tls.Config, opt *opt.Option, leaderChangedCb func(string) error) *ResourceManagerDiscovery {
6666
ctx, cancel := context.WithCancel(ctx)
6767
d := &ResourceManagerDiscovery{
68-
ctx: ctx,
69-
cancel: cancel,
70-
clusterID: clusterID,
71-
metaCli: metaCli,
72-
discoveryKey: fmt.Sprintf(resourceManagerSvcDiscoveryFormat, clusterID),
73-
tlsCfg: tlsCfg,
74-
option: opt,
75-
onLeaderChanged: leaderChangedCb,
68+
ctx: ctx,
69+
cancel: cancel,
70+
clusterID: clusterID,
71+
metaCli: metaCli,
72+
discoveryKey: fmt.Sprintf(resourceManagerSvcDiscoveryFormat, clusterID),
73+
tlsCfg: tlsCfg,
74+
option: opt,
75+
onLeaderChanged: leaderChangedCb,
76+
updateServiceURLCh: make(chan struct{}, 1),
7677
}
7778

7879
log.Info("[resource-manager] created resource manager discovery",
@@ -105,7 +106,7 @@ func (r *ResourceManagerDiscovery) Init() error {
105106
}
106107
r.resetConn(url)
107108
r.wg.Add(1)
108-
go r.watchServiceURL(revision)
109+
go r.updateServiceURLLoop(revision)
109110
return nil
110111
}
111112

@@ -180,60 +181,38 @@ func (r *ResourceManagerDiscovery) parseURLFromStorageValue(value []byte) (strin
180181
return listenUrls[0], nil
181182
}
182183

183-
func (r *ResourceManagerDiscovery) watchServiceURL(revision int64) {
184-
defer r.wg.Done()
184+
// ScheduleUpateServiceURL schedules an update of the service URL.
185+
func (r *ResourceManagerDiscovery) ScheduleUpateServiceURL() {
186+
select {
187+
case r.updateServiceURLCh <- struct{}{}:
188+
default:
189+
}
190+
}
185191

186-
log.Info("[resource-manager] watching service URL",
187-
zap.String("discovery-key", r.discoveryKey))
192+
func (r *ResourceManagerDiscovery) updateServiceURLLoop(revision int64) {
193+
defer r.wg.Done()
188194

189-
lastRevision := revision
190-
start_watch:
191-
ch, err := r.metaCli.Watch(r.ctx, []byte(r.discoveryKey), opt.WithRev(lastRevision))
192-
if err != nil {
193-
log.Error("[resource-manager] failed to watch service URL",
194-
zap.String("discovery-key", r.discoveryKey),
195-
zap.Error(err))
196-
select {
197-
case <-r.ctx.Done():
198-
return
199-
case <-time.After(serviceURLWatchRetryInterval):
200-
goto start_watch
201-
}
202-
}
203195
for {
204196
select {
205197
case <-r.ctx.Done():
198+
log.Info("[resource-manager] exit update service URL loop due to context canceled")
206199
return
207-
case events, ok := <-ch:
208-
if !ok {
209-
log.Info("[resource-manager] service URL watch channel closed",
210-
zap.String("discovery-key", r.discoveryKey))
211-
goto start_watch
212-
}
213-
var connReset bool
214-
for _, event := range events {
215-
if event.Type == meta_storagepb.Event_PUT {
216-
url, err := r.parseURLFromStorageValue(event.Kv.Value)
217-
if err != nil {
218-
log.Error("[resource-manager] failed to parse service URL",
219-
zap.String("discovery-key", r.discoveryKey),
220-
zap.Error(err))
221-
continue
222-
}
223-
log.Info("[resource-manager] service URL changed",
224-
zap.String("discovery-key", r.discoveryKey),
225-
zap.String("new-url", url))
226-
lastRevision = event.Kv.ModRevision
227-
r.resetConn(url)
228-
connReset = true
229-
}
200+
case <-r.updateServiceURLCh:
201+
log.Info("[resource-manager] updating service URL", zap.String("old-url", r.serviceURL))
202+
url, newRevision, err := r.discoverServiceURL()
203+
if err != nil {
204+
log.Error("[resource-manager] failed to discover service URL",
205+
zap.String("discovery-key", r.discoveryKey),
206+
zap.Error(err))
207+
continue
230208
}
231-
if connReset && r.onLeaderChanged != nil {
232-
if err := r.onLeaderChanged(""); err != nil {
233-
log.Error("[resource-manager] failed to notify leader change",
234-
zap.String("discovery-key", r.discoveryKey),
235-
zap.Error(err))
236-
}
209+
log.Info("[resource-manager] updated service URL",
210+
zap.String("new-url", url),
211+
zap.Int64("new-revision", newRevision),
212+
zap.Int64("revision", revision))
213+
if newRevision > revision {
214+
r.resetConn(url)
215+
revision = newRevision
237216
}
238217
}
239218
}

0 commit comments

Comments
 (0)