Skip to content

Commit 3298840

Browse files
ptaborchaochn47
authored andcommitted
Introduce grpc-1.30+ compatible client/v3/naming API.
This is not yet implementation, just API and tests to be filled with implementation in next CLs, tracked by: #12652 We propose here 3 packages: - clientv3/naming/endpoints -> That is abstraction layer over etcd that allows to write, read & watch Endpoints information. It's independent from GRPC API. It hides the storage details. - clientv3/naming/endpoints/internal -> That contains the grpc's compatible Update class to preserve the internal JSON mashalling format. - clientv3/naming/resolver -> That implements the GRPC resolver API, such that etcd can be used for connection.Dial in grpc. Please see the grpc_naming.md document changes & grpcproxy/cluster.go new integration, to see how the new abstractions work. Signed-off-by: Chao Chen <chaochn@amazon.com>
1 parent 3663ae1 commit 3298840

11 files changed

Lines changed: 587 additions & 34 deletions

File tree

Documentation/dev-guide/grpc_naming.md

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,54 +6,56 @@ etcd provides a gRPC resolver to support an alternative name system that fetches
66

77
## Using etcd discovery with go-grpc
88

9-
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution:
9+
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client:
1010

1111
```go
1212
import (
1313
"go.etcd.io/etcd/clientv3"
14-
etcdnaming "go.etcd.io/etcd/clientv3/naming"
14+
resolver "go.etcd.io/etcd/clientv3/naming/resolver"
1515

1616
"google.golang.org/grpc"
1717
)
1818

19-
...
20-
2119
cli, cerr := clientv3.NewFromURL("http://localhost:2379")
22-
r := &etcdnaming.GRPCResolver{Client: cli}
23-
b := grpc.RoundRobin(r)
24-
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b), grpc.WithBlock(), ...)
20+
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
21+
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
2522
```
2623

2724
## Managing service endpoints
2825

29-
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
26+
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "foo/bar/my-service/")
27+
with JSON-encoded (historically go-grpc `naming.Update`) values as potential service endpoints.
28+
Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
3029

3130
### Adding an endpoint
3231

3332
New endpoints can be added to the service through `etcdctl`:
3433

3534
```sh
36-
ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
35+
ETCDCTL_API=3 etcdctl put foo/bar/my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
3736
```
3837

39-
The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`:
38+
The etcd client's `endpoints.Manager` method can also register new endpoints with a key matching the `Addr`:
4039

4140
```go
42-
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."})
41+
42+
em := endpoints.NewManager(client, "foo/bar/my-service")
43+
err := em.AddEndpoint(context.TODO(),"foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
4344
```
4445

4546
### Deleting an endpoint
4647

4748
Hosts can be deleted from the service through `etcdctl`:
4849

4950
```sh
50-
ETCDCTL_API=3 etcdctl del my-service/1.2.3.4
51+
ETCDCTL_API=3 etcdctl del foo/bar/my-service/1.2.3.4
5152
```
5253

53-
The etcd client's `GRPCResolver.Update` method also supports deleting endpoints:
54+
The etcd client's `endpoints.Manager` method also supports deleting endpoints:
5455

5556
```go
56-
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
57+
em := endpoints.NewManager(client, "foo/bar/my-service")
58+
err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1");
5759
```
5860

5961
### Registering an endpoint with a lease
@@ -65,3 +67,21 @@ lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '`
6567
ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
6668
ETCDCTL_API=3 etcdctl lease keep-alive $lease
6769
```
70+
In the golang:
71+
72+
```go
73+
em := endpoints.NewManager(client, "foo/bar/my-service")
74+
err := endpoints.AddEndpoint(context.TODO(), "foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
75+
```
76+
77+
### Atomically updating endpoints
78+
79+
If it's desired to modify multiple endpoints in a single transaction, `endpoints.Manager` can be used directly:
80+
81+
```
82+
em := endpoints.NewManager(c, "foo")
83+
84+
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
85+
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
86+
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
87+
```

clientv3/naming/doc.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,44 +12,47 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services.
15+
// Package naming provides:
16+
// - subpackage endpoints: an abstraction layer to store and read endpoints
17+
// information from etcd.
18+
// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC
19+
// services based on the endpoints configuration
1620
//
1721
// To use, first import the packages:
1822
//
1923
// import (
2024
// "go.etcd.io/etcd/clientv3"
21-
// etcdnaming "go.etcd.io/etcd/clientv3/naming"
22-
//
25+
// "go.etcd.io/etcd/clientv3/naming/endpoints"
26+
// "go.etcd.io/etcd/clientv3/naming/resolver"
2327
// "google.golang.org/grpc"
24-
// "google.golang.org/grpc/naming"
2528
// )
2629
//
2730
// First, register new endpoint addresses for a service:
2831
//
2932
// func etcdAdd(c *clientv3.Client, service, addr string) error {
30-
// r := &etcdnaming.GRPCResolver{Client: c}
31-
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr})
33+
// em := endpoints.NewManager(c, service)
34+
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr})
3235
// }
3336
//
3437
// Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer:
3538
//
3639
// func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) {
37-
// r := &etcdnaming.GRPCResolver{Client: c}
38-
// b := grpc.RoundRobin(r)
39-
// return grpc.Dial(service, grpc.WithBalancer(b))
40+
// etcdResolver, err := resolver.NewBuilder(c);
41+
// if err { return nil, err }
42+
// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
4043
// }
4144
//
4245
// Optionally, force delete an endpoint:
4346
//
4447
// func etcdDelete(c *clientv3, service, addr string) error {
45-
// r := &etcdnaming.GRPCResolver{Client: c}
46-
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
48+
// em := endpoints.NewManager(c, service)
49+
// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr)
4750
// }
4851
//
4952
// Or register an expiring endpoint with a lease:
5053
//
51-
// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
52-
// r := &etcdnaming.GRPCResolver{Client: c}
53-
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid))
54+
// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
55+
// em := endpoints.NewManager(c, service)
56+
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid))
5457
// }
5558
package naming
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright 2023 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package endpoints
16+
17+
import (
18+
"context"
19+
20+
"go.etcd.io/etcd/clientv3"
21+
)
22+
23+
// Endpoint represents a single address the connection can be established with.
24+
//
25+
// Inspired by: https://pkg.go.dev/google.golang.org/grpc/resolver#Address.
26+
// Please document etcd version since which version each field is supported.
27+
type Endpoint struct {
28+
// Addr is the server address on which a connection will be established.
29+
// Since etcd 3.1
30+
Addr string
31+
32+
// Metadata is the information associated with Addr, which may be used
33+
// to make load balancing decision.
34+
// Since etcd 3.1
35+
Metadata interface{}
36+
}
37+
38+
type Operation uint8
39+
40+
const (
41+
// Add indicates an Endpoint is added.
42+
Add Operation = iota
43+
// Delete indicates an existing address is deleted.
44+
Delete
45+
)
46+
47+
// Update describes a single edit action of an Endpoint.
48+
type Update struct {
49+
// Op - action Add or Delete.
50+
Op Operation
51+
Key string
52+
Endpoint Endpoint
53+
}
54+
55+
// WatchChannel is used to deliver notifications about endpoints updates.
56+
type WatchChannel chan []*Update
57+
58+
// Key2EndpointMap maps etcd key into struct describing the endpoint.
59+
type Key2EndpointMap map[string]Endpoint
60+
61+
// UpdateWithOpts describes endpoint update (add or delete) together
62+
// with etcd options (e.g. to attach an endpoint to a lease).
63+
type UpdateWithOpts struct {
64+
Update
65+
Opts []clientv3.OpOption
66+
}
67+
68+
// NewAddUpdateOpts constructs UpdateWithOpts for endpoint registration.
69+
func NewAddUpdateOpts(key string, endpoint Endpoint, opts ...clientv3.OpOption) *UpdateWithOpts {
70+
return &UpdateWithOpts{Update: Update{Op: Add, Key: key, Endpoint: endpoint}, Opts: opts}
71+
}
72+
73+
// NewDeleteUpdateOpts constructs UpdateWithOpts for endpoint deletion.
74+
func NewDeleteUpdateOpts(key string, opts ...clientv3.OpOption) *UpdateWithOpts {
75+
return &UpdateWithOpts{Update: Update{Op: Delete, Key: key}, Opts: opts}
76+
}
77+
78+
// Manager can be used to add/remove & inspect endpoints stored in etcd for
79+
// a particular target.
80+
type Manager interface {
81+
// Update allows to atomically add/remove a few endpoints from etcd.
82+
Update(ctx context.Context, updates []*UpdateWithOpts) error
83+
84+
// AddEndpoint registers a single endpoint in etcd.
85+
// For more advanced use-cases use the Update method.
86+
AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error
87+
// DeleteEndpoint deletes a single endpoint stored in etcd.
88+
// For more advanced use-cases use the Update method.
89+
DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error
90+
91+
// List returns all the endpoints for the current target as a map.
92+
List(ctx context.Context) (Key2EndpointMap, error)
93+
// NewWatchChannel creates a channel that populates or endpoint updates.
94+
// Cancel the 'ctx' to close the watcher.
95+
NewWatchChannel(ctx context.Context) (WatchChannel, error)
96+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright 2023 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package endpoints
16+
17+
// TODO: The API is not yet implemented.
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"go.etcd.io/etcd/clientv3"
24+
"go.etcd.io/etcd/clientv3/naming/endpoints/internal"
25+
)
26+
27+
type endpointManager struct {
28+
// TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652
29+
}
30+
31+
func NewManager(client *clientv3.Client, target string) (Manager, error) {
32+
// To be implemented (https://github.com/etcd-io/etcd/issues/12652)
33+
return nil, fmt.Errorf("Not implemented yet")
34+
}
35+
36+
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error {
37+
// TODO: For loop in a single transaction:
38+
internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format.
39+
switch internalUpdate.Op {
40+
//case internal.Add:
41+
// var v []byte
42+
// if v, err = json.Marshal(internalUpdate); err != nil {
43+
// return status.Error(codes.InvalidArgument, err.Error())
44+
// }
45+
// _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
46+
//case internal.Delete:
47+
// _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
48+
//default:
49+
// return status.Error(codes.InvalidArgument, "naming: bad naming op")
50+
}
51+
return fmt.Errorf("Not implemented yet")
52+
}
53+
54+
func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
55+
return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)})
56+
}
57+
58+
func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error {
59+
return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)})
60+
}
61+
62+
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
63+
return nil, fmt.Errorf("Not implemented yet")
64+
65+
// TODO: Implementation to be inspired by:
66+
// Next gets the next set of updates from the etcd resolver.
67+
//// Calls to Next should be serialized; concurrent calls are not safe since
68+
//// there is no way to reconcile the update ordering.
69+
//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
70+
// if gw.wch == nil {
71+
// // first Next() returns all addresses
72+
// return gw.firstNext()
73+
// }
74+
// if gw.err != nil {
75+
// return nil, gw.err
76+
// }
77+
//
78+
// // process new events on target/*
79+
// wr, ok := <-gw.wch
80+
// if !ok {
81+
// gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
82+
// return nil, gw.err
83+
// }
84+
// if gw.err = wr.Err(); gw.err != nil {
85+
// return nil, gw.err
86+
// }
87+
//
88+
// updates := make([]*naming.Update, 0, len(wr.Events))
89+
// for _, e := range wr.Events {
90+
// var jupdate naming.Update
91+
// var err error
92+
// switch e.Type {
93+
// case etcd.EventTypePut:
94+
// err = json.Unmarshal(e.Kv.Value, &jupdate)
95+
// jupdate.Op = naming.Add
96+
// case etcd.EventTypeDelete:
97+
// err = json.Unmarshal(e.PrevKv.Value, &jupdate)
98+
// jupdate.Op = naming.Delete
99+
// default:
100+
// continue
101+
// }
102+
// if err == nil {
103+
// updates = append(updates, &jupdate)
104+
// }
105+
// }
106+
// return updates, nil
107+
//}
108+
//
109+
//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
110+
// // Use serialized request so resolution still works if the target etcd
111+
// // server is partitioned away from the quorum.
112+
// resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
113+
// if gw.err = err; err != nil {
114+
// return nil, err
115+
// }
116+
//
117+
// updates := make([]*naming.Update, 0, len(resp.Kvs))
118+
// for _, kv := range resp.Kvs {
119+
// var jupdate naming.Update
120+
// if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
121+
// continue
122+
// }
123+
// updates = append(updates, &jupdate)
124+
// }
125+
//
126+
// opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
127+
// gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
128+
// return updates, nil
129+
//}
130+
}
131+
132+
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
133+
// TODO: Implementation
134+
return nil, fmt.Errorf("Not implemented yet")
135+
}

0 commit comments

Comments
 (0)