Skip to content

Commit 6326f2d

Browse files
authored
cp fix global service safe point (tikv#312) (tikv#370)
Signed-off-by: ystaticy <y_static_y@sina.com>
1 parent de0cb53 commit 6326f2d

File tree

5 files changed

+129
-13
lines changed

5 files changed

+129
-13
lines changed

pkg/gc/safepoint_v2.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,3 +212,13 @@ func (manager *SafePointV2Manager) RemoveServiceSafePoint(keyspaceID uint32, ser
212212
}
213213
return minServiceSafePoint, nil
214214
}
215+
216+
// LoadMinServiceSafePointV2 update keyspace service safe point with the given serviceSafePoint.
217+
func (manager *SafePointV2Manager) LoadMinServiceSafePointV2(serviceSafePoint *endpoint.ServiceSafePointV2, now time.Time) (*endpoint.ServiceSafePointV2, error) {
218+
return manager.v2Storage.LoadMinServiceSafePointV2(serviceSafePoint.KeyspaceID, now)
219+
}
220+
221+
// LoadServiceSafePointV2 update keyspace service safe point with the given serviceSafePoint.
222+
func (manager *SafePointV2Manager) LoadServiceSafePointV2(keyspaceID uint32, serviceID string) (*endpoint.ServiceSafePointV2, error) {
223+
return manager.v2Storage.LoadServiceSafePointV2(keyspaceID, serviceID)
224+
}

pkg/storage/endpoint/safepoint_v2.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ import (
2727
"go.uber.org/zap"
2828
)
2929

30+
const (
31+
// NullKeyspaceID is used for api v1 or legacy path where is keyspace agnostic.
32+
nullKeyspaceID = uint32(0xFFFFFFFF)
33+
)
34+
3035
// GCSafePointV2 represents the overall safe point for a specific keyspace.
3136
type GCSafePointV2 struct {
3237
KeyspaceID uint32 `json:"keyspace_id"`
@@ -118,6 +123,17 @@ func (se *StorageEndpoint) LoadMinServiceSafePointV2(keyspaceID uint32, now time
118123

119124
hasGCWorker := false
120125
min := &ServiceSafePointV2{KeyspaceID: keyspaceID, SafePoint: math.MaxUint64}
126+
127+
// Load global service safe point
128+
serviceSafePointV1, err := se.loadServiceGCSafePointV1(keypath.NativeBRServiceID)
129+
if err != nil {
130+
return nil, err
131+
}
132+
if serviceSafePointV1 != nil {
133+
min.KeyspaceID = nullKeyspaceID
134+
min.SafePoint = serviceSafePointV1.SafePoint
135+
}
136+
121137
for i, key := range keys {
122138
serviceSafePoint := &ServiceSafePointV2{}
123139
if err = json.Unmarshal([]byte(values[i]), serviceSafePoint); err != nil {
@@ -157,6 +173,22 @@ func (se *StorageEndpoint) LoadMinServiceSafePointV2(keyspaceID uint32, now time
157173
return min, nil
158174
}
159175

176+
// loadServiceGCSafePointV1 loads current GC safe point from storage.
177+
func (se *StorageEndpoint) loadServiceGCSafePointV1(serviceID string) (*ServiceSafePoint, error) {
178+
serviceIDPath := keypath.GCSafePointServicePrefixPath() + serviceID
179+
value, err := se.Load(serviceIDPath)
180+
if err != nil || value == "" {
181+
return nil, err
182+
}
183+
184+
ssp := &ServiceSafePoint{}
185+
if err := json.Unmarshal([]byte(value), ssp); err != nil {
186+
return nil, err
187+
}
188+
189+
return ssp, nil
190+
}
191+
160192
// LoadServiceSafePointV2 returns ServiceSafePointV2 for given keyspaceID and serviceID.
161193
func (se *StorageEndpoint) LoadServiceSafePointV2(keyspaceID uint32, serviceID string) (*ServiceSafePointV2, error) {
162194
key := keypath.ServiceSafePointV2Path(keyspaceID, serviceID)

pkg/storage/storage_gc_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,3 +150,45 @@ func TestLoadEmpty(t *testing.T) {
150150
re.NoError(err)
151151
re.Nil(serviceSafePoint)
152152
}
153+
154+
func TestGlobalServiceSafePoint(t *testing.T) {
155+
// expectMinServiceSafePoint is a very small value,
156+
// we set NativeBRServiceID's service safe point to expectMinServiceSafePoint,
157+
// and check min service safe point V2 is expectMinServiceSafePoint.
158+
expectMinServiceSafePoint := uint64(50)
159+
160+
re := require.New(t)
161+
storage := NewStorageWithMemoryBackend()
162+
currentTime := time.Now()
163+
expireAt1 := currentTime.Add(1000 * time.Second).Unix()
164+
expireAt2 := currentTime.Add(2000 * time.Second).Unix()
165+
expireAt3 := currentTime.Add(3000 * time.Second).Unix()
166+
167+
testKeyspaceID := uint32(1)
168+
serviceSafePoints := []*endpoint.ServiceSafePointV2{
169+
{KeyspaceID: testKeyspaceID, ServiceID: "0", ExpiredAt: expireAt1, SafePoint: 300},
170+
{KeyspaceID: testKeyspaceID, ServiceID: "1", ExpiredAt: expireAt2, SafePoint: 400},
171+
{KeyspaceID: testKeyspaceID, ServiceID: "2", ExpiredAt: expireAt3, SafePoint: 500},
172+
}
173+
174+
globalServiceIDs := []string{keypath.NativeBRServiceID, "test01"}
175+
minGlobalSafePoints := []uint64{expectMinServiceSafePoint, expectMinServiceSafePoint + 1}
176+
177+
for i, globalServiceID := range globalServiceIDs {
178+
ssp := &endpoint.ServiceSafePoint{
179+
ServiceID: globalServiceID,
180+
ExpiredAt: expireAt3,
181+
SafePoint: minGlobalSafePoints[i],
182+
}
183+
storage.SaveServiceGCSafePoint(ssp)
184+
}
185+
186+
for _, serviceSafePoint := range serviceSafePoints {
187+
re.NoError(storage.SaveServiceSafePointV2(serviceSafePoint))
188+
}
189+
// enabling failpoint to make expired key removal immediately observable
190+
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/endpoint/removeExpiredKeys", "return(true)"))
191+
minSafePoint, err := storage.LoadMinServiceSafePointV2(testKeyspaceID, currentTime)
192+
re.NoError(err)
193+
re.Equal(expectMinServiceSafePoint, minSafePoint.SafePoint)
194+
}

pkg/utils/keypath/key_path.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,17 @@ const (
4646
CustomSchedulerConfigPath = "scheduler_config"
4747
// GCWorkerServiceSafePointID is the service id of GC worker.
4848
GCWorkerServiceSafePointID = "gc_worker"
49-
// NativeBRServiceID is CSE native br service id.
50-
NativeBRServiceID = "native_br"
51-
minResolvedTS = "min_resolved_ts"
52-
externalTimeStamp = "external_timestamp"
53-
keyspaceSafePointPrefix = "keyspaces/gc_safepoint"
54-
keyspaceGCSafePointSuffix = "gc"
55-
keyspacePrefix = "keyspaces"
56-
keyspaceMetaInfix = "meta"
57-
keyspaceIDInfix = "id"
58-
keyspaceAllocID = "alloc_id"
59-
gcSafePointInfix = "gc_safe_point"
60-
serviceSafePointInfix = "service_safe_point"
61-
regionPathPrefix = "raft/r"
49+
minResolvedTS = "min_resolved_ts"
50+
externalTimeStamp = "external_timestamp"
51+
keyspaceSafePointPrefix = "keyspaces/gc_safepoint"
52+
keyspaceGCSafePointSuffix = "gc"
53+
keyspacePrefix = "keyspaces"
54+
keyspaceMetaInfix = "meta"
55+
keyspaceIDInfix = "id"
56+
keyspaceAllocID = "alloc_id"
57+
gcSafePointInfix = "gc_safe_point"
58+
serviceSafePointInfix = "service_safe_point"
59+
regionPathPrefix = "raft/r"
6260
// resource group storage endpoint has prefix `resource_group`
6361
// ResourceGroupSettingsPath is the path to save the resource group settings.
6462
ResourceGroupSettingsPath = "settings"
@@ -85,6 +83,9 @@ const (
8583
// Keyspace safe point version.
8684
keyspaceGlobalInfix = "global"
8785
keyspaceGlobalSafePointVersionInfix = "safe_point_version"
86+
87+
// NativeBRServiceID is CSE native br service id.
88+
NativeBRServiceID = "native_br"
8889
)
8990

9091
// PDRootPath returns the PD root path.

tests/integrations/client/gc_client_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,34 @@ func (suite *gcClientTestSuite) mustGetRevision(re *require.Assertions, keyspace
218218
re.NoError(err)
219219
return res.Header.GetRevision()
220220
}
221+
222+
// mustUpdateServiceSafePoint updates the service safe point of the given keyspace id.
223+
func (suite *gcClientTestSuite) mustUpdateServiceSafePoint(keyspaceID uint32, safePoint uint64, ttl int64, serviceID string) uint64 {
224+
res, err := suite.client.UpdateServiceSafePointV2(suite.server.Context(), keyspaceID, serviceID, ttl, safePoint)
225+
suite.NoError(err)
226+
return res
227+
}
228+
229+
// mustUpdateServiceSafePoint updates the service safe point of the given keyspace id.
230+
func (suite *gcClientTestSuite) mustLoadServiceSafePoint(keyspaceID uint32, serviceID string) uint64 {
231+
res, err := suite.server.GetSafePointV2Manager().LoadServiceSafePointV2(keyspaceID, serviceID)
232+
suite.NoError(err)
233+
return res.SafePoint
234+
}
235+
236+
func (suite *gcClientTestSuite) TestUpdateServiceSafePointV2() {
237+
serviceID := "test_service_id"
238+
ttl := int64(10000)
239+
240+
keyspaceID01 := uint32(1)
241+
serviceSafePoint01 := uint64(1)
242+
_ = suite.mustUpdateServiceSafePoint(keyspaceID01, serviceSafePoint01, ttl, serviceID)
243+
minSafePoint01 := suite.mustLoadServiceSafePoint(keyspaceID01, serviceID)
244+
suite.Equal(serviceSafePoint01, minSafePoint01)
245+
246+
keyspaceID02 := uint32(2)
247+
serviceSafePoint02 := uint64(2)
248+
_ = suite.mustUpdateServiceSafePoint(keyspaceID02, serviceSafePoint02, ttl, serviceID)
249+
minSafePoint02 := suite.mustLoadServiceSafePoint(keyspaceID02, serviceID)
250+
suite.Equal(serviceSafePoint02, minSafePoint02)
251+
}

0 commit comments

Comments
 (0)