Skip to content

Commit eacfa79

Browse files
authored
Merge branch 'release-6.5' into cherry-pick-6123-to-release-6.5
2 parents afe7da6 + 206d412 commit eacfa79

File tree

5 files changed

+193
-121
lines changed

5 files changed

+193
-121
lines changed

server/cluster/unsafe_recovery_controller.go

Lines changed: 128 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -161,17 +161,22 @@ func (u *unsafeRecoveryController) reset() {
161161
func (u *unsafeRecoveryController) IsRunning() bool {
162162
u.RLock()
163163
defer u.RUnlock()
164+
return u.isRunningLocked()
165+
}
166+
167+
func (u *unsafeRecoveryController) isRunningLocked() bool {
164168
return u.stage != idle && u.stage != finished && u.stage != failed
165169
}
166170

167171
// RemoveFailedStores removes failed stores from the cluster.
168172
func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64, autoDetect bool) error {
169-
if u.IsRunning() {
170-
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
171-
}
172173
u.Lock()
173174
defer u.Unlock()
174175

176+
if u.isRunningLocked() {
177+
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
178+
}
179+
175180
if !autoDetect {
176181
if len(failedStores) == 0 {
177182
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no store specified")
@@ -220,7 +225,9 @@ func (u *unsafeRecoveryController) Show() []StageOutput {
220225
if u.stage == idle {
221226
return []StageOutput{{Info: "No on-going recovery."}}
222227
}
223-
u.checkTimeout()
228+
if err := u.checkTimeout(); err != nil {
229+
u.HandleErr(err)
230+
}
224231
status := u.output
225232
if u.stage != finished && u.stage != failed {
226233
status = append(status, u.getReportStatus())
@@ -255,17 +262,15 @@ func (u *unsafeRecoveryController) getReportStatus() StageOutput {
255262
return status
256263
}
257264

258-
func (u *unsafeRecoveryController) checkTimeout() bool {
265+
func (u *unsafeRecoveryController) checkTimeout() error {
259266
if u.stage == finished || u.stage == failed {
260-
return false
267+
return nil
261268
}
262269

263270
if time.Now().After(u.timeout) {
264-
ret := u.HandleErr(errors.Errorf("Exceeds timeout %v", u.timeout))
265-
u.timeout = time.Now().Add(storeRequestInterval * 2)
266-
return ret
271+
return errors.Errorf("Exceeds timeout %v", u.timeout)
267272
}
268-
return false
273+
return nil
269274
}
270275

271276
func (u *unsafeRecoveryController) HandleErr(err error) bool {
@@ -274,128 +279,139 @@ func (u *unsafeRecoveryController) HandleErr(err error) bool {
274279
u.err = err
275280
}
276281
if u.stage == exitForceLeader {
282+
// We already tried to exit force leader, and it still failed.
283+
// We turn into failed stage directly. TiKV will step down force leader
284+
// automatically after being for a long time.
277285
u.changeStage(failed)
278286
return true
279287
}
288+
// When encountering an error for the first time, we will try to exit force
289+
// leader before turning into failed stage to avoid the leaking force leaders
290+
// blocks reads and writes.
291+
u.storePlanExpires = make(map[uint64]time.Time)
292+
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
293+
u.timeout = time.Now().Add(storeRequestInterval * 2)
294+
// empty recovery plan would trigger exit force leader
295+
u.changeStage(exitForceLeader)
280296
return false
281297
}
282298

283299
// HandleStoreHeartbeat handles the store heartbeat requests and checks whether the stores need to
284300
// send detailed report back.
285301
func (u *unsafeRecoveryController) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest, resp *pdpb.StoreHeartbeatResponse) {
286-
if !u.IsRunning() {
287-
// no recovery in progress, do nothing
288-
return
289-
}
290302
u.Lock()
291303
defer u.Unlock()
292304

293-
if u.checkTimeout() {
305+
if !u.isRunningLocked() {
306+
// no recovery in progress, do nothing
294307
return
295308
}
296309

297-
allCollected := u.collectReport(heartbeat)
310+
done, err := func() (bool, error) {
311+
if err := u.checkTimeout(); err != nil {
312+
return false, err
313+
}
298314

299-
if allCollected {
300-
newestRegionTree, peersMap, buildErr := u.buildUpFromReports()
301-
if buildErr != nil && u.HandleErr(buildErr) {
302-
return
315+
allCollected, err := u.collectReport(heartbeat)
316+
if err != nil {
317+
return false, err
303318
}
304319

305-
// clean up previous plan
306-
u.storePlanExpires = make(map[uint64]time.Time)
307-
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
320+
if allCollected {
321+
newestRegionTree, peersMap, err := u.buildUpFromReports()
322+
if err != nil {
323+
return false, err
324+
}
308325

309-
var stage unsafeRecoveryStage
310-
if u.err == nil {
311-
stage = u.stage
312-
} else {
313-
stage = exitForceLeader
326+
return u.generatePlan(newestRegionTree, peersMap)
314327
}
315-
reCheck := false
316-
hasPlan := false
317-
var err error
318-
for {
319-
switch stage {
320-
case collectReport:
321-
fallthrough
322-
case tombstoneTiFlashLearner:
323-
if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
324-
u.changeStage(tombstoneTiFlashLearner)
325-
break
326-
}
327-
if err != nil {
328-
break
329-
}
330-
fallthrough
331-
case forceLeaderForCommitMerge:
332-
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
333-
u.changeStage(forceLeaderForCommitMerge)
334-
break
335-
}
336-
if err != nil {
337-
break
338-
}
339-
fallthrough
340-
case forceLeader:
341-
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
342-
u.changeStage(forceLeader)
343-
break
344-
}
345-
if err != nil {
346-
break
347-
}
348-
fallthrough
349-
case demoteFailedVoter:
350-
if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
351-
u.changeStage(demoteFailedVoter)
352-
break
353-
} else if !reCheck {
354-
reCheck = true
355-
stage = tombstoneTiFlashLearner
356-
continue
357-
}
358-
fallthrough
359-
case createEmptyRegion:
360-
if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
361-
u.changeStage(createEmptyRegion)
362-
break
363-
}
364-
if err != nil {
365-
break
366-
}
367-
fallthrough
368-
case exitForceLeader:
369-
// no need to generate plan, empty recovery plan triggers exit force leader on TiKV side
370-
if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
371-
u.changeStage(exitForceLeader)
372-
}
373-
default:
374-
panic("unreachable")
375-
}
328+
return false, nil
329+
}()
376330

331+
if done || (err != nil && u.HandleErr(err)) {
332+
return
333+
}
334+
u.dispatchPlan(heartbeat, resp)
335+
}
336+
337+
func (u *unsafeRecoveryController) generatePlan(newestRegionTree *regionTree, peersMap map[uint64][]*regionItem) (bool, error) {
338+
// clean up previous plan
339+
u.storePlanExpires = make(map[uint64]time.Time)
340+
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
341+
342+
stage := u.stage
343+
reCheck := false
344+
hasPlan := false
345+
var err error
346+
for {
347+
switch stage {
348+
case collectReport:
349+
fallthrough
350+
case tombstoneTiFlashLearner:
351+
if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
352+
u.changeStage(tombstoneTiFlashLearner)
353+
break
354+
}
377355
if err != nil {
378-
if u.HandleErr(err) {
379-
return
380-
}
381-
u.storePlanExpires = make(map[uint64]time.Time)
382-
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
383-
// Clear the reports etc.
356+
break
357+
}
358+
fallthrough
359+
case forceLeaderForCommitMerge:
360+
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
361+
u.changeStage(forceLeaderForCommitMerge)
362+
break
363+
}
364+
if err != nil {
365+
break
366+
}
367+
fallthrough
368+
case forceLeader:
369+
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
370+
u.changeStage(forceLeader)
371+
break
372+
}
373+
if err != nil {
374+
break
375+
}
376+
fallthrough
377+
case demoteFailedVoter:
378+
if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
379+
u.changeStage(demoteFailedVoter)
380+
break
381+
} else if !reCheck {
382+
reCheck = true
383+
stage = tombstoneTiFlashLearner
384+
continue
385+
}
386+
fallthrough
387+
case createEmptyRegion:
388+
if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
389+
u.changeStage(createEmptyRegion)
390+
break
391+
}
392+
if err != nil {
393+
break
394+
}
395+
fallthrough
396+
case exitForceLeader:
397+
if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
384398
u.changeStage(exitForceLeader)
385-
return
386-
} else if !hasPlan {
387-
if u.err != nil {
388-
u.changeStage(failed)
389-
} else {
390-
u.changeStage(finished)
391-
}
392-
return
393399
}
394-
break
400+
default:
401+
panic("unreachable")
395402
}
403+
break
396404
}
397405

398-
u.dispatchPlan(heartbeat, resp)
406+
if err == nil && !hasPlan {
407+
if u.err != nil {
408+
u.changeStage(failed)
409+
} else {
410+
u.changeStage(finished)
411+
}
412+
return true, nil
413+
}
414+
return false, err
399415
}
400416

401417
// It dispatches recovery plan if any.
@@ -421,22 +437,21 @@ func (u *unsafeRecoveryController) dispatchPlan(heartbeat *pdpb.StoreHeartbeatRe
421437
}
422438

423439
// It collects and checks if store reports have been fully collected.
424-
func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) bool {
440+
func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) (bool, error) {
425441
storeID := heartbeat.Stats.StoreId
426442
if _, isFailedStore := u.failedStores[storeID]; isFailedStore {
427-
u.HandleErr(errors.Errorf("Receive heartbeat from failed store %d", storeID))
428-
return false
443+
return false, errors.Errorf("Receive heartbeat from failed store %d", storeID)
429444
}
430445

431446
if heartbeat.StoreReport == nil {
432-
return false
447+
return false, nil
433448
}
434449

435450
if heartbeat.StoreReport.GetStep() != u.step {
436451
log.Info("Unsafe recovery receives invalid store report",
437452
zap.Uint64("store-id", storeID), zap.Uint64("expected-step", u.step), zap.Uint64("obtained-step", heartbeat.StoreReport.GetStep()))
438453
// invalid store report, ignore
439-
return false
454+
return false, nil
440455
}
441456

442457
if report, exists := u.storeReports[storeID]; exists {
@@ -445,11 +460,11 @@ func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatR
445460
if report == nil {
446461
u.numStoresReported++
447462
if u.numStoresReported == len(u.storeReports) {
448-
return true
463+
return true, nil
449464
}
450465
}
451466
}
452-
return false
467+
return false, nil
453468
}
454469

455470
// Gets the stage of the current unsafe recovery.
@@ -1204,6 +1219,7 @@ func (u *unsafeRecoveryController) generateExitForceLeaderPlan() bool {
12041219
for storeID, storeReport := range u.storeReports {
12051220
for _, peerReport := range storeReport.PeerReports {
12061221
if peerReport.IsForceLeader {
1222+
// empty recovery plan triggers exit force leader on TiKV side
12071223
_ = u.getRecoveryPlan(storeID)
12081224
hasPlan = true
12091225
break

0 commit comments

Comments
 (0)