Skip to content

Commit 67aa227

Browse files
committed
grpc impl
Signed-off-by: 童剑 <1045931706@qq.com>
1 parent 43650b7 commit 67aa227

File tree

4 files changed

+642
-246
lines changed

4 files changed

+642
-246
lines changed

pkg/grpc/helper.go

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
// Copyright 2025 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package grpc
16+
17+
import (
18+
"bytes"
19+
"fmt"
20+
21+
"go.uber.org/multierr"
22+
"go.uber.org/zap"
23+
"time"
24+
25+
"github.com/pingcap/kvproto/pkg/metapb"
26+
"github.com/pingcap/kvproto/pkg/pdpb"
27+
"github.com/pingcap/log"
28+
29+
"github.com/tikv/pd/pkg/core"
30+
"github.com/tikv/pd/pkg/errs"
31+
"github.com/tikv/pd/pkg/utils/keypath"
32+
"github.com/tikv/pd/pkg/utils/keyutil"
33+
)
34+
35+
// GetRegion implements the GetRegion RPC method.
36+
func GetRegion(rc *core.BasicCluster, request *pdpb.GetRegionRequest) (resp *pdpb.GetRegionResponse, err error) {
37+
defer func() {
38+
incRegionRequestCounter("GetRegion", request.Header, resp.Header.Error)
39+
}()
40+
41+
if rc == nil {
42+
return &pdpb.GetRegionResponse{Header: notBootstrappedHeader()}, nil
43+
}
44+
region := rc.GetRegionByKey(request.GetRegionKey())
45+
if region == nil {
46+
log.Warn("leader get region nil", zap.String("key", string(request.GetRegionKey())))
47+
return &pdpb.GetRegionResponse{Header: wrapHeader()}, nil
48+
}
49+
50+
var buckets *metapb.Buckets
51+
if request.GetNeedBuckets() {
52+
buckets = region.GetBuckets()
53+
}
54+
return &pdpb.GetRegionResponse{
55+
Header: wrapHeader(),
56+
Region: region.GetMeta(),
57+
Leader: region.GetLeader(),
58+
DownPeers: region.GetDownPeers(),
59+
PendingPeers: region.GetPendingPeers(),
60+
Buckets: buckets,
61+
}, nil
62+
}
63+
64+
// GetPrevRegion implements gRPC PDServer
65+
func GetPrevRegion(rc *core.BasicCluster, request *pdpb.GetRegionRequest) (resp *pdpb.GetRegionResponse, err error) {
66+
defer func() {
67+
incRegionRequestCounter("GetPrevRegion", request.Header, resp.Header.Error)
68+
}()
69+
70+
if rc == nil {
71+
return &pdpb.GetRegionResponse{Header: notBootstrappedHeader()}, nil
72+
}
73+
74+
region := rc.GetPrevRegionByKey(request.GetRegionKey())
75+
if region == nil {
76+
return &pdpb.GetRegionResponse{Header: wrapHeader()}, nil
77+
}
78+
var buckets *metapb.Buckets
79+
if request.GetNeedBuckets() {
80+
buckets = region.GetBuckets()
81+
}
82+
return &pdpb.GetRegionResponse{
83+
Header: wrapHeader(),
84+
Region: region.GetMeta(),
85+
Leader: region.GetLeader(),
86+
DownPeers: region.GetDownPeers(),
87+
PendingPeers: region.GetPendingPeers(),
88+
Buckets: buckets,
89+
}, nil
90+
}
91+
92+
// GetRegionByID implements gRPC PDServer.
93+
func GetRegionByID(rc *core.BasicCluster, request *pdpb.GetRegionByIDRequest) (resp *pdpb.GetRegionResponse, err error) {
94+
defer func() {
95+
incRegionRequestCounter("GetRegionByID", request.Header, resp.Header.Error)
96+
}()
97+
if rc == nil {
98+
return &pdpb.GetRegionResponse{Header: notBootstrappedHeader()}, nil
99+
}
100+
region := rc.GetRegion(request.GetRegionId())
101+
if region == nil {
102+
return &pdpb.GetRegionResponse{Header: wrapHeader()}, nil
103+
}
104+
var buckets *metapb.Buckets
105+
if request.GetNeedBuckets() {
106+
buckets = region.GetBuckets()
107+
}
108+
return &pdpb.GetRegionResponse{
109+
Header: wrapHeader(),
110+
Region: region.GetMeta(),
111+
Leader: region.GetLeader(),
112+
DownPeers: region.GetDownPeers(),
113+
PendingPeers: region.GetPendingPeers(),
114+
Buckets: buckets,
115+
}, nil
116+
}
117+
118+
// ScanRegions implements gRPC PDServer.
119+
// Deprecated: use BatchScanRegions instead.
120+
func ScanRegions(rc *core.BasicCluster, request *pdpb.ScanRegionsRequest) (resp *pdpb.ScanRegionsResponse, err error) {
121+
defer func() {
122+
incRegionRequestCounter("ScanRegions", request.Header, resp.Header.Error)
123+
}()
124+
if rc == nil {
125+
return &pdpb.ScanRegionsResponse{Header: notBootstrappedHeader()}, nil
126+
}
127+
regions := rc.ScanRegions(request.GetStartKey(), request.GetEndKey(), int(request.GetLimit()))
128+
if len(regions) == 0 {
129+
return &pdpb.ScanRegionsResponse{Header: regionNotFound()}, nil
130+
}
131+
resp = &pdpb.ScanRegionsResponse{Header: wrapHeader()}
132+
for _, r := range regions {
133+
leader := r.GetLeader()
134+
if leader == nil {
135+
leader = &metapb.Peer{}
136+
}
137+
// Set RegionMetas and Leaders to make it compatible with old client.
138+
resp.RegionMetas = append(resp.RegionMetas, r.GetMeta())
139+
resp.Leaders = append(resp.Leaders, leader)
140+
resp.Regions = append(resp.Regions, &pdpb.Region{
141+
Region: r.GetMeta(),
142+
Leader: leader,
143+
DownPeers: r.GetDownPeers(),
144+
PendingPeers: r.GetPendingPeers(),
145+
})
146+
}
147+
return resp, nil
148+
}
149+
150+
// BatchScanRegions implements gRPC PDServer.
151+
func BatchScanRegions(rc *core.BasicCluster, request *pdpb.BatchScanRegionsRequest) (resp *pdpb.BatchScanRegionsResponse, err error) {
152+
defer func() {
153+
incRegionRequestCounter("BatchScanRegions", request.Header, resp.Header.Error)
154+
}()
155+
156+
if rc == nil {
157+
return &pdpb.BatchScanRegionsResponse{Header: notBootstrappedHeader()}, nil
158+
}
159+
needBucket := request.GetNeedBuckets()
160+
limit := request.GetLimit()
161+
// cast to keyutil.KeyRanges and check the validation.
162+
keyRanges := keyutil.NewKeyRangesWithSize(len(request.GetRanges()))
163+
reqRanges := request.GetRanges()
164+
for i, reqRange := range reqRanges {
165+
if i > 0 {
166+
if bytes.Compare(reqRange.StartKey, reqRanges[i-1].EndKey) < 0 {
167+
return &pdpb.BatchScanRegionsResponse{Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, "invalid key range, ranges overlapped")}, nil
168+
}
169+
}
170+
if len(reqRange.EndKey) > 0 && bytes.Compare(reqRange.StartKey, reqRange.EndKey) > 0 {
171+
return &pdpb.BatchScanRegionsResponse{Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, "invalid key range, start key > end key")}, nil
172+
}
173+
keyRanges.Append(reqRange.StartKey, reqRange.EndKey)
174+
}
175+
176+
scanOptions := []core.BatchScanRegionsOptionFunc{core.WithLimit(int(limit))}
177+
if request.ContainAllKeyRange {
178+
scanOptions = append(scanOptions, core.WithOutputMustContainAllKeyRange())
179+
}
180+
res, err := rc.BatchScanRegions(keyRanges, scanOptions...)
181+
if err != nil {
182+
if errs.ErrRegionNotAdjacent.Equal(multierr.Errors(err)[0]) {
183+
return &pdpb.BatchScanRegionsResponse{
184+
Header: wrapErrorToHeader(pdpb.ErrorType_REGIONS_NOT_CONTAIN_ALL_KEY_RANGE, err.Error()),
185+
}, nil
186+
}
187+
return &pdpb.BatchScanRegionsResponse{
188+
Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
189+
}, nil
190+
}
191+
regions := make([]*pdpb.Region, 0, len(res))
192+
for _, r := range res {
193+
leader := r.GetLeader()
194+
if leader == nil {
195+
leader = &metapb.Peer{}
196+
}
197+
var buckets *metapb.Buckets
198+
if needBucket {
199+
buckets = r.GetBuckets()
200+
}
201+
regions = append(regions, &pdpb.Region{
202+
Region: r.GetMeta(),
203+
Leader: leader,
204+
DownPeers: r.GetDownPeers(),
205+
PendingPeers: r.GetPendingPeers(),
206+
Buckets: buckets,
207+
})
208+
}
209+
if len(regions) == 0 {
210+
return &pdpb.BatchScanRegionsResponse{Header: regionNotFound()}, nil
211+
}
212+
resp = &pdpb.BatchScanRegionsResponse{Header: wrapHeader(), Regions: regions}
213+
return resp, nil
214+
}
215+
216+
// QueryRegion provides a stream processing of the region query.
217+
func QueryRegion(rc *core.BasicCluster, request *pdpb.QueryRegionRequest) *pdpb.QueryRegionResponse {
218+
for {
219+
needBuckets := request.GetNeedBuckets()
220+
start := time.Now()
221+
keyIDMap, prevKeyIDMap, regionsByID := rc.QueryRegions(
222+
request.GetKeys(),
223+
request.GetPrevKeys(),
224+
request.GetIds(),
225+
needBuckets,
226+
)
227+
queryRegionDuration.Observe(time.Since(start).Seconds())
228+
// Build the response and send it to the client.
229+
response := &pdpb.QueryRegionResponse{
230+
Header: wrapHeader(),
231+
KeyIdMap: keyIDMap,
232+
PrevKeyIdMap: prevKeyIDMap,
233+
RegionsById: regionsByID,
234+
}
235+
incRegionRequestCounter("QueryRegion", request.Header, response.Header.Error)
236+
regionRequestCounter.WithLabelValues("QueryRegion", request.Header.CallerId,
237+
request.Header.CallerComponent, "").Inc()
238+
}
239+
}
240+
241+
// GetStore implements gRPC PDServer.
242+
func GetStore(rc *core.BasicCluster, request *pdpb.GetStoreRequest) (*pdpb.GetStoreResponse, error) {
243+
if rc == nil {
244+
return &pdpb.GetStoreResponse{Header: notBootstrappedHeader()}, nil
245+
}
246+
247+
storeID := request.GetStoreId()
248+
store := rc.GetStore(storeID)
249+
if store == nil {
250+
return &pdpb.GetStoreResponse{
251+
Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
252+
fmt.Sprintf("invalid store ID %d, not found", storeID)),
253+
}, nil
254+
}
255+
return &pdpb.GetStoreResponse{
256+
Header: wrapHeader(),
257+
Store: store.GetMeta(),
258+
Stats: store.GetStoreStats(),
259+
}, nil
260+
}
261+
262+
// GetAllStores implements the GetAllStores RPC method.
263+
func GetAllStores(rc *core.BasicCluster, request *pdpb.GetAllStoresRequest) (*pdpb.GetAllStoresResponse, error) {
264+
if rc == nil {
265+
return &pdpb.GetAllStoresResponse{Header: notBootstrappedHeader()}, nil
266+
}
267+
var stores []*metapb.Store
268+
if request.GetExcludeTombstoneStores() {
269+
for _, store := range rc.GetMetaStores() {
270+
if store.GetNodeState() != metapb.NodeState_Removed {
271+
stores = append(stores, store)
272+
}
273+
}
274+
} else {
275+
stores = rc.GetMetaStores()
276+
}
277+
return &pdpb.GetAllStoresResponse{
278+
Header: wrapHeader(),
279+
Stores: stores,
280+
}, nil
281+
}
282+
283+
func wrapHeader() *pdpb.ResponseHeader {
284+
clusterID := keypath.ClusterID()
285+
if clusterID == 0 {
286+
return wrapErrorToHeader(pdpb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready")
287+
}
288+
return &pdpb.ResponseHeader{ClusterId: clusterID}
289+
}
290+
291+
func wrapErrorToHeader(errorType pdpb.ErrorType, message string) *pdpb.ResponseHeader {
292+
return errorHeader(&pdpb.Error{
293+
Type: errorType,
294+
Message: message,
295+
})
296+
}
297+
298+
func errorHeader(err *pdpb.Error) *pdpb.ResponseHeader {
299+
return &pdpb.ResponseHeader{
300+
ClusterId: keypath.ClusterID(),
301+
Error: err,
302+
}
303+
}
304+
305+
func notBootstrappedHeader() *pdpb.ResponseHeader {
306+
return errorHeader(&pdpb.Error{
307+
Type: pdpb.ErrorType_NOT_BOOTSTRAPPED,
308+
Message: "cluster is not bootstrapped",
309+
})
310+
}
311+
312+
func regionNotFound() *pdpb.ResponseHeader {
313+
return errorHeader(&pdpb.Error{
314+
Type: pdpb.ErrorType_REGION_NOT_FOUND,
315+
Message: "region not found",
316+
})
317+
}

0 commit comments

Comments
 (0)