Skip to content

Commit 6c0e4d9

Browse files
ptaborchaochn47
authored andcommitted
Introduce grpc-1.30+ compatible client/v3/naming API.
This is not yet implementation, just API and tests to be filled with implementation in next CLs, tracked by: #12652 We propose here 3 packages: - clientv3/naming/endpoints -> That is abstraction layer over etcd that allows to write, read & watch Endpoints information. It's independent from GRPC API. It hides the storage details. - clientv3/naming/endpoints/internal -> That contains the grpc's compatible Update class to preserve the internal JSON mashalling format. - clientv3/naming/resolver -> That implements the GRPC resolver API, such that etcd can be used for connection.Dial in grpc. Please see the grpc_naming.md document changes & grpcproxy/cluster.go new integration, to see how the new abstractions work. Signed-off-by: Chao Chen <chaochn@amazon.com>
1 parent 3663ae1 commit 6c0e4d9

12 files changed

Lines changed: 590 additions & 38 deletions

File tree

Documentation/dev-guide/grpc_naming.md

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,54 +6,56 @@ etcd provides a gRPC resolver to support an alternative name system that fetches
66

77
## Using etcd discovery with go-grpc
88

9-
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution:
9+
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client:
1010

1111
```go
1212
import (
1313
"go.etcd.io/etcd/clientv3"
14-
etcdnaming "go.etcd.io/etcd/clientv3/naming"
14+
resolver "go.etcd.io/etcd/clientv3/naming/resolver"
1515

1616
"google.golang.org/grpc"
1717
)
1818

19-
...
20-
2119
cli, cerr := clientv3.NewFromURL("http://localhost:2379")
22-
r := &etcdnaming.GRPCResolver{Client: cli}
23-
b := grpc.RoundRobin(r)
24-
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b), grpc.WithBlock(), ...)
20+
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
21+
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
2522
```
2623

2724
## Managing service endpoints
2825

29-
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
26+
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "foo/bar/my-service/")
27+
with JSON-encoded (historically go-grpc `naming.Update`) values as potential service endpoints.
28+
Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
3029

3130
### Adding an endpoint
3231

3332
New endpoints can be added to the service through `etcdctl`:
3433

3534
```sh
36-
ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
35+
ETCDCTL_API=3 etcdctl put foo/bar/my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
3736
```
3837

39-
The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`:
38+
The etcd client's `endpoints.Manager` method can also register new endpoints with a key matching the `Addr`:
4039

4140
```go
42-
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."})
41+
42+
em := endpoints.NewManager(client, "foo/bar/my-service")
43+
err := em.AddEndpoint(context.TODO(),"foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
4344
```
4445

4546
### Deleting an endpoint
4647

4748
Hosts can be deleted from the service through `etcdctl`:
4849

4950
```sh
50-
ETCDCTL_API=3 etcdctl del my-service/1.2.3.4
51+
ETCDCTL_API=3 etcdctl del foo/bar/my-service/1.2.3.4
5152
```
5253

53-
The etcd client's `GRPCResolver.Update` method also supports deleting endpoints:
54+
The etcd client's `endpoints.Manager` method also supports deleting endpoints:
5455

5556
```go
56-
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
57+
em := endpoints.NewManager(client, "foo/bar/my-service")
58+
err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1");
5759
```
5860

5961
### Registering an endpoint with a lease
@@ -65,3 +67,21 @@ lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '`
6567
ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
6668
ETCDCTL_API=3 etcdctl lease keep-alive $lease
6769
```
70+
In the golang:
71+
72+
```go
73+
em := endpoints.NewManager(client, "foo/bar/my-service")
74+
err := endpoints.AddEndpoint(context.TODO(), "foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
75+
```
76+
77+
### Atomically updating endpoints
78+
79+
If it's desired to modify multiple endpoints in a single transaction, `endpoints.Manager` can be used directly:
80+
81+
```
82+
em := endpoints.NewManager(c, "foo")
83+
84+
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
85+
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
86+
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
87+
```
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright 2016 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package naming
16+
17+
import (
18+
"context"
19+
"reflect"
20+
"testing"
21+
22+
etcd "go.etcd.io/etcd/clientv3"
23+
"go.etcd.io/etcd/clientv3/naming/endpoints"
24+
25+
"go.etcd.io/etcd/integration"
26+
"go.etcd.io/etcd/pkg/testutil"
27+
)
28+
29+
func TestEndpointManager(t *testing.T) {
30+
t.Skip("Not implemented yet")
31+
32+
defer testutil.AfterTest(t)
33+
34+
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
35+
defer clus.Terminate(t)
36+
37+
em, err := endpoints.NewManager(clus.RandClient(), "foo")
38+
if err != nil {
39+
t.Fatal("failed to create EndpointManager", err)
40+
}
41+
ctx, watchCancel := context.WithCancel(context.Background())
42+
defer watchCancel()
43+
w, err := em.NewWatchChannel(ctx)
44+
if err != nil {
45+
t.Fatal("failed to establish watch", err)
46+
}
47+
48+
e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata"}
49+
err = em.AddEndpoint(context.TODO(), "foo/a1", e1)
50+
if err != nil {
51+
t.Fatal("failed to add foo", err)
52+
}
53+
54+
us := <-w
55+
56+
if us == nil {
57+
t.Fatal("failed to get update", err)
58+
}
59+
60+
wu := endpoints.Update{
61+
Op: endpoints.Add,
62+
Key: "foo/a1",
63+
Endpoint: e1,
64+
}
65+
66+
if !reflect.DeepEqual(us[0], wu) {
67+
t.Fatalf("up = %#v, want %#v", us[0], wu)
68+
}
69+
70+
err = em.DeleteEndpoint(context.TODO(), "foo/a1")
71+
if err != nil {
72+
t.Fatalf("failed to udpate %v", err)
73+
}
74+
75+
us = <-w
76+
if err != nil {
77+
t.Fatalf("failed to get udpate %v", err)
78+
}
79+
80+
wu = endpoints.Update{
81+
Op: endpoints.Delete,
82+
Key: "foo/a1",
83+
}
84+
85+
if !reflect.DeepEqual(us, wu) {
86+
t.Fatalf("up = %#v, want %#v", us[1], wu)
87+
}
88+
}
89+
90+
// TestEndpointManagerAtomicity ensures the resolver will initialize
91+
// correctly with multiple hosts and correctly receive multiple
92+
// updates in a single revision.
93+
func TestEndpointManagerAtomicity(t *testing.T) {
94+
t.Skip("Not implemented yet")
95+
96+
defer testutil.AfterTest(t)
97+
98+
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
99+
defer clus.Terminate(t)
100+
101+
c := clus.RandClient()
102+
em, err := endpoints.NewManager(c, "foo")
103+
if err != nil {
104+
t.Fatal("failed to create EndpointManager", err)
105+
}
106+
107+
err = em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
108+
endpoints.NewAddUpdateOpts("foo/host", endpoints.Endpoint{Addr: "127.0.0.1:2000"}),
109+
endpoints.NewAddUpdateOpts("foo/host2", endpoints.Endpoint{Addr: "127.0.0.1:2001"})})
110+
if err != nil {
111+
t.Fatal(err)
112+
}
113+
114+
ctx, watchCancel := context.WithCancel(context.Background())
115+
defer watchCancel()
116+
w, err := em.NewWatchChannel(ctx)
117+
if err != nil {
118+
t.Fatal(err)
119+
}
120+
121+
updates := <-w
122+
if len(updates) != 2 {
123+
t.Fatalf("expected two updates, got %+v", updates)
124+
}
125+
126+
_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
127+
if err != nil {
128+
t.Fatal(err)
129+
}
130+
131+
updates = <-w
132+
if len(updates) != 2 || (updates[0].Op != endpoints.Delete && updates[1].Op != endpoints.Delete) {
133+
t.Fatalf("expected two delete updates, got %+v", updates)
134+
}
135+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2016 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package naming
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"google.golang.org/grpc"
22+
23+
"go.etcd.io/etcd/clientv3/naming/endpoints"
24+
"go.etcd.io/etcd/clientv3/naming/resolver"
25+
"go.etcd.io/etcd/integration"
26+
"go.etcd.io/etcd/pkg/testutil"
27+
)
28+
29+
// This test mimics scenario described in grpc_naming.md doc.
30+
31+
func TestEtcdGrpcResolver(t *testing.T) {
32+
t.Skip("Not implemented yet")
33+
34+
defer testutil.AfterTest(t)
35+
36+
// s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000
37+
// s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001
38+
39+
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
40+
defer clus.Terminate(t)
41+
42+
em, err := endpoints.NewManager(clus.RandClient(), "foo")
43+
if err != nil {
44+
t.Fatal("failed to create EndpointManager", err)
45+
}
46+
47+
e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"}
48+
e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"}
49+
50+
err = em.AddEndpoint(context.TODO(), "foo/e1", e1)
51+
if err != nil {
52+
t.Fatal("failed to add foo", err)
53+
}
54+
etcdResolver, err := resolver.NewBuilder(clus.RandClient())
55+
56+
conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
57+
if err != nil {
58+
t.Fatal("failed to connect to foo (e1)", err)
59+
}
60+
61+
// TODO: send requests to conn, ensure s1 received it.
62+
63+
em.DeleteEndpoint(context.TODO(), "foo/e1")
64+
em.AddEndpoint(context.TODO(), "foo/e2", e2)
65+
66+
// TODO: Send requests to conn and make sure s2 receive it.
67+
// Might require restarting s1 to break the existing (open) connection.
68+
69+
conn.GetState() // this line is to avoid compiler warning that conn is unused.
70+
}

clientv3/naming/doc.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,44 +12,47 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services.
15+
// Package naming provides:
16+
// - subpackage endpoints: an abstraction layer to store and read endpoints
17+
// information from etcd.
18+
// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC
19+
// services based on the endpoints configuration
1620
//
1721
// To use, first import the packages:
1822
//
1923
// import (
2024
// "go.etcd.io/etcd/clientv3"
21-
// etcdnaming "go.etcd.io/etcd/clientv3/naming"
22-
//
25+
// "go.etcd.io/etcd/clientv3/naming/endpoints"
26+
// "go.etcd.io/etcd/clientv3/naming/resolver"
2327
// "google.golang.org/grpc"
24-
// "google.golang.org/grpc/naming"
2528
// )
2629
//
2730
// First, register new endpoint addresses for a service:
2831
//
2932
// func etcdAdd(c *clientv3.Client, service, addr string) error {
30-
// r := &etcdnaming.GRPCResolver{Client: c}
31-
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr})
33+
// em := endpoints.NewManager(c, service)
34+
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr})
3235
// }
3336
//
3437
// Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer:
3538
//
3639
// func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) {
37-
// r := &etcdnaming.GRPCResolver{Client: c}
38-
// b := grpc.RoundRobin(r)
39-
// return grpc.Dial(service, grpc.WithBalancer(b))
40+
// etcdResolver, err := resolver.NewBuilder(c);
41+
// if err { return nil, err }
42+
// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
4043
// }
4144
//
4245
// Optionally, force delete an endpoint:
4346
//
4447
// func etcdDelete(c *clientv3, service, addr string) error {
45-
// r := &etcdnaming.GRPCResolver{Client: c}
46-
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
48+
// em := endpoints.NewManager(c, service)
49+
// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr)
4750
// }
4851
//
4952
// Or register an expiring endpoint with a lease:
5053
//
51-
// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
52-
// r := &etcdnaming.GRPCResolver{Client: c}
53-
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid))
54+
// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
55+
// em := endpoints.NewManager(c, service)
56+
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid))
5457
// }
5558
package naming

0 commit comments

Comments
 (0)