Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,21 +1074,15 @@ func (c *RaftCluster) GetRangeHoles() [][]string {
}

// UpdateStoreLabels updates a store's location labels
// If 'force' is true, then update the store's labels forcibly.
// If 'force' is true, the origin labels will be overwritten with the new one forcibly.
func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel, force bool) error {
store := c.GetStore(storeID)
if store == nil {
return errs.ErrInvalidStoreID.FastGenByArgs(storeID)
}
newStore := typeutil.DeepClone(store.GetMeta(), core.StoreFactory)
if force {
newStore.Labels = labels
} else {
// If 'force' isn't set, the given labels will merge into those labels which already existed in the store.
newStore.Labels = core.MergeLabels(newStore.GetLabels(), labels)
}
// PutStore will perform label merge.
return c.putStoreImpl(newStore)
newStore.Labels = labels
return c.putStoreImpl(newStore, force)
}

// DeleteStoreLabel updates a store's location labels
Expand All @@ -1109,13 +1103,12 @@ func (c *RaftCluster) DeleteStoreLabel(storeID uint64, labelKey string) error {
return errors.Errorf("the label key %s does not exist", labelKey)
}
newStore.Labels = labels
// PutStore will perform label merge.
return c.putStoreImpl(newStore)
return c.putStoreImpl(newStore, true)
}

// PutStore puts a store.
func (c *RaftCluster) PutStore(store *metapb.Store) error {
if err := c.putStoreImpl(store); err != nil {
if err := c.putStoreImpl(store, false); err != nil {
return err
}
c.OnStoreVersionChange()
Expand All @@ -1124,8 +1117,9 @@ func (c *RaftCluster) PutStore(store *metapb.Store) error {
}

// putStoreImpl puts a store.
// If 'force' is true, then overwrite the store's labels.
func (c *RaftCluster) putStoreImpl(store *metapb.Store) error {
// If 'force' is true, the store's labels will overwrite those labels which already existed in the store.
// If 'force' is false, the store's labels will merge into those labels which already existed in the store.
func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -1155,6 +1149,9 @@ func (c *RaftCluster) putStoreImpl(store *metapb.Store) error {
} else {
// Use the given labels to update the store.
labels := store.GetLabels()
if !force {
labels = core.MergeLabels(s.GetLabels(), labels)
}
// Update an existed store.
s = s.Clone(
core.SetStoreAddress(store.Address, store.StatusAddress, store.PeerAddress),
Expand Down
109 changes: 109 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/id"
Expand Down Expand Up @@ -1800,6 +1801,114 @@ func TestAwakenStore(t *testing.T) {
re.True(store1.NeedAwakenStore())
}

func TestUpdateAndDeleteLabel(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
stores := newTestStores(1, "6.5.1")
for _, store := range stores {
re.NoError(cluster.PutStore(store.GetMeta()))
}
re.Empty(cluster.GetStore(1).GetLabels())
// Update label.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
false,
)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
cluster.GetStore(1).GetLabels(),
)
// Update label again.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{
{Key: "mode", Value: "readonly"},
},
false,
)
// Update label with empty value.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{},
false,
)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
{Key: "mode", Value: "readonly"},
},
cluster.GetStore(1).GetLabels(),
)
// Delete label.
err = cluster.DeleteStoreLabel(1, "mode")
re.NoError(err)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
cluster.GetStore(1).GetLabels(),
)
// Delete a non-exist label.
err = cluster.DeleteStoreLabel(1, "mode")
re.Error(err)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
cluster.GetStore(1).GetLabels(),
)
// Update label without force.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{},
false,
)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
cluster.GetStore(1).GetLabels(),
)
// Update label with force.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{},
true,
)
re.Empty(cluster.GetStore(1).GetLabels())
// Update label first and then reboot the store.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{{Key: "mode", Value: "readonly"}},
false,
)
re.Equal([]*metapb.StoreLabel{{Key: "mode", Value: "readonly"}}, cluster.GetStore(1).GetLabels())
// Mock the store doesn't have any label configured.
newStore := typeutil.DeepClone(cluster.GetStore(1).GetMeta(), core.StoreFactory)
newStore.Labels = nil
// Store rebooting will call PutStore.
err = cluster.PutStore(newStore)
re.NoError(err)
// Check the label after rebooting.
re.Equal([]*metapb.StoreLabel{{Key: "mode", Value: "readonly"}}, cluster.GetStore(1).GetLabels())
}

type testCluster struct {
*RaftCluster
}
Expand Down