Skip to content

Commit 3d57628

Browse files
committed
checker: support force merge ranges for physical table drop (#196)
Signed-off-by: HunDunDM <hundundm@gmail.com>
1 parent 82d1924 commit 3d57628

File tree

16 files changed

+1208
-3
lines changed

16 files changed

+1208
-3
lines changed

errors.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,16 @@ error = '''
381381
failed to convert a path to absolute path
382382
'''
383383

384+
["PD:forcemerge:ErrForceMergeRangeContent"]
385+
error = '''
386+
invalid force merge range content, %s
387+
'''
388+
389+
["PD:forcemerge:ErrLoadForceMergeRange"]
390+
error = '''
391+
load force merge range failed
392+
'''
393+
384394
["PD:gin:ErrBindJSON"]
385395
error = '''
386396
bind JSON error

pkg/errs/errno.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ var (
113113
ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList"))
114114
)
115115

116+
// force merge errors
117+
var (
118+
ErrForceMergeRangeContent = errors.Normalize("invalid force merge range content, %s", errors.RFCCodeText("PD:forcemerge:ErrForceMergeRangeContent"))
119+
ErrLoadForceMergeRange = errors.Normalize("load force merge range failed", errors.RFCCodeText("PD:forcemerge:ErrLoadForceMergeRange"))
120+
)
121+
116122
// region label errors
117123
var (
118124
ErrRegionRuleContent = errors.Normalize("invalid region rule content, %s", errors.RFCCodeText("PD:region:ErrRegionRuleContent"))

pkg/mock/mockcluster/mockcluster.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/tikv/pd/server/core/storelimit"
3333
"github.com/tikv/pd/server/id"
3434
"github.com/tikv/pd/server/schedule/labeler"
35+
"github.com/tikv/pd/server/schedule/pkdbforcemerge"
3536
"github.com/tikv/pd/server/schedule/placement"
3637
"github.com/tikv/pd/server/statistics"
3738
"github.com/tikv/pd/server/statistics/buckets"
@@ -49,6 +50,7 @@ type Cluster struct {
4950
*core.BasicCluster
5051
*mockid.IDAllocator
5152
*placement.RuleManager
53+
ForceMergeManager *pkdbforcemerge.Manager
5254
*labeler.RegionLabeler
5355
*statistics.HotStat
5456
*config.PersistOptions
@@ -61,6 +63,7 @@ type Cluster struct {
6163

6264
// NewCluster creates a new Cluster
6365
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
66+
store := storage.NewStorageWithMemoryBackend()
6467
clus := &Cluster{
6568
BasicCluster: core.NewBasicCluster(),
6669
IDAllocator: mockid.NewIDAllocator(),
@@ -76,7 +79,8 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
7679
}
7780
// It should be updated to the latest feature version.
7881
clus.PersistOptions.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.HotScheduleWithQuery))
79-
clus.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, storage.NewStorageWithMemoryBackend(), time.Second*5)
82+
clus.ForceMergeManager, _ = pkdbforcemerge.NewManager(store)
83+
clus.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, store, time.Second*5)
8084
return clus
8185
}
8286

@@ -200,6 +204,11 @@ func (mc *Cluster) GetRuleManager() *placement.RuleManager {
200204
return mc.RuleManager
201205
}
202206

207+
// GetForceMergeManager returns the force merge manager of the cluster.
208+
func (mc *Cluster) GetForceMergeManager() *pkdbforcemerge.Manager {
209+
return mc.ForceMergeManager
210+
}
211+
203212
// GetRegionLabeler returns the region labeler of the cluster.
204213
func (mc *Cluster) GetRegionLabeler() *labeler.RegionLabeler {
205214
return mc.RegionLabeler

server/api/region.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/pingcap/kvproto/pkg/replication_modepb"
3434
"github.com/pingcap/log"
3535
"github.com/tikv/pd/pkg/apiutil"
36+
"github.com/tikv/pd/pkg/errs"
3637
"github.com/tikv/pd/pkg/typeutil"
3738
"github.com/tikv/pd/server"
3839
"github.com/tikv/pd/server/core"
@@ -231,6 +232,11 @@ type regionHandler struct {
231232
rd *render.Render
232233
}
233234

235+
type addForceMergeRangesRequest struct {
236+
StartKeysHex []string `json:"start_keys"`
237+
EndKeysHex []string `json:"end_keys"`
238+
}
239+
234240
func newRegionHandler(svr *server.Server, rd *render.Render) *regionHandler {
235241
return &regionHandler{
236242
svr: svr,
@@ -941,6 +947,66 @@ func (h *regionsHandler) AccelerateRegionsScheduleInRanges(w http.ResponseWriter
941947
h.rd.Text(w, http.StatusOK, msgBuilder.String())
942948
}
943949

950+
// @Tags region
951+
// @Summary Add force merge ranges.
952+
// @Accept json
953+
// @Param body body object true "json params"
954+
// @Produce plain
955+
// @Success 200 {string} string "Add force merge ranges successfully."
956+
// @Failure 400 {string} string "The input is invalid."
957+
// @Failure 500 {string} string "PD server failed to proceed the request."
958+
// @Router /regions/force-merge [post]
959+
func (h *regionsHandler) AddForceMergeRanges(w http.ResponseWriter, r *http.Request) {
960+
rc := getCluster(r)
961+
var input addForceMergeRangesRequest
962+
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
963+
return
964+
}
965+
if rc.GetOpts().IsCrossTableMergeEnabled() {
966+
h.rd.JSON(w, http.StatusBadRequest, "enable-cross-table-merge is true")
967+
return
968+
}
969+
if keyType := rc.GetOpts().GetKeyType(); keyType != core.Table {
970+
h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("key-type %s does not support force merge", keyType.String()))
971+
return
972+
}
973+
974+
manager := rc.GetForceMergeManager()
975+
if manager == nil {
976+
h.rd.JSON(w, http.StatusInternalServerError, "force merge manager is not initialized")
977+
return
978+
}
979+
if err := manager.AddRanges(input.StartKeysHex, input.EndKeysHex); err != nil {
980+
if errs.ErrForceMergeRangeContent.Equal(err) || errs.ErrHexDecodingString.Equal(err) {
981+
h.rd.JSON(w, http.StatusBadRequest, err.Error())
982+
} else {
983+
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
984+
}
985+
return
986+
}
987+
h.rd.Text(w, http.StatusOK, "Add force merge ranges successfully.")
988+
}
989+
990+
// @Tags region
991+
// @Summary Clear force merge ranges.
992+
// @Produce plain
993+
// @Success 200 {string} string "Clear force merge ranges successfully."
994+
// @Failure 500 {string} string "PD server failed to proceed the request."
995+
// @Router /regions/force-merge [delete]
996+
func (h *regionsHandler) DeleteForceMergeRanges(w http.ResponseWriter, r *http.Request) {
997+
rc := getCluster(r)
998+
manager := rc.GetForceMergeManager()
999+
if manager == nil {
1000+
h.rd.JSON(w, http.StatusInternalServerError, "force merge manager is not initialized")
1001+
return
1002+
}
1003+
if err := manager.ClearRanges(); err != nil {
1004+
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
1005+
return
1006+
}
1007+
h.rd.Text(w, http.StatusOK, "Clear force merge ranges successfully.")
1008+
}
1009+
9441010
func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) {
9451011
rc := getCluster(r)
9461012
limit := defaultRegionLimit

server/api/region_test.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,166 @@ func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRanges() {
374374
suite.Len(idList, 4)
375375
}
376376

377+
func (suite *regionTestSuite) TestAddForceMergeRanges() {
378+
re := suite.Require()
379+
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"false"}`), tu.StatusOK(re)))
380+
defer func() {
381+
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))
382+
}()
383+
384+
manager := suite.svr.GetRaftCluster().GetForceMergeManager()
385+
re.NotNil(manager)
386+
387+
startKey := []byte("force-merge/a")
388+
middleKey := []byte("force-merge/b")
389+
endKey := []byte("force-merge/c")
390+
cleanupEndKey := []byte("force-merge/d")
391+
body, err := json.Marshal(addForceMergeRangesRequest{
392+
StartKeysHex: []string{
393+
hex.EncodeToString(startKey),
394+
hex.EncodeToString(middleKey),
395+
},
396+
EndKeysHex: []string{
397+
hex.EncodeToString(middleKey),
398+
hex.EncodeToString(endKey),
399+
},
400+
})
401+
suite.NoError(err)
402+
403+
err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix), body, tu.StatusOK(re))
404+
suite.NoError(err)
405+
406+
exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10001, StartKey: startKey, EndKey: endKey}, nil)
407+
suite.True(manager.SolveRegion(exactRegion))
408+
409+
cleanupRegion := core.NewRegionInfo(&metapb.Region{Id: 10002, StartKey: startKey, EndKey: cleanupEndKey}, nil)
410+
suite.False(manager.SolveRegion(cleanupRegion))
411+
suite.False(manager.SolveRegion(exactRegion))
412+
}
413+
414+
func (suite *regionTestSuite) TestAddForceMergeRangesValidation() {
415+
re := suite.Require()
416+
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"false"}`), tu.StatusOK(re)))
417+
defer func() {
418+
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))
419+
}()
420+
421+
startKey := []byte("force-merge/x")
422+
endKey := []byte("force-merge/y")
423+
body, err := json.Marshal(addForceMergeRangesRequest{
424+
StartKeysHex: []string{hex.EncodeToString(startKey)},
425+
})
426+
suite.NoError(err)
427+
428+
err = tu.CheckPostJSON(
429+
testDialClient,
430+
fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix),
431+
body,
432+
tu.Status(re, http.StatusBadRequest),
433+
tu.StringContain(re, "start key count 1 does not match end key count 0"),
434+
)
435+
suite.NoError(err)
436+
437+
exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10003, StartKey: startKey, EndKey: endKey}, nil)
438+
suite.False(suite.svr.GetRaftCluster().GetForceMergeManager().SolveRegion(exactRegion))
439+
}
440+
441+
func (suite *regionTestSuite) TestAddForceMergeRangesCrossTableMergeEnabled() {
442+
re := suite.Require()
443+
startKey := []byte("force-merge/enable/x")
444+
endKey := []byte("force-merge/enable/y")
445+
body, err := json.Marshal(addForceMergeRangesRequest{
446+
StartKeysHex: []string{hex.EncodeToString(startKey)},
447+
EndKeysHex: []string{hex.EncodeToString(endKey)},
448+
})
449+
suite.NoError(err)
450+
451+
err = tu.CheckPostJSON(
452+
testDialClient,
453+
fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix),
454+
body,
455+
tu.Status(re, http.StatusBadRequest),
456+
tu.StringContain(re, "enable-cross-table-merge is true"),
457+
)
458+
suite.NoError(err)
459+
460+
exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10004, StartKey: startKey, EndKey: endKey}, nil)
461+
suite.False(suite.svr.GetRaftCluster().GetForceMergeManager().SolveRegion(exactRegion))
462+
}
463+
464+
func (suite *regionTestSuite) TestAddForceMergeRangesNonTableKeyType() {
465+
re := suite.Require()
466+
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"false"}`), tu.StatusOK(re)))
467+
defer func() {
468+
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))
469+
}()
470+
471+
originalCfg := suite.svr.GetConfig().PDServerCfg
472+
rawCfg := originalCfg
473+
rawCfg.KeyType = core.Raw.String()
474+
suite.NoError(suite.svr.SetPDServerConfig(rawCfg))
475+
defer func() {
476+
suite.NoError(suite.svr.SetPDServerConfig(originalCfg))
477+
}()
478+
479+
startKey := []byte("force-merge/raw/x")
480+
endKey := []byte("force-merge/raw/y")
481+
body, err := json.Marshal(addForceMergeRangesRequest{
482+
StartKeysHex: []string{hex.EncodeToString(startKey)},
483+
EndKeysHex: []string{hex.EncodeToString(endKey)},
484+
})
485+
suite.NoError(err)
486+
487+
err = tu.CheckPostJSON(
488+
testDialClient,
489+
fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix),
490+
body,
491+
tu.Status(re, http.StatusBadRequest),
492+
tu.StringContain(re, "key-type raw does not support force merge"),
493+
)
494+
suite.NoError(err)
495+
496+
exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10005, StartKey: startKey, EndKey: endKey}, nil)
497+
suite.False(suite.svr.GetRaftCluster().GetForceMergeManager().SolveRegion(exactRegion))
498+
}
499+
500+
func (suite *regionTestSuite) TestDeleteForceMergeRanges() {
501+
re := suite.Require()
502+
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"false"}`), tu.StatusOK(re)))
503+
defer func() {
504+
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))
505+
}()
506+
507+
startKey := []byte("force-merge/delete/x")
508+
endKey := []byte("force-merge/delete/y")
509+
body, err := json.Marshal(addForceMergeRangesRequest{
510+
StartKeysHex: []string{hex.EncodeToString(startKey)},
511+
EndKeysHex: []string{hex.EncodeToString(endKey)},
512+
})
513+
suite.NoError(err)
514+
suite.NoError(tu.CheckPostJSON(
515+
testDialClient,
516+
fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix),
517+
body,
518+
tu.StatusOK(re),
519+
))
520+
521+
manager := suite.svr.GetRaftCluster().GetForceMergeManager()
522+
exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10006, StartKey: startKey, EndKey: endKey}, nil)
523+
suite.True(manager.SolveRegion(exactRegion))
524+
525+
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))
526+
527+
req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix), nil)
528+
suite.NoError(err)
529+
resp, err := testDialClient.Do(req)
530+
suite.NoError(err)
531+
defer resp.Body.Close()
532+
suite.Equal(http.StatusOK, resp.StatusCode)
533+
534+
suite.False(manager.SolveRegion(exactRegion))
535+
}
536+
377537
func (suite *regionTestSuite) TestScatterRegions() {
378538
re := suite.Require()
379539
r1 := newTestRegionInfo(601, 13, []byte("b1"), []byte("b2"))

server/api/router.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
268268
registerFunc(clusterRouter, "/regions/sibling/{id}", regionsHandler.GetRegionSiblings, setMethods(http.MethodGet), setAuditBackend(prometheus))
269269
registerFunc(clusterRouter, "/regions/accelerate-schedule", regionsHandler.AccelerateRegionsScheduleInRange, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
270270
registerFunc(clusterRouter, "/regions/accelerate-schedule/batch", regionsHandler.AccelerateRegionsScheduleInRanges, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
271+
registerFunc(clusterRouter, "/regions/force-merge", regionsHandler.AddForceMergeRanges, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
272+
registerFunc(clusterRouter, "/regions/force-merge", regionsHandler.DeleteForceMergeRanges, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
271273
registerFunc(clusterRouter, "/regions/scatter", regionsHandler.ScatterRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
272274
registerFunc(clusterRouter, "/regions/split", regionsHandler.SplitRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
273275
registerFunc(clusterRouter, "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods(http.MethodGet), setAuditBackend(prometheus))

server/cluster/cluster.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/tikv/pd/server/schedule/checker"
5050
"github.com/tikv/pd/server/schedule/hbstream"
5151
"github.com/tikv/pd/server/schedule/labeler"
52+
"github.com/tikv/pd/server/schedule/pkdbforcemerge"
5253
"github.com/tikv/pd/server/schedule/placement"
5354
"github.com/tikv/pd/server/schedulers"
5455
"github.com/tikv/pd/server/statistics"
@@ -143,6 +144,7 @@ type RaftCluster struct {
143144
hotStat *statistics.HotStat
144145
hotBuckets *buckets.HotBucketCache
145146
ruleManager *placement.RuleManager
147+
forceMergeManager *pkdbforcemerge.Manager
146148
regionLabeler *labeler.RegionLabeler
147149
replicationMode *replication.ModeManager
148150
unsafeRecoveryController *unsafeRecoveryController
@@ -264,6 +266,10 @@ func (c *RaftCluster) Start(s Server) error {
264266
return err
265267
}
266268
}
269+
c.forceMergeManager, err = pkdbforcemerge.NewManager(c.storage)
270+
if err != nil {
271+
return err
272+
}
267273

268274
c.regionLabeler, err = labeler.NewRegionLabeler(c.ctx, c.storage, regionLabelGCInterval)
269275
if err != nil {
@@ -646,6 +652,11 @@ func (c *RaftCluster) GetRuleManager() *placement.RuleManager {
646652
return c.ruleManager
647653
}
648654

655+
// GetForceMergeManager returns the force merge manager reference.
656+
func (c *RaftCluster) GetForceMergeManager() *pkdbforcemerge.Manager {
657+
return c.forceMergeManager
658+
}
659+
649660
// GetRegionLabeler returns the region labeler.
650661
func (c *RaftCluster) GetRegionLabeler() *labeler.RegionLabeler {
651662
return c.regionLabeler

0 commit comments

Comments
 (0)