Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 55 additions & 22 deletions pkg/gc/gc_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,39 +664,66 @@ func (m *GCStateManager) GetGCState(keyspaceID uint32) (GCState, error) {
// must be fetched AFTER the beginning of the current invocation, and it never reuses the result of invocations that
// started earlier than the current one.
func (m *GCStateManager) GetAllKeyspacesGCStates(ctx context.Context) (map[uint32]GCState, error) {
return m.allKeyspacesGCStatesSingleFlight.Do(ctx, func() (map[uint32]GCState, error) {
result, err := m.getAllKeyspacesGCStatesImpl()
return m.allKeyspacesGCStatesSingleFlight.Do(ctx, func(execCtx context.Context) (map[uint32]GCState, error) {
result, err := m.getAllKeyspacesGCStatesImpl(execCtx)
failpoint.Inject("onGetAllKeyspacesGCStatesFinish", func() {})
return result, err
})
}

func (m *GCStateManager) getAllKeyspacesGCStatesImpl() (map[uint32]GCState, error) {
func (m *GCStateManager) getAllKeyspacesGCStatesImpl(ctx context.Context) (map[uint32]GCState, error) {
failpoint.InjectCall("onGetAllKeyspacesGCStatesStart")

// TODO: Handle the case that there are too many keyspaces and loading them at once is not suitable.
allKeyspaces, err := m.keyspaceManager.LoadRangeKeyspace(0, 0)
if err != nil {
return nil, err
mutexLocked := false
lock := func() {
m.mu.Lock()
mutexLocked = true
}
unlock := func() {
m.mu.Unlock()
mutexLocked = false
}
m.mu.Lock()
defer m.mu.Unlock()

ensureUnlocked := func() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure the mutex is unlocked if the function is aborted abnormally (e.g., panicking), or more return path is introduced in the future.

if mutexLocked {
unlock()
}
}
defer ensureUnlocked()

keyspaceIterator := m.keyspaceManager.IterateKeyspaces(0)

// Do not guarantee atomicity among different keyspaces here.
results := make(map[uint32]GCState)
err = m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
lock()
err := m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
nullKeyspaceState, err1 := m.getGCStateInTransaction(constant.NullKeyspaceID, wb)
if err1 != nil {
return err1
}
results[constant.NullKeyspaceID] = nullKeyspaceState
return nil
})
unlock()
if err != nil {
return nil, err
}

for _, keyspaceMeta := range allKeyspaces {
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

keyspaceMeta, ok, err := keyspaceIterator.Next()
if err != nil {
return nil, err
}
if !ok {
break
}

// Just handle the active keyspace, leave the others up to keyspace management.
if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED {
continue
Expand All @@ -710,6 +737,7 @@ func (m *GCStateManager) getAllKeyspacesGCStatesImpl() (map[uint32]GCState, erro
continue
}

lock()
err = m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
state, err1 := m.getGCStateInTransaction(keyspaceMeta.Id, wb)
if err1 != nil {
Expand All @@ -718,6 +746,7 @@ func (m *GCStateManager) getAllKeyspacesGCStatesImpl() (map[uint32]GCState, erro
results[keyspaceMeta.Id] = state
return nil
})
unlock()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -802,7 +831,7 @@ func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(keyspaceID uint32, s
var txnSafePoint uint64
err := m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
var err1 error
txnSafePoint, _, _, err1 = m.getAllKeyspacesMaxTxnSafePoint(wb)
txnSafePoint, _, _, err1 = m.getMaxTxnSafePointAmongAllKeyspaces(wb)
return err1
})
if err != nil {
Expand Down Expand Up @@ -902,16 +931,20 @@ func (m *GCStateManager) SetGlobalGCBarrier(ctx context.Context, barrierID strin
return m.setGlobalGCBarrierImpl(ctx, barrierID, barrierTS, ttl, now)
}

// getAllKeyspacesMaxTxnSafePoint must be called inside a transaction,
// getMaxTxnSafePointAmongAllKeyspaces must be called inside a transaction,
// The WriteBatch parameter in function signature is deliberate to the call safe, do not pass nil.
func (m *GCStateManager) getAllKeyspacesMaxTxnSafePoint(_ *endpoint.GCStateWriteBatch) (maxTxnSafePoint uint64, keyspaceName string, keyspaceID uint32, err error) {
// TODO: Handle the case that there are too many keyspaces and loading them at once is not suitable.
allKeyspaces, err1 := m.keyspaceManager.LoadRangeKeyspace(0, 0)
if err1 != nil {
err = err1
return
}
for _, keyspaceMeta := range allKeyspaces {
func (m *GCStateManager) getMaxTxnSafePointAmongAllKeyspaces(_ *endpoint.GCStateWriteBatch) (maxTxnSafePoint uint64, keyspaceName string, keyspaceID uint32, err error) {
keyspaceIterator := m.keyspaceManager.IterateKeyspaces(0)
for {
keyspaceMeta, ok, err2 := keyspaceIterator.Next()
if err2 != nil {
err = err2
return
}
if !ok {
break
}

if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED {
continue
}
Expand Down Expand Up @@ -949,7 +982,7 @@ func (m *GCStateManager) setGlobalGCBarrierImpl(_ context.Context, barrierID str
newBarrier := endpoint.NewGlobalGCBarrier(barrierID, barrierTS, expirationTime)
err := m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
// Make sure global barrier ts is ahead of txn safe point of all keyspaces.
maxTxnSafePoint, keyspaceName, keyspaceID, err := m.getAllKeyspacesMaxTxnSafePoint(wb)
maxTxnSafePoint, keyspaceName, keyspaceID, err := m.getMaxTxnSafePointAmongAllKeyspaces(wb)
if err != nil {
return err
}
Expand Down
134 changes: 122 additions & 12 deletions pkg/gc/gc_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/versioninfo/kerneltype"
"github.com/tikv/pd/server/config"
)

Expand Down Expand Up @@ -89,10 +90,24 @@ func TestGCStateManager(t *testing.T) {
type newGCStateManagerForTestOptions struct {
// Nil for generating initial keyspaces by the default preset
// Non-nil (including empty) for generating specified keyspaces
specifyInitialKeyspaces []*keyspace.CreateKeyspaceRequest
specifyInitialKeyspaces []*keyspace.CreateKeyspaceByIDRequest
etcdServerCfgModifier func(cfg *embed.Config)
}

func (opt *newGCStateManagerForTestOptions) generateKeyspacesByCount(count int) {
createTime := time.Now().Unix()
for i := range count {
id := new(uint32)
*id = uint32(i + 1)
opt.specifyInitialKeyspaces = append(opt.specifyInitialKeyspaces, &keyspace.CreateKeyspaceByIDRequest{
ID: id,
Name: fmt.Sprintf("ks%d", *id),
Config: map[string]string{keyspace.GCManagementType: keyspace.KeyspaceLevelGC},
CreateTime: createTime,
})
}
}

func newGCStateManagerForTest(t testing.TB, opt newGCStateManagerForTestOptions) (storage *endpoint.StorageEndpoint, provider endpoint.GCStateProvider, gcStateManager *GCStateManager, clean func(), cancel context.CancelFunc) {
cfg := config.NewConfig()
re := require.New(t)
Expand Down Expand Up @@ -126,31 +141,40 @@ func newGCStateManagerForTest(t testing.TB, opt newGCStateManagerForTestOptions)

// keyspaceID 0 exists automatically after bootstrapping.
if opt.specifyInitialKeyspaces == nil {
ks1, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
id := new(uint32)
*id = 1
ks1, err := keyspaceManager.CreateKeyspaceByID(&keyspace.CreateKeyspaceByIDRequest{
ID: id,
Name: "ks1",
Config: map[string]string{"gc_management_type": "unified"},
CreateTime: time.Now().Unix(),
})
re.NoError(err)
re.Equal(uint32(1), ks1.Id)

ks2, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
*id = 2
ks2, err := keyspaceManager.CreateKeyspaceByID(&keyspace.CreateKeyspaceByIDRequest{
ID: id,
Name: "ks2",
Config: map[string]string{"gc_management_type": "keyspace_level"},
CreateTime: time.Now().Unix(),
})
re.NoError(err)
re.Equal(uint32(2), ks2.Id)

ks3, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
*id = 3
ks3, err := keyspaceManager.CreateKeyspaceByID(&keyspace.CreateKeyspaceByIDRequest{
ID: id,
Name: "ks3",
Config: map[string]string{},
CreateTime: time.Now().Unix(),
})
re.NoError(err)
re.Equal(uint32(3), ks3.Id)

ks4, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
*id = 4
ks4, err := keyspaceManager.CreateKeyspaceByID(&keyspace.CreateKeyspaceByIDRequest{
ID: id,
Name: "ks4",
Config: map[string]string{},
CreateTime: time.Now().Unix(),
Expand All @@ -161,7 +185,7 @@ func newGCStateManagerForTest(t testing.TB, opt newGCStateManagerForTestOptions)
re.Equal(uint32(4), ks4.Id)
} else {
for _, req := range opt.specifyInitialKeyspaces {
_, err := keyspaceManager.CreateKeyspace(req)
_, err := keyspaceManager.CreateKeyspaceByID(req)
re.NoError(err)
}
}
Expand Down Expand Up @@ -1839,7 +1863,7 @@ func (s *gcStateManagerTestSuite) TestGetAllKeyspacesMaxTxnSafePoint() {
var keyspaceID uint32
err := s.provider.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
var err1 error
txnSafePoint, keyspaceName, keyspaceID, err1 = s.manager.getAllKeyspacesMaxTxnSafePoint(wb)
txnSafePoint, keyspaceName, keyspaceID, err1 = s.manager.getMaxTxnSafePointAmongAllKeyspaces(wb)
return err1
})
re.NoError(err)
Expand All @@ -1854,7 +1878,7 @@ func (s *gcStateManagerTestSuite) TestGetAllKeyspacesMaxTxnSafePoint() {
}
err = s.provider.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
var err1 error
txnSafePoint, keyspaceName, keyspaceID, err1 = s.manager.getAllKeyspacesMaxTxnSafePoint(wb)
txnSafePoint, keyspaceName, keyspaceID, err1 = s.manager.getMaxTxnSafePointAmongAllKeyspaces(wb)
return err1
})
re.NoError(err)
Expand Down Expand Up @@ -2030,22 +2054,108 @@ func (s *gcStateManagerTestSuite) TestGetAllKeyspacesGCStatesConcurrentCallShari
re.Equal(int64(2), executionCount.Load())
}

func TestGetAllKeysapcesGCStatesOnTooManyKeyspaces(t *testing.T) {
re := require.New(t)

const totalKeyspaces = keyspace.IteratorLoadingBatchSize * 3

opt := newGCStateManagerForTestOptions{
specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceByIDRequest, 0, totalKeyspaces),
}
opt.generateKeyspacesByCount(totalKeyspaces)

_, _, gcStateManager, clean, cancel := newGCStateManagerForTest(t, opt)
defer func() {
cancel()
clean()
}()

gcStates, err := gcStateManager.GetAllKeyspacesGCStates(context.Background())
re.Len(gcStates, totalKeyspaces+2) // Including the null keyspace, the default keyspace or the system keyspace.

re.NoError(err)
keyspaceIDs := make([]uint32, 0, len(gcStates))
for keyspaceID, gcState := range gcStates {
re.Equal(keyspaceID, gcState.KeyspaceID)
keyspaceIDs = append(keyspaceIDs, keyspaceID)
}
slices.Sort(keyspaceIDs)

expectedKeyspaceIDs := make([]uint32, 0, len(gcStates))
if !kerneltype.IsNextGen() {
expectedKeyspaceIDs = append(expectedKeyspaceIDs, constant.DefaultKeyspaceID)
}
for i := range totalKeyspaces {
expectedKeyspaceIDs = append(expectedKeyspaceIDs, uint32(i+1))
}
if kerneltype.IsNextGen() {
expectedKeyspaceIDs = append(expectedKeyspaceIDs, constant.SystemKeyspaceID)
}
expectedKeyspaceIDs = append(expectedKeyspaceIDs, constant.NullKeyspaceID)
re.Equal(expectedKeyspaceIDs, keyspaceIDs)
}

func TestGetMaxTxnSafePointAmongAllKeyspacesOnTooManyKeyspaces(t *testing.T) {
re := require.New(t)

const totalKeyspaces = keyspace.IteratorLoadingBatchSize * 2

opt := newGCStateManagerForTestOptions{
specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceByIDRequest, 0, totalKeyspaces),
}
opt.generateKeyspacesByCount(totalKeyspaces)

_, _, gcStateManager, clean, cancel := newGCStateManagerForTest(t, opt)
defer func() {
cancel()
clean()
}()

now := time.Now()
// Test around the boundary of two loading batches, so that it's likely to detect incorrectness when loading
// multiple batches.
for i := keyspace.IteratorLoadingBatchSize - 5; i <= keyspace.IteratorLoadingBatchSize+5; i++ {
keyspaceID := uint32(i)
newTxnSafePoint := uint64(i)
res, err := gcStateManager.AdvanceTxnSafePoint(keyspaceID, newTxnSafePoint, now)
re.NoError(err)
re.Equal(newTxnSafePoint, res.NewTxnSafePoint)

var maxTxnSafePoint uint64
var keyspaceIDWithMaxTxnSafePoint uint32
var keyspaceNameWithMaxTxnSafePoint string
err = gcStateManager.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error {
var err1 error
maxTxnSafePoint, keyspaceNameWithMaxTxnSafePoint, keyspaceIDWithMaxTxnSafePoint, err1 = gcStateManager.getMaxTxnSafePointAmongAllKeyspaces(wb)
return err1
})
re.NoError(err)
re.Equal(newTxnSafePoint, maxTxnSafePoint)
re.Equal(keyspaceID, keyspaceIDWithMaxTxnSafePoint)
re.Equal(fmt.Sprintf("ks%d", keyspaceID), keyspaceNameWithMaxTxnSafePoint)
}
}

func benchmarkGetAllKeyspacesGCStatesImpl(b *testing.B, keyspacesCount int, parallelism int) {
re := require.New(b)
fname := testutil.InitTempFileLogger("info")
defer os.Remove(fname)

opt := newGCStateManagerForTestOptions{
specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceRequest, 0, keyspacesCount),
specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceByIDRequest, 0, keyspacesCount),
etcdServerCfgModifier: func(cfg *embed.Config) {
cfg.LogOutputs = []string{fname}
},
}
createTime := time.Now().Unix()
for i := range keyspacesCount {
opt.specifyInitialKeyspaces = append(opt.specifyInitialKeyspaces, &keyspace.CreateKeyspaceRequest{
Name: fmt.Sprintf("ks%d", i),
id := new(uint32)
*id = uint32(i + 1)
opt.specifyInitialKeyspaces = append(opt.specifyInitialKeyspaces, &keyspace.CreateKeyspaceByIDRequest{
ID: id,
Name: fmt.Sprintf("ks%d", *id),
Config: map[string]string{keyspace.GCManagementType: keyspace.KeyspaceLevelGC},
CreateTime: time.Now().Unix(),
CreateTime: createTime,
})
}

Expand Down
Loading
Loading