Skip to content

Commit fb25fb2

Browse files
authored
Merge pull request #5 from luohuixi/main
包装了sarama包的生产消费者
2 parents 507560e + 8d7e27c commit fb25fb2

64 files changed

Lines changed: 4242 additions & 365 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

example/kafka/kafka.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
"github.com/IBM/sarama"
9+
"github.com/muxi-Infra/muxi-micro/pkg/kafka"
10+
)
11+
12+
// 同步生产者示例
13+
func SyncProducer() {
14+
p := kafka.NewSyncConfig([]string{"localhost:9092"})
15+
//可自行改配置如:(可选)
16+
//p.GetConfig().Producer.RequiredAcks = sarama.WaitForAll
17+
18+
err := p.CreateProducer()
19+
if err != nil {
20+
log.Fatal(err)
21+
}
22+
23+
partition, offset, err := p.SendMessage("study2", "testkey8", "testvalue8")
24+
if err != nil {
25+
log.Fatal(err)
26+
}
27+
log.Printf("partition: %d, offset: %d\n", partition, offset)
28+
29+
err = p.Close()
30+
if err != nil {
31+
log.Fatal(err)
32+
}
33+
}
34+
35+
// 异步生产者示例
36+
func AsyncProducer() {
37+
p := kafka.NewAsyncConfig([]string{"localhost:9092"})
38+
//可自行改配置如:(可选)
39+
p.GetConfig().Producer.Return.Successes = true
40+
41+
err := p.CreateProducer()
42+
if err != nil {
43+
log.Fatal(err)
44+
}
45+
46+
//异步获取成功消息
47+
ch := p.GetSuccess()
48+
//或 ch:= p.GetError()
49+
go func() {
50+
for msg := range ch {
51+
log.Printf("partition: %d, offset: %d, key: %s, value: %s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
52+
}
53+
}()
54+
55+
p.SendMessage("study2", "testkey5", "testvalue5")
56+
if err != nil {
57+
log.Fatal(err)
58+
}
59+
60+
time.Sleep(time.Second * 5)
61+
62+
err = p.Close()
63+
if err != nil {
64+
log.Fatal(err)
65+
}
66+
67+
}
68+
69+
// 消费者示例
70+
func Consumer() {
71+
// 先构造 handler 实现 ConsumerGroupHandler 接口
72+
// setup, cleanup, consumeclaim 都要用户自行实现
73+
fn := func(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
74+
for msg := range claim.Messages() {
75+
log.Println("partition: ", msg.Partition, "offset: ", msg.Offset, "key: ", string(msg.Key), "value: ", string(msg.Value))
76+
session.MarkMessage(msg, "")
77+
}
78+
return nil // 正常退出
79+
}
80+
handler := kafka.NewHandler(nil, nil, fn)
81+
82+
// 构造消费者组
83+
c := kafka.NewConsumerConfig([]string{"localhost:9092"}, "test-group", []string{"study2"}, handler)
84+
//可自行改配置如:(可选)
85+
//c.GetConfig().Consumer.Offsets.Initial = sarama.OffsetOldest
86+
87+
err := c.CreateConsumerGroup()
88+
if err != nil {
89+
log.Fatal(err)
90+
}
91+
92+
// 消费消息
93+
ctx, cancel := context.WithCancel(context.Background())
94+
95+
// 取消消费
96+
go func() {
97+
time.Sleep(time.Second * 10)
98+
cancel()
99+
}()
100+
101+
err = c.Consume(ctx)
102+
if err != nil {
103+
log.Fatal(err)
104+
}
105+
106+
err = c.Close()
107+
if err != nil {
108+
log.Fatal(err)
109+
}
110+
}
111+
112+
func main() {
113+
SyncProducer()
114+
//AsyncProducer()
115+
Consumer()
116+
}

example/tracer/tracer.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"github.com/gin-gonic/gin"
6+
"github.com/muxi-Infra/muxi-micro/pkg/tracer"
7+
"google.golang.org/grpc"
8+
"google.golang.org/grpc/credentials/insecure"
9+
"google.golang.org/grpc/examples/helloworld/helloworld"
10+
"log"
11+
"net"
12+
)
13+
14+
// 简易的 HelloWorld 实现
15+
type server struct {
16+
helloworld.UnimplementedGreeterServer
17+
}
18+
19+
func (s *server) SayHello(_ context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
20+
log.Printf("Received: %v", in.GetName())
21+
return &helloworld.HelloReply{Message: "Hello " + in.GetName()}, nil
22+
}
23+
24+
func GrpcServer() {
25+
// Zipkin
26+
config, err := tracer.NewZipkin(
27+
"http://localhost:9411/api/v2/spans",
28+
"demo_service",
29+
"localhost:50051",
30+
1,
31+
)
32+
33+
// Jaeger
34+
//config , err := tracer.NewJaeger(
35+
// "http://localhost:14268/api/traces",
36+
// "demo_service",
37+
// 1,
38+
// )
39+
40+
// SkyWalking
41+
//config , err := tracer.NewSkyWalking(
42+
// "localhost:11800",
43+
// "demo_service",
44+
// "demo_instance", //实例名
45+
// 1
46+
// )
47+
48+
if err != nil {
49+
log.Fatal(err)
50+
}
51+
defer func() {
52+
if err := config.Close(); err != nil {
53+
log.Println(err)
54+
}
55+
}()
56+
57+
s := grpc.NewServer(
58+
grpc.ChainUnaryInterceptor(
59+
config.ServerInterceptor(),
60+
),
61+
)
62+
63+
helloworld.RegisterGreeterServer(s, &server{})
64+
65+
lis, err := net.Listen("tcp", "0.0.0.0:50051")
66+
if err != nil {
67+
log.Fatalf("failed to listen: %v", err)
68+
}
69+
70+
if err := s.Serve(lis); err != nil {
71+
log.Fatalf("failed to serve: %v", err)
72+
}
73+
}
74+
75+
func GrpcClient() {
76+
// 同上
77+
config, err := tracer.NewZipkin(
78+
"http://localhost:9411/api/v2/spans",
79+
"demo_client",
80+
"localhost:50051",
81+
1,
82+
)
83+
if err != nil {
84+
log.Fatal(err)
85+
}
86+
defer func() {
87+
if err := config.Close(); err != nil {
88+
log.Println(err)
89+
}
90+
}()
91+
92+
conn, err := grpc.NewClient(
93+
"localhost:50051",
94+
grpc.WithTransportCredentials(insecure.NewCredentials()),
95+
grpc.WithChainUnaryInterceptor(
96+
config.ClientInterceptor(),
97+
),
98+
)
99+
if err != nil {
100+
log.Fatalf("did not connect: %v", err)
101+
}
102+
defer conn.Close()
103+
104+
c := helloworld.NewGreeterClient(conn)
105+
106+
resp, err := c.SayHello(context.Background(), &helloworld.HelloRequest{Name: "MuXi"})
107+
if err != nil {
108+
log.Fatalf("could not greet: %v", err)
109+
}
110+
log.Printf("Greeting: %s", resp.Message)
111+
}
112+
113+
func GinService() {
114+
// 同上
115+
config, err := tracer.NewZipkin(
116+
"http://localhost:9411/api/v2/spans",
117+
"demo_gin",
118+
"localhost:8081",
119+
1,
120+
)
121+
if err != nil {
122+
log.Fatal(err)
123+
}
124+
defer func() {
125+
if err := config.Close(); err != nil {
126+
log.Println(err)
127+
}
128+
}()
129+
130+
r := gin.Default()
131+
r.Use(config.GinMiddleware(r))
132+
133+
r.GET("/hello", func(c *gin.Context) {
134+
c.JSON(200, gin.H{"message": "hello world"})
135+
})
136+
r.Run("0.0.0.0:8081")
137+
}
138+
139+
func main() {
140+
// grpc服务端
141+
//GrpcServer()
142+
// grpc客户端
143+
GrpcClient()
144+
// gin
145+
//GinService()
146+
}

0 commit comments

Comments
 (0)