Skip to content

Commit 5be7eae

Browse files
add meta-service-group status (tikv#392)
* add enable status Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * Update pkg/keyspace/meta_service_group.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update pkg/keyspace/meta_service_group.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update tests/server/apiv2/handlers/testutil.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix typo Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * fix linter issue Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * Update tools/pd-ctl/pdctl/command/meta_service_group_command.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix copilot suggestion Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * fix ut Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> --------- Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent ff840e1 commit 5be7eae

File tree

10 files changed

+363
-115
lines changed

10 files changed

+363
-115
lines changed

pkg/keyspace/keyspace_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,21 @@ func (suite *keyspaceTestSuite) SetupTest() {
113113
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
114114
allocator := mockid.NewIDAllocator()
115115
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil)
116-
egm := NewMetaServiceGroupManager(suite.ctx, store, true, mockMetaServiceGroups())
116+
mgm := NewMetaServiceGroupManager(suite.ctx, store, true, mockMetaServiceGroups())
117+
mustEnableMetaServiceGroups(re, mgm, mockMetaServiceGroups())
117118
var err error
118-
suite.manager, err = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm, egm)
119+
suite.manager, err = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm, mgm)
119120
re.NoError(err)
120121
re.NoError(kgm.Bootstrap(suite.ctx))
121122
re.NoError(suite.manager.Bootstrap())
122123
}
123124

125+
func mustEnableMetaServiceGroups(re *require.Assertions, manager *MetaServiceGroupManager, groups map[string]string) {
126+
for groupID := range groups {
127+
enabled := true
128+
re.NoError(manager.PatchStatus(groupID, &MetaServiceGroupStatusPatch{Enabled: &enabled}))
129+
}
130+
}
124131
func (suite *keyspaceTestSuite) TearDownTest() {
125132
suite.cancel()
126133
}

pkg/keyspace/meta_service_group.go

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,45 +48,81 @@ func NewMetaServiceGroupManager(
4848
}
4949

5050
// GetAssignmentCounts returns the count of each meta-service group.
51-
func (m *MetaServiceGroupManager) GetAssignmentCounts() (map[string]int, error) {
51+
func (m *MetaServiceGroupManager) GetStatus() (map[string]*endpoint.MetaServiceGroupStatus, error) {
5252
m.RLock()
5353
defer m.RUnlock()
5454
var (
55-
err error
56-
count map[string]int
55+
err error
56+
statusMap map[string]*endpoint.MetaServiceGroupStatus
5757
)
5858
err = m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
59-
count, err = m.store.GetAssignmentCount(txn, m.metaServiceGroups)
59+
statusMap, err = m.store.LoadMetaServiceGroupStatus(txn, m.metaServiceGroups)
6060
if err != nil {
61-
return nil
61+
return err
6262
}
6363
return nil
6464
})
65-
return count, err
65+
return statusMap, err
66+
}
67+
68+
// MetaServiceGroupStatusPatch represents a patch operation for a meta-service group.
69+
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
70+
type MetaServiceGroupStatusPatch struct {
71+
AssignedCount *int `json:"assigned_count,omitempty"` // nil means no change, 0 means reset to 0
72+
Enabled *bool `json:"enabled,omitempty"` // nil means no change, true means enable, false means disable
73+
}
74+
75+
// PatchStatus applies a patch to the status of a meta-service group.
76+
func (m *MetaServiceGroupManager) PatchStatus(groupID string, patch *MetaServiceGroupStatusPatch) error {
77+
m.RLock()
78+
defer m.RUnlock()
79+
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
80+
statusMap, err := m.store.LoadMetaServiceGroupStatus(txn, m.metaServiceGroups)
81+
if err != nil {
82+
return err
83+
}
84+
status, exists := statusMap[groupID]
85+
if !exists {
86+
return errUnknownMetaServiceGroup
87+
}
88+
if patch.AssignedCount != nil {
89+
status.AssignmentCount = *patch.AssignedCount
90+
}
91+
if patch.Enabled != nil {
92+
status.Enabled = *patch.Enabled
93+
}
94+
return m.store.SaveMetaServiceGroupStatus(txn, groupID, status)
95+
})
6696
}
6797

68-
// AssignToGroup increments count of the meta-service group with least assigned keyspaces.
98+
// AssignToGroup increments count of the enabled meta-service group with least assigned keyspaces.
6999
// It returns the assigned meta-service group and an error if any.
70100
func (m *MetaServiceGroupManager) AssignToGroup(count int) (string, error) {
71101
m.RLock()
72102
defer m.RUnlock()
73-
var assignedGroup string
103+
var (
104+
assignedGroup string
105+
assignedGroupStatus *endpoint.MetaServiceGroupStatus
106+
)
74107
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
75-
countMap, err := m.store.GetAssignmentCount(txn, m.metaServiceGroups)
108+
statusMap, err := m.store.LoadMetaServiceGroupStatus(txn, m.metaServiceGroups)
76109
if err != nil {
77110
return err
78111
}
79112
minCount := math.MaxInt
80-
for currentGroup, currentCount := range countMap {
81-
if currentCount < minCount {
82-
minCount = currentCount
113+
for currentGroup, currentGroupStatus := range statusMap {
114+
// only consider enabled groups
115+
if currentGroupStatus.Enabled && currentGroupStatus.AssignmentCount < minCount {
116+
minCount = currentGroupStatus.AssignmentCount
83117
assignedGroup = currentGroup
118+
assignedGroupStatus = currentGroupStatus
84119
}
85120
}
86121
if assignedGroup == "" {
87122
return errNoAvailableMetaServiceGroups
88123
}
89-
return m.store.IncrementAssignmentCount(txn, assignedGroup, count)
124+
assignedGroupStatus.AssignmentCount += count
125+
return m.store.SaveMetaServiceGroupStatus(txn, assignedGroup, assignedGroupStatus)
90126
}); err != nil {
91127
return "", err
92128
}
@@ -103,13 +139,19 @@ func (m *MetaServiceGroupManager) UpdateAssignment(oldGroupID, newGroupID string
103139
return errUnknownMetaServiceGroup
104140
}
105141
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
106-
if oldGroupID != "" {
107-
if err := m.store.IncrementAssignmentCount(txn, oldGroupID, -1); err != nil {
142+
statusMap, err := m.store.LoadMetaServiceGroupStatus(txn, m.metaServiceGroups)
143+
if err != nil {
144+
return err
145+
}
146+
if status, exists := statusMap[oldGroupID]; exists {
147+
status.AssignmentCount--
148+
if err := m.store.SaveMetaServiceGroupStatus(txn, oldGroupID, status); err != nil {
108149
return err
109150
}
110151
}
111-
if newGroupID != "" {
112-
if err := m.store.IncrementAssignmentCount(txn, newGroupID, 1); err != nil {
152+
if status, exists := statusMap[newGroupID]; exists {
153+
status.AssignmentCount++
154+
if err := m.store.SaveMetaServiceGroupStatus(txn, newGroupID, status); err != nil {
113155
return err
114156
}
115157
}

pkg/keyspace/meta_service_group_test.go

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -52,62 +52,78 @@ func (suite *metaServiceGroupTestSuite) TearDownTest() {
5252
suite.cancel()
5353
}
5454

55-
func (suite *metaServiceGroupTestSuite) TestGetAssignmentCountsInitialZero() {
55+
func (suite *metaServiceGroupTestSuite) TestInitialState() {
5656
re := suite.Require()
57-
counts, err := suite.manager.GetAssignmentCounts()
57+
statusMap, err := suite.manager.GetStatus()
5858
re.NoError(err)
59-
60-
for grp := range mockMetaServiceGroups() {
61-
val, exists := counts[grp]
62-
re.True(exists, "expected group %q to be present in counts", grp)
63-
re.Equal(0, val, "expected initial count of %q to be 0", grp)
59+
for id := range mockMetaServiceGroups() {
60+
status, exists := statusMap[id]
61+
re.True(exists)
62+
re.Zero(status.AssignmentCount)
63+
re.False(status.Enabled)
6464
}
65+
// Assign should return error due to no available groups.
66+
_, err = suite.manager.AssignToGroup(1)
67+
re.Error(err)
6568
}
6669

6770
func (suite *metaServiceGroupTestSuite) TestAssignToGroup() {
6871
re := suite.Require()
72+
for groupID := range mockMetaServiceGroups() {
73+
enable := true
74+
re.NoError(suite.manager.PatchStatus(groupID, &MetaServiceGroupStatusPatch{
75+
Enabled: &enable,
76+
}))
77+
}
6978
request := 5
7079
assigned, err := suite.manager.AssignToGroup(request)
7180
re.NoError(err)
72-
re.NotEmpty(assigned, "expected some non-empty group name")
81+
re.NotEmpty(assigned)
7382

7483
// Verify the returned group is one of the mockMetaServiceGroups keys.
7584
_, isValid := mockMetaServiceGroups()[assigned]
76-
re.True(isValid, "assigned group must be from mockMetaServiceGroups")
85+
re.True(isValid)
7786

7887
// Verify the chosen group's count increments by 'request'.
79-
counts, err := suite.manager.GetAssignmentCounts()
88+
statusMap, err := suite.manager.GetStatus()
8089
re.NoError(err)
81-
re.Equal(request, counts[assigned], "chosen group's count should equal the requested increment")
90+
re.Contains(statusMap, assigned)
91+
re.Equal(request, statusMap[assigned].AssignmentCount)
8292

8393
// All other groups must remain at 0.
84-
for grp := range mockMetaServiceGroups() {
85-
if grp == assigned {
94+
for groupID := range mockMetaServiceGroups() {
95+
if groupID == assigned {
8696
continue
8797
}
88-
re.Equal(0, counts[grp], "other groups should remain at 0")
98+
re.Contains(statusMap, groupID)
99+
re.Equal(0, statusMap[groupID].AssignmentCount)
89100
}
90101
}
91102

92103
func (suite *metaServiceGroupTestSuite) TestUpdateAssignment() {
93104
re := suite.Require()
105+
for groupID := range mockMetaServiceGroups() {
106+
enable := true
107+
re.NoError(suite.manager.PatchStatus(groupID, &MetaServiceGroupStatusPatch{
108+
Enabled: &enable,
109+
}))
110+
}
94111
err := suite.manager.UpdateAssignment("", "etcd-group-0")
95112
re.NoError(err)
96-
97-
counts, err := suite.manager.GetAssignmentCounts()
113+
statusMap, err := suite.manager.GetStatus()
98114
re.NoError(err)
99-
re.Equal(1, counts["etcd-group-0"])
100-
re.Equal(0, counts["etcd-group-1"])
101-
re.Equal(0, counts["etcd-group-2"])
115+
re.Equal(1, statusMap["etcd-group-0"].AssignmentCount)
116+
re.Equal(0, statusMap["etcd-group-1"].AssignmentCount)
117+
re.Equal(0, statusMap["etcd-group-2"].AssignmentCount)
102118

103119
err = suite.manager.UpdateAssignment("etcd-group-0", "etcd-group-1")
104120
re.NoError(err)
105121

106-
counts, err = suite.manager.GetAssignmentCounts()
122+
statusMap, err = suite.manager.GetStatus()
107123
re.NoError(err)
108-
re.Equal(0, counts["etcd-group-0"], "expected decremented back to 0")
109-
re.Equal(1, counts["etcd-group-1"], "expected incremented to 1")
110-
re.Equal(0, counts["etcd-group-2"], "unchanged")
124+
re.Equal(0, statusMap["etcd-group-0"].AssignmentCount)
125+
re.Equal(1, statusMap["etcd-group-1"].AssignmentCount)
126+
re.Equal(0, statusMap["etcd-group-2"].AssignmentCount)
111127
}
112128

113129
func (suite *metaServiceGroupTestSuite) TestUpdateAssignmentUnknownNewGroup() {
@@ -157,13 +173,20 @@ func (suite *metaServiceGroupTestSuite) TestUpdateEndpoints() {
157173

158174
func (suite *metaServiceGroupTestSuite) TestUpdateEndpointsAndUpdateAssignment() {
159175
re := suite.Require()
176+
for groupID := range mockMetaServiceGroups() {
177+
enable := true
178+
re.NoError(suite.manager.PatchStatus(groupID, &MetaServiceGroupStatusPatch{
179+
Enabled: &enable,
180+
}))
181+
}
160182
// Assign to some existing group
161183
assigned, err := suite.manager.AssignToGroup(1)
162184
re.NoError(err)
163185
re.NotEmpty(assigned, "expected AssignToGroup to return a non-empty group")
164-
counts, err := suite.manager.GetAssignmentCounts()
186+
statusMap, err := suite.manager.GetStatus()
165187
re.NoError(err)
166-
re.Equal(1, counts[assigned], "assigned group should have count 1")
188+
re.Contains(statusMap, assigned, "assigned group should be in status map")
189+
re.Equal(1, statusMap[assigned].AssignmentCount, "assigned group should have count 1")
167190

168191
// Add a new group "etcd-group-3"
169192
newMap := mockMetaServiceGroups()
@@ -176,16 +199,16 @@ func (suite *metaServiceGroupTestSuite) TestUpdateEndpointsAndUpdateAssignment()
176199

177200
// the original group should have decreased from 1 → 0
178201
// "etcd-group-3" should have increased from 0 → 1
179-
counts, err = suite.manager.GetAssignmentCounts()
202+
statusMap, err = suite.manager.GetStatus()
180203
re.NoError(err)
181-
re.Equal(0, counts[assigned], "original group should have count 0 after moving assignment")
182-
re.Equal(1, counts["etcd-group-3"], "new group should have count 1")
204+
re.Equal(0, statusMap[assigned].AssignmentCount, "original group should have count 0 after moving assignment")
205+
re.Equal(1, statusMap["etcd-group-3"].AssignmentCount, "new group should have count 1")
183206

184207
// All other preexisting groups (besides assigned and etcd-group-3) remain at 0
185-
for grp := range mockMetaServiceGroups() {
186-
if grp == assigned {
208+
for groupID := range mockMetaServiceGroups() {
209+
if groupID == assigned {
187210
continue
188211
}
189-
re.Equal(0, counts[grp], "other original groups should remain at 0")
212+
re.Equal(0, statusMap[groupID].AssignmentCount, "other original groups should remain at 0")
190213
}
191214
}

pkg/storage/endpoint/meta_service_group.go

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,56 +16,61 @@ package endpoint
1616

1717
import (
1818
"context"
19-
"strconv"
19+
"encoding/json"
2020

2121
"github.com/tikv/pd/pkg/storage/kv"
2222
"github.com/tikv/pd/pkg/utils/keypath"
2323
)
2424

2525
// MetaServiceGroupStorage defines storage operations on meta-service group related data.
2626
type MetaServiceGroupStorage interface {
27-
IncrementAssignmentCount(txn kv.Txn, id string, delta int) error
28-
GetAssignmentCount(txn kv.Txn, ids map[string]string) (map[string]int, error)
27+
SaveMetaServiceGroupStatus(txn kv.Txn, id string, status *MetaServiceGroupStatus) error
28+
LoadMetaServiceGroupStatus(txn kv.Txn, ids map[string]string) (map[string]*MetaServiceGroupStatus, error)
2929
RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error
3030
}
3131

32-
// IncrementAssignmentCount increments the assignment count of the designated meta-service group by delta
33-
func (*StorageEndpoint) IncrementAssignmentCount(txn kv.Txn, id string, delta int) error {
34-
count, err := loadAssignmentCount(txn, id)
32+
// MetaServiceGroupStatus represents the status of a meta-service group.
33+
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
34+
type MetaServiceGroupStatus struct {
35+
AssignmentCount int `json:"assignment_count"`
36+
Enabled bool `json:"enabled"`
37+
}
38+
39+
// SaveMetaServiceGroupStatus saves the meta service group status to the storage.
40+
func (*StorageEndpoint) SaveMetaServiceGroupStatus(txn kv.Txn, id string, status *MetaServiceGroupStatus) error {
41+
statusPath := keypath.MetaServiceGroupStatusPath(id)
42+
statusVal, err := json.Marshal(status)
3543
if err != nil {
3644
return err
3745
}
38-
count += delta
39-
return saveAssignmentCount(txn, id, count)
46+
return txn.Save(statusPath, string(statusVal))
4047
}
4148

42-
// GetAssignmentCount returns the assignment count of the designated meta-service group.
43-
func (*StorageEndpoint) GetAssignmentCount(txn kv.Txn, ids map[string]string) (map[string]int, error) {
44-
counts := make(map[string]int)
49+
// LoadMetaServiceGroupStatus returns the status of the designated meta-service group.
50+
func (*StorageEndpoint) LoadMetaServiceGroupStatus(txn kv.Txn, ids map[string]string) (map[string]*MetaServiceGroupStatus, error) {
51+
statusMap := make(map[string]*MetaServiceGroupStatus)
4552
for id := range ids {
46-
count, err := loadAssignmentCount(txn, id)
53+
status, err := loadMetaServiceGroupStatus(txn, id)
4754
if err != nil {
4855
return nil, err
4956
}
50-
counts[id] = count
57+
statusMap[id] = status
5158
}
52-
return counts, nil
59+
return statusMap, nil
5360
}
5461

55-
func loadAssignmentCount(txn kv.Txn, id string) (int, error) {
56-
countPath := keypath.MetaServiceGroupAssignmentCountPath(id)
57-
countVal, err := txn.Load(countPath)
62+
func loadMetaServiceGroupStatus(txn kv.Txn, id string) (*MetaServiceGroupStatus, error) {
63+
statusPath := keypath.MetaServiceGroupStatusPath(id)
64+
statusVal, err := txn.Load(statusPath)
5865
if err != nil {
59-
return 0, err
66+
return nil, err
6067
}
61-
if countVal == "" {
62-
return 0, nil
68+
status := &MetaServiceGroupStatus{}
69+
if statusVal == "" {
70+
return status, nil
6371
}
64-
return strconv.Atoi(countVal)
65-
}
66-
67-
func saveAssignmentCount(txn kv.Txn, id string, count int) error {
68-
countPath := keypath.MetaServiceGroupAssignmentCountPath(id)
69-
countVal := strconv.Itoa(count)
70-
return txn.Save(countPath, countVal)
72+
if err = json.Unmarshal([]byte(statusVal), status); err != nil {
73+
return nil, err
74+
}
75+
return status, nil
7176
}

0 commit comments

Comments
 (0)