Skip to content

Commit b8f67d1

Browse files
nmoosciscoNathan Moos
andauthored
feat: make channel creation more flexible (#98)
In the interest of enabling caller-managed channels, and in order to reduce feature gate combinatorial explosion, this patch adds a layer of indirection into the etcd client's Channel abstraction. This unifies the existing ways of creating a native Tonic channel, an OpenSSL channel, and future support for caller-supplied channels. The core abstractions introduced here are the `Channel` enum, which can represent multiple ways of passing a channel to the generated clients, and the `MakeBalancedChannel` trait, which allows for the caller to customize the way in which a balanced channel is created. The `Channel` enum contains a `Custom` variant, which stores any Tower service that can process an `http::Request<tonic::body::BoxBody>` and return an `http::Response<tonic::body::BoxBody>`. This allows for maximum customization of the channel from the caller, supporting custom load-balancing policies, custom discovery, and custom transport layers. Co-authored-by: Nathan Moos <nmoos@cisco.com>
1 parent d5a0a68 commit b8f67d1

File tree

3 files changed

+196
-32
lines changed

3 files changed

+196
-32
lines changed

src/channel.rs

Lines changed: 163 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,168 @@
1-
#[cfg(not(feature = "tls-openssl"))]
2-
pub use tonic::transport::Channel;
1+
use std::{future::Future, pin::Pin, task::ready};
32

3+
use http::Uri;
4+
use tokio::sync::mpsc::Sender;
5+
use tonic::transport::Endpoint;
6+
use tower::{discover::Change, util::BoxCloneService, Service};
7+
8+
/// A type alias to make the below types easier to represent.
9+
pub type EndpointUpdater = Sender<Change<Uri, Endpoint>>;
10+
11+
/// Creates a balanced channel.
12+
pub trait BalancedChannelBuilder {
13+
type Error;
14+
15+
/// Makes a new balanced channel, given the provided options.
16+
fn balanced_channel(
17+
self,
18+
buffer_size: usize,
19+
) -> Result<(Channel, EndpointUpdater), Self::Error>;
20+
}
21+
22+
/// Create a simple Tonic channel.
23+
pub struct Tonic;
24+
25+
impl BalancedChannelBuilder for Tonic {
26+
type Error = tonic::transport::Error;
27+
28+
#[inline]
29+
fn balanced_channel(
30+
self,
31+
buffer_size: usize,
32+
) -> Result<(Channel, EndpointUpdater), Self::Error> {
33+
let (chan, tx) = tonic::transport::Channel::balance_channel(buffer_size);
34+
Ok((Channel::Tonic(chan), tx))
35+
}
36+
}
37+
38+
/// Create an Openssl-backed channel.
439
#[cfg(feature = "tls-openssl")]
5-
pub use self::openssl::Channel;
40+
pub struct Openssl {
41+
pub(crate) conn: crate::openssl_tls::OpenSslConnector,
42+
}
643

744
#[cfg(feature = "tls-openssl")]
8-
mod openssl {
9-
use crate::openssl_tls;
10-
11-
/// Because we cannot create `Channel` by the balanced, cached channels,
12-
/// we cannot create clients (which explicitly requires `Channel` as argument) directly.
13-
///
14-
/// This type alias would be useful to 'batch replace' the signature of `Client::new`.
15-
pub type Channel = openssl_tls::OpenSslChannel;
45+
impl BalancedChannelBuilder for Openssl {
46+
type Error = crate::error::Error;
47+
48+
#[inline]
49+
fn balanced_channel(self, _: usize) -> Result<(Channel, EndpointUpdater), Self::Error> {
50+
let (chan, tx) = crate::openssl_tls::balanced_channel(self.conn)?;
51+
Ok((Channel::Openssl(chan), tx))
52+
}
53+
}
54+
55+
type TonicRequest = http::Request<tonic::body::BoxBody>;
56+
type TonicResponse = http::Response<tonic::body::BoxBody>;
57+
pub type CustomChannel = BoxCloneService<TonicRequest, TonicResponse, tower::BoxError>;
58+
59+
/// Represents a channel that can be created by a BalancedChannelBuilder
60+
/// or may be initialized externally and passed into the client.
61+
#[derive(Clone)]
62+
pub enum Channel {
63+
/// A standard tonic channel.
64+
Tonic(tonic::transport::Channel),
65+
66+
/// An OpenSSL channel.
67+
#[cfg(feature = "tls-openssl")]
68+
Openssl(crate::openssl_tls::OpenSslChannel),
69+
70+
/// A custom Service impl, inside a Box.
71+
Custom(CustomChannel),
72+
}
73+
74+
impl std::fmt::Debug for Channel {
75+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76+
f.debug_struct("Channel").finish_non_exhaustive()
77+
}
78+
}
79+
80+
pub enum ChannelFuture {
81+
Tonic(<tonic::transport::Channel as Service<TonicRequest>>::Future),
82+
#[cfg(feature = "tls-openssl")]
83+
Openssl(<crate::openssl_tls::OpenSslChannel as Service<TonicRequest>>::Future),
84+
Custom(<CustomChannel as Service<TonicRequest>>::Future),
85+
}
86+
87+
impl std::future::Future for ChannelFuture {
88+
type Output = Result<TonicResponse, tower::BoxError>;
89+
90+
#[inline]
91+
fn poll(
92+
self: std::pin::Pin<&mut Self>,
93+
cx: &mut std::task::Context<'_>,
94+
) -> std::task::Poll<Self::Output> {
95+
// Safety: trivial projection
96+
unsafe {
97+
let this = self.get_unchecked_mut();
98+
match this {
99+
ChannelFuture::Tonic(fut) => {
100+
let fut = Pin::new_unchecked(fut);
101+
let result = ready!(Future::poll(fut, cx));
102+
result.map_err(|e| Box::new(e) as tower::BoxError).into()
103+
}
104+
#[cfg(feature = "tls-openssl")]
105+
ChannelFuture::Openssl(fut) => {
106+
let fut = Pin::new_unchecked(fut);
107+
Future::poll(fut, cx)
108+
}
109+
ChannelFuture::Custom(fut) => {
110+
let fut = Pin::new_unchecked(fut);
111+
Future::poll(fut, cx)
112+
}
113+
}
114+
}
115+
}
116+
}
117+
118+
impl ChannelFuture {
119+
#[inline]
120+
fn from_tonic(value: <tonic::transport::Channel as Service<TonicRequest>>::Future) -> Self {
121+
Self::Tonic(value)
122+
}
123+
124+
#[cfg(feature = "tls-openssl")]
125+
#[inline]
126+
fn from_openssl(
127+
value: <crate::openssl_tls::OpenSslChannel as Service<TonicRequest>>::Future,
128+
) -> Self {
129+
Self::Openssl(value)
130+
}
131+
132+
#[inline]
133+
fn from_custom(value: <CustomChannel as Service<TonicRequest>>::Future) -> Self {
134+
Self::Custom(value)
135+
}
136+
}
137+
138+
impl Service<TonicRequest> for Channel {
139+
type Response = TonicResponse;
140+
type Error = tower::BoxError;
141+
type Future = ChannelFuture;
142+
143+
#[inline]
144+
fn poll_ready(
145+
&mut self,
146+
cx: &mut std::task::Context<'_>,
147+
) -> std::task::Poll<Result<(), Self::Error>> {
148+
match self {
149+
Channel::Tonic(channel) => {
150+
let result = ready!(channel.poll_ready(cx));
151+
result.map_err(|e| Box::new(e) as tower::BoxError).into()
152+
}
153+
#[cfg(feature = "tls-openssl")]
154+
Channel::Openssl(openssl) => openssl.poll_ready(cx),
155+
Channel::Custom(custom) => custom.poll_ready(cx),
156+
}
157+
}
158+
159+
#[inline]
160+
fn call(&mut self, req: TonicRequest) -> Self::Future {
161+
match self {
162+
Channel::Tonic(channel) => ChannelFuture::from_tonic(channel.call(req)),
163+
#[cfg(feature = "tls-openssl")]
164+
Channel::Openssl(openssl) => ChannelFuture::from_openssl(openssl.call(req)),
165+
Channel::Custom(custom) => ChannelFuture::from_custom(custom.call(req)),
166+
}
167+
}
16168
}

src/client.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
//! Asynchronous client & synchronous client.
22
3-
#[cfg(not(feature = "tls-openssl"))]
4-
use crate::channel::Channel;
53
use crate::error::{Error, Result};
64
use crate::intercept::{InterceptedChannel, Interceptor};
75
use crate::lock::RwLockExt;
86
#[cfg(feature = "tls-openssl")]
9-
use crate::openssl_tls::{self, OpenSslClientConfig, OpenSslConnector};
7+
use crate::openssl_tls::{OpenSslClientConfig, OpenSslConnector};
108
use crate::rpc::auth::Permission;
119
use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
1210
use crate::rpc::auth::{
@@ -77,6 +75,28 @@ impl Client {
7775
endpoints: S,
7876
options: Option<ConnectOptions>,
7977
) -> Result<Self> {
78+
#[cfg(not(feature = "tls-openssl"))]
79+
let make_balanced_channel = crate::channel::Tonic;
80+
#[cfg(feature = "tls-openssl")]
81+
let make_balanced_channel = crate::channel::Openssl {
82+
conn: options
83+
.clone()
84+
.and_then(|o| o.otls)
85+
.unwrap_or_else(OpenSslConnector::create_default)?,
86+
};
87+
Self::connect_with_balanced_channel(endpoints, options, make_balanced_channel).await
88+
}
89+
90+
/// Connect to `etcd` servers from given `endpoints` and a balanced channel.
91+
pub async fn connect_with_balanced_channel<E: AsRef<str>, S: AsRef<[E]>, MBC>(
92+
endpoints: S,
93+
options: Option<ConnectOptions>,
94+
make_balanced_channel: MBC,
95+
) -> Result<Self>
96+
where
97+
MBC: crate::channel::BalancedChannelBuilder,
98+
crate::error::Error: From<MBC::Error>,
99+
{
80100
let endpoints = {
81101
let mut eps = Vec::new();
82102
for e in endpoints.as_ref() {
@@ -91,15 +111,7 @@ impl Client {
91111
}
92112

93113
// Always use balance strategy even if there is only one endpoint.
94-
#[cfg(not(feature = "tls-openssl"))]
95-
let (channel, tx) = Channel::balance_channel(64);
96-
#[cfg(feature = "tls-openssl")]
97-
let (channel, tx) = openssl_tls::balanced_channel(
98-
options
99-
.clone()
100-
.and_then(|o| o.otls)
101-
.unwrap_or_else(OpenSslConnector::create_default)?,
102-
)?;
114+
let (channel, tx) = make_balanced_channel.balanced_channel(64)?;
103115
let channel = InterceptedChannel::new(
104116
channel,
105117
Interceptor {
@@ -122,8 +134,7 @@ impl Client {
122134
}
123135

124136
fn build_endpoint(url: &str, options: &Option<ConnectOptions>) -> Result<Endpoint> {
125-
#[cfg(feature = "tls-openssl")]
126-
use tonic::transport::Channel;
137+
use tonic::transport::Channel as TonicChannel;
127138
let mut endpoint = if url.starts_with(HTTP_PREFIX) {
128139
#[cfg(feature = "tls")]
129140
if let Some(connect_options) = options {
@@ -134,7 +145,7 @@ impl Client {
134145
}
135146
}
136147

137-
Channel::builder(url.parse()?)
148+
TonicChannel::builder(url.parse()?)
138149
} else if url.starts_with(HTTPS_PREFIX) {
139150
#[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
140151
return Err(Error::InvalidArgs(String::from(
@@ -143,7 +154,7 @@ impl Client {
143154

144155
#[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
145156
{
146-
Channel::builder(url.parse()?)
157+
TonicChannel::builder(url.parse()?)
147158
}
148159

149160
#[cfg(feature = "tls")]
@@ -155,7 +166,7 @@ impl Client {
155166
}
156167
.unwrap_or_else(TlsOptions::new);
157168

158-
Channel::builder(url.parse()?).tls_config(tls)?
169+
TonicChannel::builder(url.parse()?).tls_config(tls)?
159170
}
160171
} else {
161172
#[cfg(feature = "tls")]
@@ -169,11 +180,11 @@ impl Client {
169180
match tls {
170181
Some(tls) => {
171182
let e = HTTPS_PREFIX.to_owned() + url;
172-
Channel::builder(e.parse()?).tls_config(tls)?
183+
TonicChannel::builder(e.parse()?).tls_config(tls)?
173184
}
174185
None => {
175186
let e = HTTP_PREFIX.to_owned() + url;
176-
Channel::builder(e.parse()?)
187+
TonicChannel::builder(e.parse()?)
177188
}
178189
}
179190
}
@@ -186,13 +197,13 @@ impl Client {
186197
HTTP_PREFIX
187198
};
188199
let e = pfx.to_owned() + url;
189-
Channel::builder(e.parse()?)
200+
TonicChannel::builder(e.parse()?)
190201
}
191202

192203
#[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
193204
{
194205
let e = HTTP_PREFIX.to_owned() + url;
195-
Channel::builder(e.parse()?)
206+
TonicChannel::builder(e.parse()?)
196207
}
197208
};
198209

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ mod openssl_tls;
6868
mod rpc;
6969
mod vec;
7070

71+
pub use crate::channel::{BalancedChannelBuilder, Channel};
7172
pub use crate::client::{Client, ConnectOptions};
7273
pub use crate::error::Error;
7374
pub use crate::namespace::{KvClientPrefix, LeaseClientPrefix};

0 commit comments

Comments
 (0)