Skip to content

Commit 7f7165e

Browse files
authored
Feat(router service): implement grpc function (#9723)
ref #9212 Signed-off-by: 童剑 <1045931706@qq.com>
1 parent eddd9a7 commit 7f7165e

File tree

10 files changed

+684
-479
lines changed

10 files changed

+684
-479
lines changed

pkg/core/region_test.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
mrand "math/rand"
2323
"strconv"
2424
"strings"
25+
"sync/atomic"
2526
"testing"
2627
"time"
2728

@@ -32,8 +33,6 @@ import (
3233
"github.com/pingcap/kvproto/pkg/pdpb"
3334

3435
"github.com/tikv/pd/pkg/errs"
35-
"github.com/tikv/pd/pkg/id"
36-
"github.com/tikv/pd/pkg/mock/mockid"
3736
"github.com/tikv/pd/pkg/utils/keyutil"
3837
)
3938

@@ -836,25 +835,25 @@ const (
836835
keyLength = 100
837836
)
838837

839-
func newRegionInfoIDRandom(re *require.Assertions, idAllocator id.Allocator) *RegionInfo {
838+
var baseID = atomic.Uint64{}
839+
840+
func newRegionInfoIDRandom() *RegionInfo {
840841
var (
841842
peers []*metapb.Peer
842843
leader *metapb.Peer
843844
)
844845
// Randomly select a peer as the leader.
845846
leaderIdx := mrand.Intn(peerNum)
846847
for i := range peerNum {
847-
id, _, err := idAllocator.Alloc(1)
848-
re.NoError(err)
848+
id := baseID.Add(1)
849849
// Randomly distribute the peers to different stores.
850850
p := &metapb.Peer{Id: id, StoreId: uint64(mrand.Intn(storeNum) + 1)}
851851
if i == leaderIdx {
852852
leader = p
853853
}
854854
peers = append(peers, p)
855855
}
856-
regionID, _, err := idAllocator.Alloc(1)
857-
re.NoError(err)
856+
regionID := baseID.Add(1)
858857
return NewRegionInfo(
859858
&metapb.Region{
860859
Id: regionID,
@@ -878,10 +877,8 @@ func randomBytes(n int) []byte {
878877
}
879878

880879
func BenchmarkAddRegion(b *testing.B) {
881-
re := require.New(b)
882880
regions := NewRegionsInfo()
883-
idAllocator := mockid.NewIDAllocator()
884-
items := generateRegionItems(re, idAllocator, 10000000)
881+
items := generateRegionItems(10000000)
885882
b.ResetTimer()
886883
for i := range b.N {
887884
origin, overlaps, rangeChanged := regions.SetRegion(items[i])
@@ -890,11 +887,9 @@ func BenchmarkAddRegion(b *testing.B) {
890887
}
891888

892889
func BenchmarkUpdateSubTreeOrderInsensitive(b *testing.B) {
893-
re := require.New(b)
894-
idAllocator := mockid.NewIDAllocator()
895890
for _, size := range []int{10, 100, 1000, 10000, 100000, 1000000, 10000000} {
896891
regions := NewRegionsInfo()
897-
items := generateRegionItems(re, idAllocator, size)
892+
items := generateRegionItems(size)
898893
// Update the subtrees from an empty `*RegionsInfo`.
899894
b.Run(fmt.Sprintf("from empty with size %d", size), func(b *testing.B) {
900895
b.ResetTimer()
@@ -919,7 +914,7 @@ func BenchmarkUpdateSubTreeOrderInsensitive(b *testing.B) {
919914
// Update the subtrees from a non-empty `*RegionsInfo` with different regions,
920915
// which means the regions are most likely overlapped.
921916
b.Run(fmt.Sprintf("from overlapped regions with size %d", size), func(b *testing.B) {
922-
items = generateRegionItems(re, idAllocator, size)
917+
items = generateRegionItems(size)
923918
b.ResetTimer()
924919
for range b.N {
925920
for idx := range items {
@@ -930,10 +925,10 @@ func BenchmarkUpdateSubTreeOrderInsensitive(b *testing.B) {
930925
}
931926
}
932927

933-
func generateRegionItems(re *require.Assertions, idAllocator *mockid.IDAllocator, size int) []*RegionInfo {
928+
func generateRegionItems(size int) []*RegionInfo {
934929
items := make([]*RegionInfo, size)
935930
for i := range size {
936-
items[i] = newRegionInfoIDRandom(re, idAllocator)
931+
items[i] = newRegionInfoIDRandom()
937932
}
938933
return items
939934
}

pkg/mcs/router/server/grpc_service.go

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,23 @@ package server
1616

1717
import (
1818
"context"
19+
"io"
1920
"net/http"
21+
"time"
2022

2123
"google.golang.org/grpc"
2224

25+
"github.com/pingcap/errors"
2326
"github.com/pingcap/kvproto/pkg/pdpb"
2427
"github.com/pingcap/kvproto/pkg/routerpb"
2528
"github.com/pingcap/log"
2629

2730
bs "github.com/tikv/pd/pkg/basicserver"
31+
"github.com/tikv/pd/pkg/errs"
2832
"github.com/tikv/pd/pkg/mcs/registry"
2933
"github.com/tikv/pd/pkg/utils/apiutil"
34+
"github.com/tikv/pd/pkg/utils/grpcutil"
35+
"github.com/tikv/pd/pkg/utils/keypath"
3036
)
3137

3238
// SetUpRestHandler is a hook to sets up the REST service.
@@ -73,41 +79,88 @@ func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler
7379
}
7480

7581
// BatchScanRegions implements the BatchScanRegions RPC method.
76-
func (*Service) BatchScanRegions(context.Context, *pdpb.BatchScanRegionsRequest) (*pdpb.BatchScanRegionsResponse, error) {
77-
return &pdpb.BatchScanRegionsResponse{}, nil
82+
func (s *Service) BatchScanRegions(_ctx context.Context, request *pdpb.BatchScanRegionsRequest) (*pdpb.BatchScanRegionsResponse, error) {
83+
resp, err := grpcutil.BatchScanRegions(s.GetBasicCluster(), request, false)
84+
grpcutil.RequestCounter("BatchScanRegions", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
85+
return resp, err
7886
}
7987

8088
// ScanRegions implements the ScanRegions RPC method.
81-
func (*Service) ScanRegions(context.Context, *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) {
82-
return &pdpb.ScanRegionsResponse{}, nil
89+
func (s *Service) ScanRegions(_ctx context.Context, request *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) {
90+
resp, err := grpcutil.ScanRegions(s.GetBasicCluster(), request, false)
91+
grpcutil.RequestCounter("ScanRegions", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
92+
return resp, err
8393
}
8494

8595
// GetRegion implements the GetRegion RPC method.
86-
func (*Service) GetRegion(context.Context, *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) {
87-
return &pdpb.GetRegionResponse{}, nil
96+
func (s *Service) GetRegion(_ctx context.Context, request *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) {
97+
resp, err := grpcutil.GetRegion(s.GetBasicCluster(), request, false)
98+
grpcutil.RequestCounter("GetRegion", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
99+
return resp, err
88100
}
89101

90102
// GetAllStores implements the GetAllStores RPC method.
91-
func (*Service) GetAllStores(context.Context, *pdpb.GetAllStoresRequest) (*pdpb.GetAllStoresResponse, error) {
92-
return &pdpb.GetAllStoresResponse{}, nil
103+
func (s *Service) GetAllStores(_ctx context.Context, request *pdpb.GetAllStoresRequest) (*pdpb.GetAllStoresResponse, error) {
104+
resp, err := grpcutil.GetAllStores(s.GetBasicCluster(), request)
105+
grpcutil.RequestCounter("GetAllStores", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
106+
return resp, err
93107
}
94108

95109
// GetStore implements the GetStore RPC method.
96-
func (*Service) GetStore(context.Context, *pdpb.GetStoreRequest) (*pdpb.GetStoreResponse, error) {
97-
return &pdpb.GetStoreResponse{}, nil
110+
func (s *Service) GetStore(_ctx context.Context, request *pdpb.GetStoreRequest) (*pdpb.GetStoreResponse, error) {
111+
resp, err := grpcutil.GetStore(s.GetBasicCluster(), request)
112+
grpcutil.RequestCounter("GetStore", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
113+
return resp, err
98114
}
99115

100116
// GetPrevRegion implements the GetPrevRegion RPC method.
101-
func (*Service) GetPrevRegion(context.Context, *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) {
102-
return &pdpb.GetRegionResponse{}, nil
117+
func (s *Service) GetPrevRegion(_ctx context.Context, request *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) {
118+
resp, err := grpcutil.GetPrevRegion(s.GetBasicCluster(), request, false)
119+
grpcutil.RequestCounter("GetPrevRegion", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
120+
return resp, err
103121
}
104122

105123
// GetRegionByID implements the GetRegionByID RPC method.
106-
func (*Service) GetRegionByID(context.Context, *pdpb.GetRegionByIDRequest) (*pdpb.GetRegionResponse, error) {
107-
return &pdpb.GetRegionResponse{}, nil
124+
func (s *Service) GetRegionByID(_ctx context.Context, request *pdpb.GetRegionByIDRequest) (*pdpb.GetRegionResponse, error) {
125+
resp, err := grpcutil.GetRegionByID(s.GetBasicCluster(), request, false)
126+
grpcutil.RequestCounter("GetRegionByID", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
127+
return resp, err
108128
}
109129

110130
// QueryRegion implements the QueryRegion RPC method.
111-
func (*Service) QueryRegion(routerpb.Router_QueryRegionServer) error {
112-
return nil
131+
func (s *Service) QueryRegion(stream routerpb.Router_QueryRegionServer) error {
132+
for {
133+
request, err := stream.Recv()
134+
if err == io.EOF {
135+
return nil
136+
}
137+
if err != nil {
138+
return errors.WithStack(err)
139+
}
140+
if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
141+
return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId())
142+
}
143+
144+
cluster := s.GetBasicCluster()
145+
start := time.Now()
146+
needBuckets := request.GetNeedBuckets()
147+
keyIDMap, prevKeyIDMap, regionsByID := cluster.QueryRegions(
148+
request.GetKeys(),
149+
request.GetPrevKeys(),
150+
request.GetIds(),
151+
needBuckets,
152+
)
153+
queryRegionDuration.Observe(time.Since(start).Seconds())
154+
// Build the response and send it to the client.
155+
response := &pdpb.QueryRegionResponse{
156+
Header: grpcutil.WrapHeader(),
157+
KeyIdMap: keyIDMap,
158+
PrevKeyIdMap: prevKeyIDMap,
159+
RegionsById: regionsByID,
160+
}
161+
grpcutil.RequestCounter("QueryRegion", request.Header, response.Header.Error, regionRequestCounter)
162+
if err := stream.Send(response); err != nil {
163+
return errors.WithStack(err)
164+
}
165+
}
113166
}

pkg/mcs/router/server/metrics.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2025 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package server
16+
17+
import (
18+
"github.com/prometheus/client_golang/prometheus"
19+
)
20+
21+
const (
22+
namespace = "router"
23+
serverSubsystem = "server"
24+
)
25+
26+
var (
27+
queryRegionDuration = prometheus.NewHistogram(
28+
prometheus.HistogramOpts{
29+
Namespace: namespace,
30+
Subsystem: serverSubsystem,
31+
Name: "query_region_duration_seconds",
32+
Help: "Bucketed histogram of processing time (s) of region query requests.",
33+
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
34+
})
35+
36+
regionRequestCounter = prometheus.NewCounterVec(
37+
prometheus.CounterOpts{
38+
Namespace: namespace,
39+
Subsystem: serverSubsystem,
40+
Name: "region_request_cnt",
41+
Help: "Counter of region request.",
42+
}, []string{"request", "caller_id", "caller_component", "event"})
43+
)
44+
45+
func init() {
46+
prometheus.MustRegister(regionRequestCounter)
47+
prometheus.MustRegister(queryRegionDuration)
48+
}

0 commit comments

Comments
 (0)