Skip to content

Commit f3bef24

Browse files
ptaborchaochn47
authored andcommitted
Introduce grpc-1.30+ compatible client/v3/naming API.
This is not yet implementation, just API and tests to be filled with implementation in next CLs, tracked by: #12652 We propose here 3 packages: - clientv3/naming/endpoints -> That is abstraction layer over etcd that allows to write, read & watch Endpoints information. It's independent from GRPC API. It hides the storage details. - clientv3/naming/endpoints/internal -> That contains the grpc's compatible Update class to preserve the internal JSON mashalling format. - clientv3/naming/resolver -> That implements the GRPC resolver API, such that etcd can be used for connection.Dial in grpc. Please see the grpc_naming.md document changes & grpcproxy/cluster.go new integration, to see how the new abstractions work. Signed-off-by: Chao Chen <chaochn@amazon.com>
1 parent 3663ae1 commit f3bef24

10 files changed

Lines changed: 532 additions & 34 deletions

File tree

Documentation/dev-guide/grpc_naming.md

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,54 +6,56 @@ etcd provides a gRPC resolver to support an alternative name system that fetches
66

77
## Using etcd discovery with go-grpc
88

9-
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution:
9+
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client:
1010

1111
```go
1212
import (
13-
"go.etcd.io/etcd/clientv3"
14-
etcdnaming "go.etcd.io/etcd/clientv3/naming"
13+
"go.etcd.io/etcd/v3/clientv3"
14+
resolver "go.etcd.io/etcd/v3/clientv3/naming/resolver"
1515

1616
"google.golang.org/grpc"
1717
)
1818

19-
...
20-
2119
cli, cerr := clientv3.NewFromURL("http://localhost:2379")
22-
r := &etcdnaming.GRPCResolver{Client: cli}
23-
b := grpc.RoundRobin(r)
24-
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b), grpc.WithBlock(), ...)
20+
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
21+
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
2522
```
2623

2724
## Managing service endpoints
2825

29-
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
26+
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "foo/bar/my-service/")
27+
with JSON-encoded (historically go-grpc `naming.Update`) values as potential service endpoints.
28+
Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
3029

3130
### Adding an endpoint
3231

3332
New endpoints can be added to the service through `etcdctl`:
3433

3534
```sh
36-
ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
35+
ETCDCTL_API=3 etcdctl put foo/bar/my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
3736
```
3837

39-
The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`:
38+
The etcd client's `endpoints.Manager` method can also register new endpoints with a key matching the `Addr`:
4039

4140
```go
42-
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."})
41+
42+
em := endpoints.NewManager(client, "foo/bar/my-service")
43+
err := em.AddEndpoint(context.TODO(),"foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
4344
```
4445

4546
### Deleting an endpoint
4647

4748
Hosts can be deleted from the service through `etcdctl`:
4849

4950
```sh
50-
ETCDCTL_API=3 etcdctl del my-service/1.2.3.4
51+
ETCDCTL_API=3 etcdctl del foo/bar/my-service/1.2.3.4
5152
```
5253

53-
The etcd client's `GRPCResolver.Update` method also supports deleting endpoints:
54+
The etcd client's `endpoints.Manager` method also supports deleting endpoints:
5455

5556
```go
56-
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
57+
em := endpoints.NewManager(client, "foo/bar/my-service")
58+
err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1");
5759
```
5860

5961
### Registering an endpoint with a lease
@@ -65,3 +67,21 @@ lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '`
6567
ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
6668
ETCDCTL_API=3 etcdctl lease keep-alive $lease
6769
```
70+
In the golang:
71+
72+
```go
73+
em := endpoints.NewManager(client, "foo/bar/my-service")
74+
err := endpoints.AddEndpoint(context.TODO(), "foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
75+
```
76+
77+
### Atomically updating endpoints
78+
79+
If it's desired to modify multiple endpoints in a single transaction, `endpoints.Manager` can be used directly:
80+
81+
```
82+
em := endpoints.NewManager(c, "foo")
83+
84+
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
85+
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
86+
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
87+
```

clientv3/naming/doc.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,44 +12,47 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services.
15+
// Package naming provides:
16+
// - subpackage endpoints: an abstraction layer to store and read endpoints
17+
// information from etcd.
18+
// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC
19+
// services based on the endpoints configuration
1620
//
1721
// To use, first import the packages:
1822
//
1923
// import (
2024
// "go.etcd.io/etcd/clientv3"
21-
// etcdnaming "go.etcd.io/etcd/clientv3/naming"
22-
//
25+
// "go.etcd.io/etcd/clientv3/naming/endpoints"
26+
// "go.etcd.io/etcd/clientv3/naming/resolver"
2327
// "google.golang.org/grpc"
24-
// "google.golang.org/grpc/naming"
2528
// )
2629
//
2730
// First, register new endpoint addresses for a service:
2831
//
2932
// func etcdAdd(c *clientv3.Client, service, addr string) error {
30-
// r := &etcdnaming.GRPCResolver{Client: c}
31-
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr})
33+
// em := endpoints.NewManager(c, service)
34+
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr});
3235
// }
3336
//
3437
// Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer:
3538
//
3639
// func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) {
37-
// r := &etcdnaming.GRPCResolver{Client: c}
38-
// b := grpc.RoundRobin(r)
39-
// return grpc.Dial(service, grpc.WithBalancer(b))
40+
// etcdResolver, err := resolver.NewBuilder(c);
41+
// if err { return nil, err }
42+
// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
4043
// }
4144
//
4245
// Optionally, force delete an endpoint:
4346
//
4447
// func etcdDelete(c *clientv3, service, addr string) error {
45-
// r := &etcdnaming.GRPCResolver{Client: c}
46-
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
48+
// em := endpoints.NewManager(c, service)
49+
// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr)
4750
// }
4851
//
4952
// Or register an expiring endpoint with a lease:
5053
//
51-
// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
52-
// r := &etcdnaming.GRPCResolver{Client: c}
53-
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid))
54+
// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
55+
// em := endpoints.NewManager(c, service)
56+
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid));
5457
// }
5558
package naming
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package endpoints
2+
3+
import (
4+
"context"
5+
6+
"go.etcd.io/etcd/clientv3"
7+
)
8+
9+
// Endpoint represents a single address the connection can be established with.
10+
//
11+
// Inspired by: https://pkg.go.dev/google.golang.org/grpc/resolver#Address.
12+
// Please document etcd version since which version each field is supported.
13+
type Endpoint struct {
14+
// Addr is the server address on which a connection will be established.
15+
// Since etcd 3.1
16+
Addr string
17+
18+
// Metadata is the information associated with Addr, which may be used
19+
// to make load balancing decision.
20+
// Since etcd 3.1
21+
Metadata interface{}
22+
}
23+
24+
type Operation uint8
25+
26+
const (
27+
// Add indicates an Endpoint is added.
28+
Add Operation = iota
29+
// Delete indicates an existing address is deleted.
30+
Delete
31+
)
32+
33+
// Update describes a single edit action of an Endpoint.
34+
type Update struct {
35+
// Op - action Add or Delete.
36+
Op Operation
37+
Key string
38+
Endpoint Endpoint
39+
}
40+
41+
// WatchChannel is used to deliver notifications about endpoints updates.
42+
type WatchChannel chan []*Update
43+
44+
// Key2EndpointMap maps etcd key into struct describing the endpoint.
45+
type Key2EndpointMap map[string]Endpoint
46+
47+
// UpdateWithOpts describes endpoint update (add or delete) together
48+
// with etcd options (e.g. to attach an endpoint to a lease).
49+
type UpdateWithOpts struct {
50+
Update
51+
Opts []clientv3.OpOption
52+
}
53+
54+
// NewAddUpdateOpts constructs UpdateWithOpts for endpoint registration.
55+
func NewAddUpdateOpts(key string, endpoint Endpoint, opts ...clientv3.OpOption) *UpdateWithOpts {
56+
return &UpdateWithOpts{Update: Update{Op: Add, Key: key, Endpoint: endpoint}, Opts: opts}
57+
}
58+
59+
// NewDeleteUpdateOpts constructs UpdateWithOpts for endpoint deletion.
60+
func NewDeleteUpdateOpts(key string, opts ...clientv3.OpOption) *UpdateWithOpts {
61+
return &UpdateWithOpts{Update: Update{Op: Delete, Key: key}, Opts: opts}
62+
}
63+
64+
// Manager can be used to add/remove & inspect endpoints stored in etcd for
65+
// a particular target.
66+
type Manager interface {
67+
// Update allows to atomically add/remove a few endpoints from etcd.
68+
Update(ctx context.Context, updates []*UpdateWithOpts) error
69+
70+
// AddEndpoint registers a single endpoint in etcd.
71+
// For more advanced use-cases use the Update method.
72+
AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error
73+
// DeleteEndpoint deletes a single endpoint stored in etcd.
74+
// For more advanced use-cases use the Update method.
75+
DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error
76+
77+
// List returns all the endpoints for the current target as a map.
78+
List(ctx context.Context) (Key2EndpointMap, error)
79+
// NewWatchChannel creates a channel that populates or endpoint updates.
80+
// Cancel the 'ctx' to close the watcher.
81+
NewWatchChannel(ctx context.Context) (WatchChannel, error)
82+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package endpoints
2+
3+
// TODO: The API is not yet implemented.
4+
5+
import (
6+
"context"
7+
"fmt"
8+
9+
"go.etcd.io/etcd/clientv3"
10+
"go.etcd.io/etcd/clientv3/naming/endpoints/internal"
11+
)
12+
13+
type endpointManager struct {
14+
// TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652
15+
}
16+
17+
func NewManager(client *clientv3.Client, target string) (Manager, error) {
18+
// To be implemented (https://github.com/etcd-io/etcd/issues/12652)
19+
return nil, fmt.Errorf("Not implemented yet")
20+
}
21+
22+
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error {
23+
// TODO: For loop in a single transaction:
24+
internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format.
25+
switch internalUpdate.Op {
26+
//case internal.Add:
27+
// var v []byte
28+
// if v, err = json.Marshal(internalUpdate); err != nil {
29+
// return status.Error(codes.InvalidArgument, err.Error())
30+
// }
31+
// _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
32+
//case internal.Delete:
33+
// _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
34+
//default:
35+
// return status.Error(codes.InvalidArgument, "naming: bad naming op")
36+
}
37+
return fmt.Errorf("Not implemented yet")
38+
}
39+
40+
func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
41+
return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)})
42+
}
43+
44+
func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error {
45+
return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)})
46+
}
47+
48+
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
49+
return nil, fmt.Errorf("Not implemented yet")
50+
51+
// TODO: Implementation to be inspired by:
52+
// Next gets the next set of updates from the etcd resolver.
53+
//// Calls to Next should be serialized; concurrent calls are not safe since
54+
//// there is no way to reconcile the update ordering.
55+
//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
56+
// if gw.wch == nil {
57+
// // first Next() returns all addresses
58+
// return gw.firstNext()
59+
// }
60+
// if gw.err != nil {
61+
// return nil, gw.err
62+
// }
63+
//
64+
// // process new events on target/*
65+
// wr, ok := <-gw.wch
66+
// if !ok {
67+
// gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
68+
// return nil, gw.err
69+
// }
70+
// if gw.err = wr.Err(); gw.err != nil {
71+
// return nil, gw.err
72+
// }
73+
//
74+
// updates := make([]*naming.Update, 0, len(wr.Events))
75+
// for _, e := range wr.Events {
76+
// var jupdate naming.Update
77+
// var err error
78+
// switch e.Type {
79+
// case etcd.EventTypePut:
80+
// err = json.Unmarshal(e.Kv.Value, &jupdate)
81+
// jupdate.Op = naming.Add
82+
// case etcd.EventTypeDelete:
83+
// err = json.Unmarshal(e.PrevKv.Value, &jupdate)
84+
// jupdate.Op = naming.Delete
85+
// default:
86+
// continue
87+
// }
88+
// if err == nil {
89+
// updates = append(updates, &jupdate)
90+
// }
91+
// }
92+
// return updates, nil
93+
//}
94+
//
95+
//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
96+
// // Use serialized request so resolution still works if the target etcd
97+
// // server is partitioned away from the quorum.
98+
// resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
99+
// if gw.err = err; err != nil {
100+
// return nil, err
101+
// }
102+
//
103+
// updates := make([]*naming.Update, 0, len(resp.Kvs))
104+
// for _, kv := range resp.Kvs {
105+
// var jupdate naming.Update
106+
// if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
107+
// continue
108+
// }
109+
// updates = append(updates, &jupdate)
110+
// }
111+
//
112+
// opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
113+
// gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
114+
// return updates, nil
115+
//}
116+
}
117+
118+
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
119+
// TODO: Implementation
120+
return nil, fmt.Errorf("Not implemented yet")
121+
}

0 commit comments

Comments
 (0)