Skip to content
Merged
27 changes: 11 additions & 16 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
mrand "math/rand"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand All @@ -32,8 +33,6 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/utils/keyutil"
)

Expand Down Expand Up @@ -836,25 +835,25 @@ const (
keyLength = 100
)

func newRegionInfoIDRandom(re *require.Assertions, idAllocator id.Allocator) *RegionInfo {
// baseID is a monotonically increasing counter used to hand out unique
// peer and region IDs for randomly generated test regions.
// The zero value of atomic.Uint64 is ready to use, so declare it directly
// instead of the non-idiomatic composite literal `atomic.Uint64{}`.
var baseID atomic.Uint64

func newRegionInfoIDRandom() *RegionInfo {
var (
peers []*metapb.Peer
leader *metapb.Peer
)
// Randomly select a peer as the leader.
leaderIdx := mrand.Intn(peerNum)
for i := range peerNum {
id, _, err := idAllocator.Alloc(1)
re.NoError(err)
id := baseID.Add(1)
// Randomly distribute the peers to different stores.
p := &metapb.Peer{Id: id, StoreId: uint64(mrand.Intn(storeNum) + 1)}
if i == leaderIdx {
leader = p
}
peers = append(peers, p)
}
regionID, _, err := idAllocator.Alloc(1)
re.NoError(err)
regionID := baseID.Add(1)
return NewRegionInfo(
&metapb.Region{
Id: regionID,
Expand All @@ -878,10 +877,8 @@ func randomBytes(n int) []byte {
}

func BenchmarkAddRegion(b *testing.B) {
re := require.New(b)
regions := NewRegionsInfo()
idAllocator := mockid.NewIDAllocator()
items := generateRegionItems(re, idAllocator, 10000000)
items := generateRegionItems(10000000)
b.ResetTimer()
for i := range b.N {
origin, overlaps, rangeChanged := regions.SetRegion(items[i])
Expand All @@ -890,11 +887,9 @@ func BenchmarkAddRegion(b *testing.B) {
}

func BenchmarkUpdateSubTreeOrderInsensitive(b *testing.B) {
re := require.New(b)
idAllocator := mockid.NewIDAllocator()
for _, size := range []int{10, 100, 1000, 10000, 100000, 1000000, 10000000} {
regions := NewRegionsInfo()
items := generateRegionItems(re, idAllocator, size)
items := generateRegionItems(size)
// Update the subtrees from an empty `*RegionsInfo`.
b.Run(fmt.Sprintf("from empty with size %d", size), func(b *testing.B) {
b.ResetTimer()
Expand All @@ -919,7 +914,7 @@ func BenchmarkUpdateSubTreeOrderInsensitive(b *testing.B) {
// Update the subtrees from a non-empty `*RegionsInfo` with different regions,
// which means the regions are most likely overlapped.
b.Run(fmt.Sprintf("from overlapped regions with size %d", size), func(b *testing.B) {
items = generateRegionItems(re, idAllocator, size)
items = generateRegionItems(size)
b.ResetTimer()
for range b.N {
for idx := range items {
Expand All @@ -930,10 +925,10 @@ func BenchmarkUpdateSubTreeOrderInsensitive(b *testing.B) {
}
}

func generateRegionItems(re *require.Assertions, idAllocator *mockid.IDAllocator, size int) []*RegionInfo {
func generateRegionItems(size int) []*RegionInfo {
items := make([]*RegionInfo, size)
for i := range size {
items[i] = newRegionInfoIDRandom(re, idAllocator)
items[i] = newRegionInfoIDRandom()
}
return items
}
Expand Down
85 changes: 69 additions & 16 deletions pkg/mcs/router/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@ package server

import (
"context"
"io"
"net/http"
"time"

"google.golang.org/grpc"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/routerpb"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -73,41 +79,88 @@ func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler
}

// BatchScanRegions implements the BatchScanRegions RPC method.
func (*Service) BatchScanRegions(context.Context, *pdpb.BatchScanRegionsRequest) (*pdpb.BatchScanRegionsResponse, error) {
return &pdpb.BatchScanRegionsResponse{}, nil
func (s *Service) BatchScanRegions(_ctx context.Context, request *pdpb.BatchScanRegionsRequest) (*pdpb.BatchScanRegionsResponse, error) {
resp, err := grpcutil.BatchScanRegions(s.GetBasicCluster(), request, false)
grpcutil.RequestCounter("BatchScanRegions", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
return resp, err
}

// ScanRegions implements the ScanRegions RPC method.
func (*Service) ScanRegions(context.Context, *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) {
return &pdpb.ScanRegionsResponse{}, nil
func (s *Service) ScanRegions(_ctx context.Context, request *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) {
resp, err := grpcutil.ScanRegions(s.GetBasicCluster(), request, false)
grpcutil.RequestCounter("ScanRegions", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
return resp, err
}

// GetRegion implements the GetRegion RPC method.
func (*Service) GetRegion(context.Context, *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) {
return &pdpb.GetRegionResponse{}, nil
func (s *Service) GetRegion(_ctx context.Context, request *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) {
resp, err := grpcutil.GetRegion(s.GetBasicCluster(), request, false)
grpcutil.RequestCounter("GetRegion", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
return resp, err
}

// GetAllStores implements the GetAllStores RPC method.
func (*Service) GetAllStores(context.Context, *pdpb.GetAllStoresRequest) (*pdpb.GetAllStoresResponse, error) {
return &pdpb.GetAllStoresResponse{}, nil
func (s *Service) GetAllStores(_ctx context.Context, request *pdpb.GetAllStoresRequest) (*pdpb.GetAllStoresResponse, error) {
resp, err := grpcutil.GetAllStores(s.GetBasicCluster(), request)
grpcutil.RequestCounter("GetAllStores", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
return resp, err
}

// GetStore implements the GetStore RPC method.
func (*Service) GetStore(context.Context, *pdpb.GetStoreRequest) (*pdpb.GetStoreResponse, error) {
return &pdpb.GetStoreResponse{}, nil
func (s *Service) GetStore(_ctx context.Context, request *pdpb.GetStoreRequest) (*pdpb.GetStoreResponse, error) {
resp, err := grpcutil.GetStore(s.GetBasicCluster(), request)
grpcutil.RequestCounter("GetStore", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
return resp, err
}

// GetPrevRegion implements the GetPrevRegion RPC method.
func (*Service) GetPrevRegion(context.Context, *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) {
return &pdpb.GetRegionResponse{}, nil
func (s *Service) GetPrevRegion(_ctx context.Context, request *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) {
resp, err := grpcutil.GetPrevRegion(s.GetBasicCluster(), request, false)
grpcutil.RequestCounter("GetPrevRegion", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
return resp, err
}

// GetRegionByID implements the GetRegionByID RPC method.
func (*Service) GetRegionByID(context.Context, *pdpb.GetRegionByIDRequest) (*pdpb.GetRegionResponse, error) {
return &pdpb.GetRegionResponse{}, nil
func (s *Service) GetRegionByID(_ctx context.Context, request *pdpb.GetRegionByIDRequest) (*pdpb.GetRegionResponse, error) {
resp, err := grpcutil.GetRegionByID(s.GetBasicCluster(), request, false)
grpcutil.RequestCounter("GetRegionByID", request.GetHeader(), resp.GetHeader().GetError(), regionRequestCounter)
return resp, err
}

// QueryRegion implements the QueryRegion RPC method.
func (*Service) QueryRegion(routerpb.Router_QueryRegionServer) error {
return nil
func (s *Service) QueryRegion(stream routerpb.Router_QueryRegionServer) error {
for {
request, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}
if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId())
}

cluster := s.GetBasicCluster()
start := time.Now()
needBuckets := request.GetNeedBuckets()
keyIDMap, prevKeyIDMap, regionsByID := cluster.QueryRegions(
request.GetKeys(),
request.GetPrevKeys(),
request.GetIds(),
needBuckets,
)
queryRegionDuration.Observe(time.Since(start).Seconds())
// Build the response and send it to the client.
response := &pdpb.QueryRegionResponse{
Header: grpcutil.WrapHeader(),
KeyIdMap: keyIDMap,
PrevKeyIdMap: prevKeyIDMap,
RegionsById: regionsByID,
}
grpcutil.RequestCounter("QueryRegion", request.Header, response.Header.Error, regionRequestCounter)
if err := stream.Send(response); err != nil {
return errors.WithStack(err)
}
}
}
48 changes: 48 additions & 0 deletions pkg/mcs/router/server/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"github.com/prometheus/client_golang/prometheus"
)

const (
	// namespace is the top-level Prometheus namespace for router metrics.
	namespace = "router"
	// serverSubsystem groups metrics emitted by the router server component.
	serverSubsystem = "server"
)

var (
	// queryRegionDuration is a histogram of server-side processing time
	// (seconds) for QueryRegion requests, with exponential buckets
	// starting at 0.5ms.
	queryRegionDuration = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: serverSubsystem,
			Name: "query_region_duration_seconds",
			Help: "Bucketed histogram of processing time (s) of region query requests.",
			Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
		})

	// regionRequestCounter counts region requests handled by the router
	// server, labeled by request name, caller identity, and event.
	regionRequestCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: serverSubsystem,
			Name: "region_request_cnt",
			Help: "Counter of region request.",
		}, []string{"request", "caller_id", "caller_component", "event"})
)

// init registers the router server metrics with the default Prometheus
// registry once at package load time.
func init() {
	prometheus.MustRegister(
		regionRequestCounter,
		queryRegionDuration,
	)
}
Loading