Skip to content

Commit 7cbc382

Browse files
authored
gc: Optimize GetAllKeyspacesGCStates for scenarios where there are too many keyspaces (#9777)
ref #8978 This PR supports loading keyspaces by smaller batches in `GetAllKeyspacesGCStates`. This avoids it througing errors due to etcd txn size limit when there are too many keyspaces. Also makes `GetAllKeyspacesGCStates` support cancelling. This is not the ultimate solution for tracking GC states changes. The best way is to implement a streaming API that watches the changes of the GC states. However, this PR provides a simpler fix to make it able to run. Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
1 parent a27c2e8 commit 7cbc382

File tree

6 files changed

+772
-109
lines changed

6 files changed

+772
-109
lines changed

pkg/gc/gc_state_manager.go

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -664,39 +664,66 @@ func (m *GCStateManager) GetGCState(keyspaceID uint32) (GCState, error) {
664664
// must be fetched AFTER the beginning of the current invocation, and it never reuses the result of invocations that
665665
// started earlier than the current one.
666666
func (m *GCStateManager) GetAllKeyspacesGCStates(ctx context.Context) (map[uint32]GCState, error) {
667-
return m.allKeyspacesGCStatesSingleFlight.Do(ctx, func() (map[uint32]GCState, error) {
668-
result, err := m.getAllKeyspacesGCStatesImpl()
667+
return m.allKeyspacesGCStatesSingleFlight.Do(ctx, func(execCtx context.Context) (map[uint32]GCState, error) {
668+
result, err := m.getAllKeyspacesGCStatesImpl(execCtx)
669669
failpoint.Inject("onGetAllKeyspacesGCStatesFinish", func() {})
670670
return result, err
671671
})
672672
}
673673

674-
func (m *GCStateManager) getAllKeyspacesGCStatesImpl() (map[uint32]GCState, error) {
674+
func (m *GCStateManager) getAllKeyspacesGCStatesImpl(ctx context.Context) (map[uint32]GCState, error) {
675675
failpoint.InjectCall("onGetAllKeyspacesGCStatesStart")
676676

677-
// TODO: Handle the case that there are too many keyspaces and loading them at once is not suitable.
678-
allKeyspaces, err := m.keyspaceManager.LoadRangeKeyspace(0, 0)
679-
if err != nil {
680-
return nil, err
677+
mutexLocked := false
678+
lock := func() {
679+
m.mu.Lock()
680+
mutexLocked = true
681+
}
682+
unlock := func() {
683+
m.mu.Unlock()
684+
mutexLocked = false
681685
}
682-
m.mu.Lock()
683-
defer m.mu.Unlock()
686+
687+
ensureUnlocked := func() {
688+
if mutexLocked {
689+
unlock()
690+
}
691+
}
692+
defer ensureUnlocked()
693+
694+
keyspaceIterator := m.keyspaceManager.IterateKeyspaces()
684695

685696
// Do not guarantee atomicity among different keyspaces here.
686697
results := make(map[uint32]GCState)
687-
err = m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
698+
lock()
699+
err := m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
688700
nullKeyspaceState, err1 := m.getGCStateInTransaction(constant.NullKeyspaceID, wb)
689701
if err1 != nil {
690702
return err1
691703
}
692704
results[constant.NullKeyspaceID] = nullKeyspaceState
693705
return nil
694706
})
707+
unlock()
695708
if err != nil {
696709
return nil, err
697710
}
698711

699-
for _, keyspaceMeta := range allKeyspaces {
712+
for {
713+
select {
714+
case <-ctx.Done():
715+
return nil, ctx.Err()
716+
default:
717+
}
718+
719+
keyspaceMeta, ok, err := keyspaceIterator.Next()
720+
if err != nil {
721+
return nil, err
722+
}
723+
if !ok {
724+
break
725+
}
726+
700727
// Just handle the active keyspace, leave the others up to keyspace management.
701728
if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED {
702729
continue
@@ -710,6 +737,7 @@ func (m *GCStateManager) getAllKeyspacesGCStatesImpl() (map[uint32]GCState, erro
710737
continue
711738
}
712739

740+
lock()
713741
err = m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
714742
state, err1 := m.getGCStateInTransaction(keyspaceMeta.Id, wb)
715743
if err1 != nil {
@@ -718,6 +746,7 @@ func (m *GCStateManager) getAllKeyspacesGCStatesImpl() (map[uint32]GCState, erro
718746
results[keyspaceMeta.Id] = state
719747
return nil
720748
})
749+
unlock()
721750
if err != nil {
722751
return nil, err
723752
}
@@ -810,7 +839,7 @@ func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(keyspaceID uint32, s
810839
var txnSafePoint uint64
811840
err := m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
812841
var err1 error
813-
txnSafePoint, _, _, err1 = m.getAllKeyspacesMaxTxnSafePoint(wb)
842+
txnSafePoint, _, _, err1 = m.getMaxTxnSafePointAmongAllKeyspaces(wb)
814843
return err1
815844
})
816845
if err != nil {
@@ -910,16 +939,20 @@ func (m *GCStateManager) SetGlobalGCBarrier(ctx context.Context, barrierID strin
910939
return m.setGlobalGCBarrierImpl(ctx, barrierID, barrierTS, ttl, now)
911940
}
912941

913-
// getAllKeyspacesMaxTxnSafePoint must be called inside a transaction,
942+
// getMaxTxnSafePointAmongAllKeyspaces must be called inside a transaction,
914943
// The WriteBatch parameter in function signature is deliberate to the call safe, do not pass nil.
915-
func (m *GCStateManager) getAllKeyspacesMaxTxnSafePoint(_ *endpoint.GCStateWriteBatch) (maxTxnSafePoint uint64, keyspaceName string, keyspaceID uint32, err error) {
916-
// TODO: Handle the case that there are too many keyspaces and loading them at once is not suitable.
917-
allKeyspaces, err1 := m.keyspaceManager.LoadRangeKeyspace(0, 0)
918-
if err1 != nil {
919-
err = err1
920-
return
921-
}
922-
for _, keyspaceMeta := range allKeyspaces {
944+
func (m *GCStateManager) getMaxTxnSafePointAmongAllKeyspaces(_ *endpoint.GCStateWriteBatch) (maxTxnSafePoint uint64, keyspaceName string, keyspaceID uint32, err error) {
945+
keyspaceIterator := m.keyspaceManager.IterateKeyspaces()
946+
for {
947+
keyspaceMeta, ok, err2 := keyspaceIterator.Next()
948+
if err2 != nil {
949+
err = err2
950+
return
951+
}
952+
if !ok {
953+
break
954+
}
955+
923956
if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED {
924957
continue
925958
}
@@ -957,7 +990,7 @@ func (m *GCStateManager) setGlobalGCBarrierImpl(_ context.Context, barrierID str
957990
newBarrier := endpoint.NewGlobalGCBarrier(barrierID, barrierTS, expirationTime)
958991
err := m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
959992
// Make sure global barrier ts is ahead of txn safe point of all keyspaces.
960-
maxTxnSafePoint, keyspaceName, keyspaceID, err := m.getAllKeyspacesMaxTxnSafePoint(wb)
993+
maxTxnSafePoint, keyspaceName, keyspaceID, err := m.getMaxTxnSafePointAmongAllKeyspaces(wb)
961994
if err != nil {
962995
return err
963996
}

pkg/gc/gc_state_manager_test.go

Lines changed: 122 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/tikv/pd/pkg/utils/etcdutil"
4747
"github.com/tikv/pd/pkg/utils/keypath"
4848
"github.com/tikv/pd/pkg/utils/testutil"
49+
"github.com/tikv/pd/pkg/versioninfo/kerneltype"
4950
"github.com/tikv/pd/server/config"
5051
)
5152

@@ -89,10 +90,24 @@ func TestGCStateManager(t *testing.T) {
8990
type newGCStateManagerForTestOptions struct {
9091
// Nil for generating initial keyspaces by the default preset
9192
// Non-nil (including empty) for generating specified keyspaces
92-
specifyInitialKeyspaces []*keyspace.CreateKeyspaceRequest
93+
specifyInitialKeyspaces []*keyspace.CreateKeyspaceByIDRequest
9394
etcdServerCfgModifier func(cfg *embed.Config)
9495
}
9596

97+
func (opt *newGCStateManagerForTestOptions) generateKeyspacesByCount(count int) {
98+
createTime := time.Now().Unix()
99+
for i := range count {
100+
id := new(uint32)
101+
*id = uint32(i + 1)
102+
opt.specifyInitialKeyspaces = append(opt.specifyInitialKeyspaces, &keyspace.CreateKeyspaceByIDRequest{
103+
ID: id,
104+
Name: fmt.Sprintf("ks%d", *id),
105+
Config: map[string]string{keyspace.GCManagementType: keyspace.KeyspaceLevelGC},
106+
CreateTime: createTime,
107+
})
108+
}
109+
}
110+
96111
func newGCStateManagerForTest(t testing.TB, opt newGCStateManagerForTestOptions) (storage *endpoint.StorageEndpoint, provider endpoint.GCStateProvider, gcStateManager *GCStateManager, clean func(), cancel context.CancelFunc) {
97112
cfg := config.NewConfig()
98113
re := require.New(t)
@@ -126,31 +141,40 @@ func newGCStateManagerForTest(t testing.TB, opt newGCStateManagerForTestOptions)
126141

127142
// keyspaceID 0 exists automatically after bootstrapping.
128143
if opt.specifyInitialKeyspaces == nil {
129-
ks1, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
144+
id := new(uint32)
145+
*id = 1
146+
ks1, err := keyspaceManager.CreateKeyspaceByID(&keyspace.CreateKeyspaceByIDRequest{
147+
ID: id,
130148
Name: "ks1",
131149
Config: map[string]string{"gc_management_type": "unified"},
132150
CreateTime: time.Now().Unix(),
133151
})
134152
re.NoError(err)
135153
re.Equal(uint32(1), ks1.Id)
136154

137-
ks2, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
155+
*id = 2
156+
ks2, err := keyspaceManager.CreateKeyspaceByID(&keyspace.CreateKeyspaceByIDRequest{
157+
ID: id,
138158
Name: "ks2",
139159
Config: map[string]string{"gc_management_type": "keyspace_level"},
140160
CreateTime: time.Now().Unix(),
141161
})
142162
re.NoError(err)
143163
re.Equal(uint32(2), ks2.Id)
144164

145-
ks3, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
165+
*id = 3
166+
ks3, err := keyspaceManager.CreateKeyspaceByID(&keyspace.CreateKeyspaceByIDRequest{
167+
ID: id,
146168
Name: "ks3",
147169
Config: map[string]string{},
148170
CreateTime: time.Now().Unix(),
149171
})
150172
re.NoError(err)
151173
re.Equal(uint32(3), ks3.Id)
152174

153-
ks4, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
175+
*id = 4
176+
ks4, err := keyspaceManager.CreateKeyspaceByID(&keyspace.CreateKeyspaceByIDRequest{
177+
ID: id,
154178
Name: "ks4",
155179
Config: map[string]string{},
156180
CreateTime: time.Now().Unix(),
@@ -161,7 +185,7 @@ func newGCStateManagerForTest(t testing.TB, opt newGCStateManagerForTestOptions)
161185
re.Equal(uint32(4), ks4.Id)
162186
} else {
163187
for _, req := range opt.specifyInitialKeyspaces {
164-
_, err := keyspaceManager.CreateKeyspace(req)
188+
_, err := keyspaceManager.CreateKeyspaceByID(req)
165189
re.NoError(err)
166190
}
167191
}
@@ -1839,7 +1863,7 @@ func (s *gcStateManagerTestSuite) TestGetAllKeyspacesMaxTxnSafePoint() {
18391863
var keyspaceID uint32
18401864
err := s.provider.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
18411865
var err1 error
1842-
txnSafePoint, keyspaceName, keyspaceID, err1 = s.manager.getAllKeyspacesMaxTxnSafePoint(wb)
1866+
txnSafePoint, keyspaceName, keyspaceID, err1 = s.manager.getMaxTxnSafePointAmongAllKeyspaces(wb)
18431867
return err1
18441868
})
18451869
re.NoError(err)
@@ -1854,7 +1878,7 @@ func (s *gcStateManagerTestSuite) TestGetAllKeyspacesMaxTxnSafePoint() {
18541878
}
18551879
err = s.provider.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
18561880
var err1 error
1857-
txnSafePoint, keyspaceName, keyspaceID, err1 = s.manager.getAllKeyspacesMaxTxnSafePoint(wb)
1881+
txnSafePoint, keyspaceName, keyspaceID, err1 = s.manager.getMaxTxnSafePointAmongAllKeyspaces(wb)
18581882
return err1
18591883
})
18601884
re.NoError(err)
@@ -2030,22 +2054,108 @@ func (s *gcStateManagerTestSuite) TestGetAllKeyspacesGCStatesConcurrentCallShari
20302054
re.Equal(int64(2), executionCount.Load())
20312055
}
20322056

2057+
func TestGetAllKeysapcesGCStatesOnTooManyKeyspaces(t *testing.T) {
2058+
re := require.New(t)
2059+
2060+
const totalKeyspaces = keyspace.IteratorLoadingBatchSize * 3
2061+
2062+
opt := newGCStateManagerForTestOptions{
2063+
specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceByIDRequest, 0, totalKeyspaces),
2064+
}
2065+
opt.generateKeyspacesByCount(totalKeyspaces)
2066+
2067+
_, _, gcStateManager, clean, cancel := newGCStateManagerForTest(t, opt)
2068+
defer func() {
2069+
cancel()
2070+
clean()
2071+
}()
2072+
2073+
gcStates, err := gcStateManager.GetAllKeyspacesGCStates(context.Background())
2074+
re.Len(gcStates, totalKeyspaces+2) // Including the null keyspace, the default keyspace or the system keyspace.
2075+
2076+
re.NoError(err)
2077+
keyspaceIDs := make([]uint32, 0, len(gcStates))
2078+
for keyspaceID, gcState := range gcStates {
2079+
re.Equal(keyspaceID, gcState.KeyspaceID)
2080+
keyspaceIDs = append(keyspaceIDs, keyspaceID)
2081+
}
2082+
slices.Sort(keyspaceIDs)
2083+
2084+
expectedKeyspaceIDs := make([]uint32, 0, len(gcStates))
2085+
if !kerneltype.IsNextGen() {
2086+
expectedKeyspaceIDs = append(expectedKeyspaceIDs, constant.DefaultKeyspaceID)
2087+
}
2088+
for i := range totalKeyspaces {
2089+
expectedKeyspaceIDs = append(expectedKeyspaceIDs, uint32(i+1))
2090+
}
2091+
if kerneltype.IsNextGen() {
2092+
expectedKeyspaceIDs = append(expectedKeyspaceIDs, constant.SystemKeyspaceID)
2093+
}
2094+
expectedKeyspaceIDs = append(expectedKeyspaceIDs, constant.NullKeyspaceID)
2095+
re.Equal(expectedKeyspaceIDs, keyspaceIDs)
2096+
}
2097+
2098+
func TestGetMaxTxnSafePointAmongAllKeyspacesOnTooManyKeyspaces(t *testing.T) {
2099+
re := require.New(t)
2100+
2101+
const totalKeyspaces = keyspace.IteratorLoadingBatchSize * 2
2102+
2103+
opt := newGCStateManagerForTestOptions{
2104+
specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceByIDRequest, 0, totalKeyspaces),
2105+
}
2106+
opt.generateKeyspacesByCount(totalKeyspaces)
2107+
2108+
_, _, gcStateManager, clean, cancel := newGCStateManagerForTest(t, opt)
2109+
defer func() {
2110+
cancel()
2111+
clean()
2112+
}()
2113+
2114+
now := time.Now()
2115+
// Test around the boundary of two loading batches, so that it's likely to detect incorrectness when loading
2116+
// multiple batches.
2117+
for i := keyspace.IteratorLoadingBatchSize - 5; i <= keyspace.IteratorLoadingBatchSize+5; i++ {
2118+
keyspaceID := uint32(i)
2119+
newTxnSafePoint := uint64(i)
2120+
res, err := gcStateManager.AdvanceTxnSafePoint(keyspaceID, newTxnSafePoint, now)
2121+
re.NoError(err)
2122+
re.Equal(newTxnSafePoint, res.NewTxnSafePoint)
2123+
2124+
var maxTxnSafePoint uint64
2125+
var keyspaceIDWithMaxTxnSafePoint uint32
2126+
var keyspaceNameWithMaxTxnSafePoint string
2127+
err = gcStateManager.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
2128+
var err1 error
2129+
maxTxnSafePoint, keyspaceNameWithMaxTxnSafePoint, keyspaceIDWithMaxTxnSafePoint, err1 = gcStateManager.getMaxTxnSafePointAmongAllKeyspaces(wb)
2130+
return err1
2131+
})
2132+
re.NoError(err)
2133+
re.Equal(newTxnSafePoint, maxTxnSafePoint)
2134+
re.Equal(keyspaceID, keyspaceIDWithMaxTxnSafePoint)
2135+
re.Equal(fmt.Sprintf("ks%d", keyspaceID), keyspaceNameWithMaxTxnSafePoint)
2136+
}
2137+
}
2138+
20332139
func benchmarkGetAllKeyspacesGCStatesImpl(b *testing.B, keyspacesCount int, parallelism int) {
20342140
re := require.New(b)
20352141
fname := testutil.InitTempFileLogger("info")
20362142
defer os.Remove(fname)
20372143

20382144
opt := newGCStateManagerForTestOptions{
2039-
specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceRequest, 0, keyspacesCount),
2145+
specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceByIDRequest, 0, keyspacesCount),
20402146
etcdServerCfgModifier: func(cfg *embed.Config) {
20412147
cfg.LogOutputs = []string{fname}
20422148
},
20432149
}
2150+
createTime := time.Now().Unix()
20442151
for i := range keyspacesCount {
2045-
opt.specifyInitialKeyspaces = append(opt.specifyInitialKeyspaces, &keyspace.CreateKeyspaceRequest{
2046-
Name: fmt.Sprintf("ks%d", i),
2152+
id := new(uint32)
2153+
*id = uint32(i + 1)
2154+
opt.specifyInitialKeyspaces = append(opt.specifyInitialKeyspaces, &keyspace.CreateKeyspaceByIDRequest{
2155+
ID: id,
2156+
Name: fmt.Sprintf("ks%d", *id),
20472157
Config: map[string]string{keyspace.GCManagementType: keyspace.KeyspaceLevelGC},
2048-
CreateTime: time.Now().Unix(),
2158+
CreateTime: createTime,
20492159
})
20502160
}
20512161

0 commit comments

Comments
 (0)