Skip to content

Commit 81039e6

Browse files
committed
feat:grpc 服务发现, 客户端服务端封装, 全局LogID获取
1 parent f0a884c commit 81039e6

17 files changed

Lines changed: 1007 additions & 22 deletions

File tree

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ require (
6969
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
7070
github.com/davecgh/go-spew v1.1.1 // indirect
7171
github.com/deckarep/golang-set v1.7.1 // indirect
72+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
7273
github.com/eapache/go-resiliency v1.7.0 // indirect
7374
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
7475
github.com/eapache/queue v1.1.0 // indirect
@@ -125,6 +126,7 @@ require (
125126
github.com/prometheus/common v0.65.0 // indirect
126127
github.com/prometheus/procfs v0.17.0 // indirect
127128
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
129+
github.com/redis/go-redis/v9 v9.16.0 // indirect
128130
github.com/sagikazarmark/locafero v0.7.0 // indirect
129131
github.com/sourcegraph/conc v0.3.0 // indirect
130132
github.com/spf13/afero v1.12.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
125125
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
126126
github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ=
127127
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
128+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
129+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
128130
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
129131
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
130132
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
@@ -385,6 +387,8 @@ github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7D
385387
github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
386388
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
387389
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
390+
github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4=
391+
github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
388392
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
389393
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
390394
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=

pkg/transport/grpc/client.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package grpc
2+
3+
import (
4+
"time"
5+
6+
retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
7+
"github.com/muxi-Infra/muxi-micro/pkg/logger"
8+
"github.com/muxi-Infra/muxi-micro/pkg/logger/logx"
9+
"github.com/muxi-Infra/muxi-micro/pkg/tracer"
10+
"github.com/muxi-Infra/muxi-micro/pkg/transport/grpc/discovery"
11+
"google.golang.org/grpc"
12+
"google.golang.org/grpc/credentials/insecure"
13+
"google.golang.org/grpc/resolver"
14+
)
15+
16+
type Option2 func(*GRPCClient)
17+
18+
type GRPCClient struct {
19+
addr string
20+
name string
21+
l logger.Logger
22+
discoveryCenter discovery.DiscoverCenter
23+
interceptors []grpc.UnaryClientInterceptor
24+
conn *grpc.ClientConn
25+
}
26+
27+
func WithRetry(try uint, time time.Duration) Option2 {
28+
return func(c *GRPCClient) {
29+
interceptor := retry.UnaryClientInterceptor(
30+
retry.WithMax(try),
31+
retry.WithBackoff(retry.BackoffLinear(time)),
32+
)
33+
c.interceptors = append(c.interceptors, interceptor)
34+
}
35+
}
36+
37+
// WithAddress 用于不需要服务发现的情况
38+
func WithAddress(addr string) Option2 {
39+
return func(c *GRPCClient) {
40+
c.addr = addr
41+
}
42+
}
43+
44+
// WithDiscoveryName 设置服务发现的服务名
45+
func WithDiscoveryName(name string) Option2 {
46+
return func(c *GRPCClient) {
47+
c.name = name
48+
}
49+
}
50+
51+
// WithClientLogger 用于记录 resolver 新增或删减的节点
52+
func WithClientLogger(l logger.Logger) Option2 {
53+
return func(c *GRPCClient) {
54+
c.l = l
55+
}
56+
}
57+
58+
func WithServiceDiscovery(discoveryCenter discovery.DiscoverCenter) Option2 {
59+
return func(s *GRPCClient) {
60+
s.discoveryCenter = discoveryCenter
61+
}
62+
}
63+
64+
func WithClientTracer(t tracer.Tracer) Option2 {
65+
return func(c *GRPCClient) {
66+
c.interceptors = append(c.interceptors, t.ClientInterceptor())
67+
}
68+
}
69+
70+
func WithExtraClientInterceptors(interceptors ...grpc.UnaryClientInterceptor) Option2 {
71+
return func(c *GRPCClient) {
72+
c.interceptors = append(c.interceptors, interceptors...)
73+
}
74+
}
75+
76+
// NewGRPCClient 每一个微服务,创建一个client,一个resolver
77+
func NewGRPCClient(opts ...Option2) (*GRPCClient, error) {
78+
client := &GRPCClient{
79+
addr: DefaultHost + ":" + DefaultPort,
80+
name: DefaultName,
81+
l: logx.NewStdLogger(),
82+
}
83+
84+
for _, opt := range opts {
85+
opt(client)
86+
}
87+
88+
if client.discoveryCenter == nil {
89+
conn, err := grpc.NewClient(
90+
client.addr,
91+
grpc.WithTransportCredentials(insecure.NewCredentials()),
92+
grpc.WithChainUnaryInterceptor(client.interceptors...),
93+
)
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
client.conn = conn
99+
return client, nil
100+
}
101+
102+
r := discovery.NewResolver(client.name, client.discoveryCenter, client.l)
103+
resolver.Register(r)
104+
conn, err := grpc.NewClient(
105+
"muxi:///"+client.name, // 没用上
106+
grpc.WithTransportCredentials(insecure.NewCredentials()),
107+
grpc.WithChainUnaryInterceptor(client.interceptors...),
108+
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
109+
)
110+
if err != nil {
111+
return nil, err
112+
}
113+
client.conn = conn
114+
115+
return client, nil
116+
}
117+
118+
func (c *GRPCClient) Conn() *grpc.ClientConn {
119+
return c.conn
120+
}
121+
122+
func (c *GRPCClient) Close() error {
123+
return c.conn.Close()
124+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package discovery
2+
3+
import (
4+
"context"
5+
)
6+
7+
type DiscoverCenter interface {
8+
Discover(ctx context.Context, serviceName string) ([]string, error)
9+
Watch(ctx context.Context, serviceName string) <-chan *Event
10+
}
11+
12+
type Event struct {
13+
Type string
14+
Address string
15+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package etcd
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"sync"
8+
"time"
9+
10+
"github.com/muxi-Infra/muxi-micro/pkg/logger"
11+
"github.com/muxi-Infra/muxi-micro/pkg/logger/logx"
12+
"github.com/muxi-Infra/muxi-micro/pkg/transport/grpc/discovery"
13+
clientv3 "go.etcd.io/etcd/client/v3"
14+
)
15+
16+
type EtcdDiscovery struct {
17+
client *clientv3.Client
18+
logger logger.Logger
19+
endpoints []string
20+
dialTimeout time.Duration
21+
namespace string
22+
username string
23+
password string
24+
watchEventChanSize int
25+
26+
sync.Mutex
27+
}
28+
29+
type Option func(*EtcdDiscovery)
30+
31+
func WithUsername(username string) Option {
32+
return func(r *EtcdDiscovery) {
33+
r.username = username
34+
}
35+
}
36+
37+
func WithPassword(password string) Option {
38+
return func(r *EtcdDiscovery) {
39+
r.password = password
40+
}
41+
}
42+
43+
func WithEndpoints(endpoints []string) Option {
44+
return func(r *EtcdDiscovery) {
45+
r.endpoints = endpoints
46+
}
47+
}
48+
49+
// WithLogger 用于记录新增或删减节点
50+
func WithLogger(l logger.Logger) Option {
51+
return func(r *EtcdDiscovery) {
52+
r.logger = l
53+
}
54+
}
55+
56+
func WithDialTimeout(timeout time.Duration) Option {
57+
return func(r *EtcdDiscovery) {
58+
r.dialTimeout = timeout
59+
}
60+
}
61+
62+
func WithNamespace(ns string) Option {
63+
return func(r *EtcdDiscovery) {
64+
r.namespace = ns
65+
}
66+
}
67+
68+
// WithWatchEventChanSize 设置监听etcd变更的channel大小, 默认为10
69+
func WithWatchEventChanSize(size int) Option {
70+
return func(r *EtcdDiscovery) {
71+
r.watchEventChanSize = size
72+
}
73+
}
74+
75+
func NewEtcdDiscovery(opts ...Option) (*EtcdDiscovery, error) {
76+
r := &EtcdDiscovery{
77+
endpoints: []string{"127.0.0.1:2379"},
78+
dialTimeout: 5 * time.Second,
79+
namespace: "/services",
80+
logger: logx.NewStdLogger(),
81+
username: "root",
82+
password: "12345678",
83+
watchEventChanSize: 10,
84+
}
85+
86+
for _, opt := range opts {
87+
opt(r)
88+
}
89+
90+
cli, err := clientv3.New(clientv3.Config{
91+
Endpoints: r.endpoints,
92+
DialTimeout: r.dialTimeout,
93+
Username: r.username,
94+
Password: r.password,
95+
})
96+
if err != nil {
97+
return nil, err
98+
}
99+
r.client = cli
100+
return r, nil
101+
}
102+
103+
func (r *EtcdDiscovery) Discover(ctx context.Context, serviceName string) ([]string, error) {
104+
key := fmt.Sprintf("%s/%s/", r.namespace, serviceName)
105+
106+
val, err := r.client.Get(ctx, key, clientv3.WithPrefix())
107+
if err != nil {
108+
return nil, err
109+
}
110+
111+
var nodes []string
112+
for _, kv := range val.Kvs {
113+
nodes = append(nodes, string(kv.Value))
114+
}
115+
116+
return nodes, nil
117+
}
118+
119+
// Watch 动态监听etcd变更
120+
func (r *EtcdDiscovery) Watch(ctx context.Context, serviceName string) <-chan *discovery.Event {
121+
eventCh := make(chan *discovery.Event, r.watchEventChanSize)
122+
123+
go func() {
124+
defer func() {
125+
close(eventCh)
126+
}()
127+
128+
key := fmt.Sprintf("%s/%s/", r.namespace, serviceName)
129+
ch := r.client.Watch(ctx, key, clientv3.WithPrefix())
130+
for resp := range ch {
131+
for _, ev := range resp.Events {
132+
switch ev.Type {
133+
case clientv3.EventTypePut:
134+
eventCh <- &discovery.Event{
135+
Type: "PUT",
136+
Address: string(ev.Kv.Value),
137+
}
138+
r.logger.Warn(fmt.Sprintf("service[%s] has a new address: %s add into etcd", serviceName, string(ev.Kv.Value)))
139+
case clientv3.EventTypeDelete:
140+
addr := strings.TrimPrefix(string(ev.Kv.Key), key)
141+
eventCh <- &discovery.Event{
142+
Type: "DELETE",
143+
Address: addr,
144+
}
145+
r.logger.Warn(fmt.Sprintf("(service[%s], addr[%s]) has been removed from etcd", serviceName, addr))
146+
}
147+
}
148+
}
149+
}()
150+
151+
return eventCh
152+
}

0 commit comments

Comments
 (0)