Skip to content

Commit fb0182c

Browse files
AmoebaProtozoabufferflies
authored andcommitted
PD support etcd groups (#375) (#376)
* PD support etcd groups (#375) * init commit Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * fix comment Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * lint Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * lint' Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * lint Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * rename Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * add pd-ctl support Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> --------- Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * fix new import path Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * fix test package path Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * lint Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * lint Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * lint Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> --------- Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> (cherry picked from commit 45bc455)
1 parent 048f0d8 commit fb0182c

22 files changed

+1153
-22
lines changed

pkg/gc/gc_state_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func newGCStateManagerForTest(t testing.TB, opt newGCStateManagerForTestOptions)
131131
})
132132
ctx, cancel := context.WithCancel(context.Background())
133133
kgm := keyspace.NewKeyspaceGroupManager(ctx, s, client)
134-
keyspaceManager := keyspace.NewKeyspaceManager(ctx, s, mockcluster.NewCluster(ctx, config.NewPersistOptions(cfg)), allocator, &config.KeyspaceConfig{}, kgm)
134+
keyspaceManager := keyspace.NewKeyspaceManager(ctx, s, mockcluster.NewCluster(ctx, config.NewPersistOptions(cfg)), allocator, &config.KeyspaceConfig{}, kgm, nil)
135135
gcStateManager = NewGCStateManager(s.GetGCStateProvider(), cfg.PDServerCfg, keyspaceManager)
136136

137137
err = kgm.Bootstrap(ctx)

pkg/keyspace/keyspace.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ const (
6868
// UnifiedGC is a type of gc_management_type used to indicate that the GC states of this keyspace is managed
6969
// in a unified way (managed by the NullKeyspace).
7070
UnifiedGC = "unified"
71+
// MetaServiceGroupIDKey is the key for meta-service group id in keyspace config.
72+
MetaServiceGroupIDKey = "meta_service_group_id"
73+
// MetaServiceGroupAddressesKey is the key for meta-service group addresses in keyspace config.
74+
MetaServiceGroupAddressesKey = "meta_service_group_addrs"
7175
)
7276

7377
// Config is the interface for keyspace config.
@@ -76,6 +80,8 @@ type Config interface {
7680
ToWaitRegionSplit() bool
7781
GetWaitRegionSplitTimeout() time.Duration
7882
GetCheckRegionSplitInterval() time.Duration
83+
SetMetaServiceGroups(map[string]string)
84+
GetMetaServiceGroups() map[string]string
7985
}
8086

8187
// Manager manages keyspace related data.
@@ -95,6 +101,8 @@ type Manager struct {
95101
config Config
96102
// kgm is the keyspace group manager of the server.
97103
kgm *GroupManager
104+
// mgm is the meta-service group manager of the server.
105+
mgm *MetaServiceGroupManager
98106
// nextPatrolStartID is the next start id of keyspace assignment patrol.
99107
nextPatrolStartID uint32
100108
// cached keyspace meta info for each keyspace ID.
@@ -133,6 +141,7 @@ func NewKeyspaceManager(
133141
idAllocator id.Allocator,
134142
config Config,
135143
kgm *GroupManager,
144+
mgm *MetaServiceGroupManager,
136145
) *Manager {
137146
return &Manager{
138147
ctx: ctx,
@@ -147,6 +156,7 @@ func NewKeyspaceManager(
147156
cluster: cluster,
148157
config: config,
149158
kgm: kgm,
159+
mgm: mgm,
150160
nextPatrolStartID: constant.StartKeyspaceID,
151161
}
152162
}
@@ -223,6 +233,9 @@ func (manager *Manager) initReserveKeyspace(id uint32, name string) error {
223233
// UpdateConfig update keyspace manager's config.
224234
func (manager *Manager) UpdateConfig(cfg Config) {
225235
manager.config = cfg
236+
if manager.mgm != nil {
237+
manager.mgm.updateGroups(cfg.GetMetaServiceGroups())
238+
}
226239
}
227240

228241
// CreateKeyspace create a keyspace meta with given config and save it to storage.
@@ -270,6 +283,17 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
270283
request.Config[UserKindKey] = config[UserKindKey]
271284
}
272285
}
286+
assignToMetaServiceGroup := manager.mgm != nil && len(manager.mgm.GetGroups()) > 0
287+
if assignToMetaServiceGroup {
288+
metaServiceGroup, err := manager.mgm.AssignToGroup(1)
289+
if err != nil {
290+
return nil, err
291+
}
292+
if request.Config == nil {
293+
request.Config = make(map[string]string)
294+
}
295+
request.Config[MetaServiceGroupIDKey] = metaServiceGroup
296+
}
273297
// Set default value of GCManagementType to KeyspaceLevelGC for NextGen
274298
if kerneltype.IsNextGen() {
275299
if request.Config == nil {
@@ -341,6 +365,9 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
341365
if err := manager.kgm.UpdateKeyspaceForGroup(userKind, config[TSOKeyspaceGroupIDKey], keyspace.GetId(), opAdd); err != nil {
342366
return nil, err
343367
}
368+
if assignToMetaServiceGroup {
369+
manager.mgm.AttachEndpoints(keyspace.GetConfig())
370+
}
344371
tracer.OnUpdateKeyspaceGroupFinished()
345372
tracer.OnCreateKeyspaceComplete()
346373

@@ -409,6 +436,17 @@ func (manager *Manager) CreateKeyspaceByID(request *CreateKeyspaceByIDRequest) (
409436
request.Config[GCManagementType] = KeyspaceLevelGC
410437
}
411438
}
439+
assignToMetaServiceGroup := manager.mgm != nil && len(manager.mgm.GetGroups()) > 0
440+
if assignToMetaServiceGroup {
441+
metaServiceGroup, err := manager.mgm.AssignToGroup(1)
442+
if err != nil {
443+
return nil, err
444+
}
445+
if request.Config == nil {
446+
request.Config = make(map[string]string)
447+
}
448+
request.Config[MetaServiceGroupIDKey] = metaServiceGroup
449+
}
412450
// Create a disabled keyspace meta for tikv-server to get the config on keyspace split.
413451
keyspace := &keyspacepb.KeyspaceMeta{
414452
Id: id,
@@ -457,6 +495,9 @@ func (manager *Manager) CreateKeyspaceByID(request *CreateKeyspaceByIDRequest) (
457495
if err := manager.kgm.UpdateKeyspaceForGroup(userKind, config[TSOKeyspaceGroupIDKey], keyspace.GetId(), opAdd); err != nil {
458496
return nil, err
459497
}
498+
if assignToMetaServiceGroup {
499+
manager.mgm.AttachEndpoints(keyspace.GetConfig())
500+
}
460501
log.Info("[keyspace] keyspace created",
461502
zap.Uint32("keyspace-id", keyspace.GetId()),
462503
zap.String("keyspace-name", keyspace.GetName()),
@@ -616,6 +657,9 @@ func (manager *Manager) LoadKeyspace(name string) (*keyspacepb.KeyspaceMeta, err
616657
}
617658
return nil
618659
})
660+
if manager.mgm != nil && meta != nil {
661+
manager.mgm.AttachEndpoints(meta.GetConfig())
662+
}
619663
return meta, err
620664
}
621665

@@ -636,6 +680,9 @@ func (manager *Manager) LoadKeyspaceByID(spaceID uint32) (*keyspacepb.KeyspaceMe
636680
}
637681
return nil
638682
})
683+
if manager.mgm != nil && meta != nil {
684+
manager.mgm.AttachEndpoints(meta.GetConfig())
685+
}
639686
return meta, err
640687
}
641688

@@ -766,6 +813,13 @@ func (manager *Manager) updateKeyspaceConfigTxn(name string, update func(meta *k
766813
return err
767814
}
768815
}
816+
oldMetaServiceGroup := oldConfig[MetaServiceGroupIDKey]
817+
newMetaServiceGroup := newConfig[MetaServiceGroupIDKey]
818+
if manager.mgm != nil && oldMetaServiceGroup != newMetaServiceGroup {
819+
if err := manager.mgm.UpdateAssignment(oldMetaServiceGroup, newMetaServiceGroup); err != nil {
820+
return err
821+
}
822+
}
769823
// Save the updated keyspace meta.
770824
if err := manager.store.SaveKeyspaceMeta(txn, meta); err != nil {
771825
if needUpdate {
@@ -785,6 +839,9 @@ func (manager *Manager) updateKeyspaceConfigTxn(name string, update func(meta *k
785839
)
786840
return nil, err
787841
}
842+
if manager.mgm != nil {
843+
manager.mgm.AttachEndpoints(meta.GetConfig())
844+
}
788845
log.Info("[keyspace] keyspace config updated",
789846
zap.Uint32("keyspace-id", meta.GetId()),
790847
zap.String("name", meta.GetName()),
@@ -922,6 +979,13 @@ func (manager *Manager) LoadRangeKeyspace(startID uint32, limit int) ([]*keyspac
922979
if err != nil {
923980
return nil, err
924981
}
982+
if manager.mgm != nil {
983+
for _, meta := range keyspaces {
984+
if meta != nil {
985+
manager.mgm.AttachEndpoints(meta.GetConfig())
986+
}
987+
}
988+
}
925989
return keyspaces, nil
926990
}
927991

pkg/keyspace/keyspace_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type mockConfig struct {
6868
WaitRegionSplit bool
6969
WaitRegionSplitTimeout typeutil.Duration
7070
CheckRegionSplitInterval typeutil.Duration
71+
MetaServiceGroups map[string]string
7172
}
7273

7374
func (m *mockConfig) GetPreAlloc() []string {
@@ -86,13 +87,21 @@ func (m *mockConfig) GetCheckRegionSplitInterval() time.Duration {
8687
return m.CheckRegionSplitInterval.Duration
8788
}
8889

90+
func (m *mockConfig) SetMetaServiceGroups(metaServiceGroups map[string]string) {
91+
m.MetaServiceGroups = metaServiceGroups
92+
}
93+
94+
func (m *mockConfig) GetMetaServiceGroups() map[string]string {
95+
return m.MetaServiceGroups
96+
}
97+
8998
func (suite *keyspaceTestSuite) SetupTest() {
9099
re := suite.Require()
91100
suite.ctx, suite.cancel = context.WithCancel(context.Background())
92101
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
93102
allocator := mockid.NewIDAllocator()
94103
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil)
95-
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
104+
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm, nil)
96105
re.NoError(kgm.Bootstrap(suite.ctx))
97106
re.NoError(suite.manager.Bootstrap())
98107
}
@@ -947,7 +956,7 @@ func TestIterateKeyspaces(t *testing.T) {
947956
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
948957
allocator := mockid.NewIDAllocator()
949958
kgm := NewKeyspaceGroupManager(ctx, store, nil)
950-
manager := NewKeyspaceManager(ctx, store, nil, allocator, &mockConfig{}, kgm)
959+
manager := NewKeyspaceManager(ctx, store, nil, allocator, &mockConfig{}, kgm, nil)
951960

952961
re.NoError(kgm.Bootstrap(ctx))
953962
re.NoError(manager.Bootstrap())

pkg/keyspace/meta_service_group.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright 2025 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package keyspace
16+
17+
import (
18+
"context"
19+
"math"
20+
21+
"github.com/tikv/pd/pkg/storage/endpoint"
22+
"github.com/tikv/pd/pkg/storage/kv"
23+
"github.com/tikv/pd/pkg/utils/syncutil"
24+
)
25+
26+
// MetaServiceGroupManager manages external meta-service groups.
27+
type MetaServiceGroupManager struct {
28+
ctx context.Context
29+
store endpoint.MetaServiceGroupStorage
30+
syncutil.RWMutex
31+
metaServiceGroups map[string]string
32+
}
33+
34+
// NewMetaServiceGroupManager creates a new MetaServiceGroupManager.
35+
func NewMetaServiceGroupManager(
36+
ctx context.Context,
37+
store endpoint.MetaServiceGroupStorage,
38+
metaServiceGroups map[string]string,
39+
) *MetaServiceGroupManager {
40+
return &MetaServiceGroupManager{
41+
ctx: ctx,
42+
store: store,
43+
metaServiceGroups: metaServiceGroups,
44+
}
45+
}
46+
47+
// GetAssignmentCounts returns the count of each meta-service group.
48+
func (m *MetaServiceGroupManager) GetAssignmentCounts() (map[string]int, error) {
49+
m.RLock()
50+
defer m.RUnlock()
51+
var (
52+
err error
53+
count map[string]int
54+
)
55+
err = m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
56+
count, err = m.store.GetAssignmentCount(txn, m.metaServiceGroups)
57+
if err != nil {
58+
return nil
59+
}
60+
return nil
61+
})
62+
return count, err
63+
}
64+
65+
// AssignToGroup increments count of the meta-service group with least assigned keyspaces.
66+
// It returns the assigned meta-service group and an error if any.
67+
func (m *MetaServiceGroupManager) AssignToGroup(count int) (string, error) {
68+
m.RLock()
69+
defer m.RUnlock()
70+
var assignedGroup string
71+
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
72+
countMap, err := m.store.GetAssignmentCount(txn, m.metaServiceGroups)
73+
if err != nil {
74+
return err
75+
}
76+
minCount := math.MaxInt
77+
for currentGroup, currentCount := range countMap {
78+
if currentCount < minCount {
79+
minCount = currentCount
80+
assignedGroup = currentGroup
81+
}
82+
}
83+
if assignedGroup == "" {
84+
return errNoAvailableMetaServiceGroups
85+
}
86+
return m.store.IncrementAssignmentCount(txn, assignedGroup, count)
87+
}); err != nil {
88+
return "", err
89+
}
90+
return assignedGroup, nil
91+
}
92+
93+
// UpdateAssignment moves a keyspace from one meta-service group to another.
94+
// It returns an error if any.
95+
func (m *MetaServiceGroupManager) UpdateAssignment(oldGroupID, newGroupID string) error {
96+
m.RLock()
97+
defer m.RUnlock()
98+
// Newly assigned meta-service group must be available.
99+
if newGroupID != "" && m.metaServiceGroups[newGroupID] == "" {
100+
return errUnknownMetaServiceGroup
101+
}
102+
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
103+
if oldGroupID != "" {
104+
if err := m.store.IncrementAssignmentCount(txn, oldGroupID, -1); err != nil {
105+
return err
106+
}
107+
}
108+
if newGroupID != "" {
109+
if err := m.store.IncrementAssignmentCount(txn, newGroupID, 1); err != nil {
110+
return err
111+
}
112+
}
113+
return nil
114+
})
115+
}
116+
117+
// AttachEndpoints append potential meta-service group endpoint to the given keyspace config map.
118+
func (m *MetaServiceGroupManager) AttachEndpoints(keyspaceConfig map[string]string) {
119+
groupID := keyspaceConfig[MetaServiceGroupIDKey]
120+
if groupID == "" {
121+
return
122+
}
123+
m.RLock()
124+
defer m.RUnlock()
125+
if endpoints := m.metaServiceGroups[groupID]; endpoints != "" {
126+
keyspaceConfig[MetaServiceGroupAddressesKey] = endpoints
127+
}
128+
}
129+
130+
// GetGroups returns currently available meta-service groups.
131+
func (m *MetaServiceGroupManager) GetGroups() map[string]string {
132+
m.RLock()
133+
defer m.RUnlock()
134+
return m.metaServiceGroups
135+
}
136+
137+
// updateGroups updates currently available meta-service groups.
138+
func (m *MetaServiceGroupManager) updateGroups(metaServiceGroups map[string]string) {
139+
m.Lock()
140+
defer m.Unlock()
141+
m.metaServiceGroups = metaServiceGroups
142+
}

0 commit comments

Comments
 (0)