Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,37 +180,40 @@ path = "src/streaming/server.rs"

[dependencies]
async-stream = "0.3"
futures = {version = "0.3", default-features = false, features = ["alloc"]}
futures = { version = "0.3", default-features = false, features = ["alloc"] }
prost = "0.9"
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net"]}
tokio-stream = {version = "0.1", features = ["net"]}
tonic = {path = "../tonic", features = ["tls", "compression"]}
tower = {version = "0.4"}
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net",] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = { path = "../tonic", features = ["tls", "compression"] }
tower = { version = "0.4" }
# Required for routeguide
rand = "0.8"
serde = {version = "1.0", features = ["derive"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Tracing
tracing = "0.1.16"
tracing-attributes = "0.1"
tracing-futures = "0.2"
tracing-subscriber = {version = "0.3", features = ["tracing-log"]}
tracing-subscriber = { version = "0.3", features = ["tracing-log"] }
# Required for wellknown types
prost-types = "0.9"
# Hyper example
http = "0.2"
http-body = "0.4.2"
hyper = {version = "0.14", features = ["full"]}
hyper = { version = "0.14", features = ["full"] }
pin-project = "1.0"
warp = "0.3"
# Health example
tonic-health = {path = "../tonic-health"}
tonic-health = { path = "../tonic-health" }
# Reflection example
listenfd = "0.3"
tonic-reflection = {path = "../tonic-reflection"}
tonic-reflection = { path = "../tonic-reflection" }
# grpc-web example
bytes = "1"
tonic-web = {path = "../tonic-web"}
tonic-web = { path = "../tonic-web" }
# streaming example
h2 = "0.3"


[build-dependencies]
tonic-build = {path = "../tonic-build", features = ["prost", "compression"]}
tonic-build = { path = "../tonic-build", features = ["prost", "compression"] }
75 changes: 66 additions & 9 deletions examples/src/streaming/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,85 @@ pub mod pb {
tonic::include_proto!("grpc.examples.echo");
}

use futures::stream::Stream;
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::transport::Channel;

use pb::{echo_client::EchoClient, EchoRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
message: format!("msg {:02}", i),
})
}

async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
let stream = client
.server_streaming_echo(EchoRequest {
message: "foo".into(),
})
.await
.unwrap()
.into_inner();

// stream is infinite - take just 5 elements and then disconnect
let mut stream = stream.take(num);
while let Some(item) = stream.next().await {
println!("\trecived: {}", item.unwrap().message);
}
// stream is droped here and the disconnect info is send to server
}

async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
let in_stream = echo_requests_iter().take(num);

let response = client
.bidirectional_streaming_echo(in_stream)
.await
.unwrap();

println!("Connected...now sleeping for 2 seconds...");
let mut resp_stream = response.into_inner();

while let Some(recived) = resp_stream.next().await {
let recived = recived.unwrap();
println!("\trecived message: `{}`", recived.message);
}
}

async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not following why we want this example as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to show difference between graceful stream end the one when client disconnected for example by broken connection or just crush. This 2 functions could be unified to one.

Here is the output from server part.

EchoServer::server_streaming_echo
	client connected from: Some([::1]:36750)
	client disconnected
EchoServer::Bidirectional_streaming_echo
	stream ended
EchoServer::Bidirectional_streaming_echo
	client disconnected: broken pipe
	stream ended

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words the difference is on the server side. On the client side I just needed a function that sends request slowly enough that you can write what is printed. I added additional comments to clarify that.

Should I unify this to bidirectional_streaming_echo(client &mut EchoClient<Channel>, num_reqs: usize, throttle: Duration)?

let in_stream = echo_requests_iter().throttle(dur);

let response = client
.bidirectional_streaming_echo(in_stream)
.await
.unwrap();

let mut resp_stream = response.into_inner();

while let Some(recived) = resp_stream.next().await {
let recived = recived.unwrap();
println!("\trecived message: `{}`", recived.message);
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();

tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!("Streaming echo:");
streaming_echo(&mut client, 5).await;
tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions

// Disconnect
drop(stream);
drop(client);
// Echo stream that sends 17 requests then gracefull end that conection
println!("\r\nBidirectional stream echo:");
bidirectional_streaming_echo(&mut client, 17).await;

println!("Disconnected...");
// Echo stream that sends up to `usize::MAX` requets. One request each 2s.
// Exiting client with CTRL+C demostrate how to distinguise broken pipe from
//gracefull client disconnection (above example) on the server side.
println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;

Ok(())
}
115 changes: 93 additions & 22 deletions examples/src/streaming/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,39 @@ pub mod pb {
}

use futures::Stream;
use std::net::ToSocketAddrs;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::oneshot;
use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};

use pb::{EchoRequest, EchoResponse};

type EchoResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;

fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
let mut err: &(dyn Error + 'static) = err_status;

loop {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
return Some(io_err);
}

// h2::Error do not expose std::io::Error with `source()`
// https://github.com/hyperium/h2/pull/462
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
if let Some(io_err) = h2_err.get_io() {
return Some(io_err);
}
}

err = match err.source() {
Some(err) => err,
None => return None,
};
}
}

#[derive(Debug)]
pub struct EchoServer {}

Expand All @@ -29,28 +51,36 @@ impl pb::echo_server::Echo for EchoServer {
&self,
req: Request<EchoRequest>,
) -> EchoResult<Self::ServerStreamingEchoStream> {
println!("Client connected from: {:?}", req.remote_addr());
println!("EchoServer::server_streaming_echo");
println!("\tclient connected from: {:?}", req.remote_addr());

let (tx, rx) = oneshot::channel::<()>();

tokio::spawn(async move {
let _ = rx.await;
println!("The rx resolved therefore the client disconnected!");
// creating infinite stream with requested message
let repeat = std::iter::repeat(EchoResponse {
message: req.into_inner().message,
});
let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));

struct ClientDisconnect(oneshot::Sender<()>);

impl Stream for ClientDisconnect {
type Item = Result<EchoResponse, Status>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// A stream that never resolves to anything....
Poll::Pending
// spawn and channel are required if you want handle "disconnect" functionality
// the `out_stream` will not be polled after client disconnect
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
while let Some(item) = stream.next().await {
match tx.send(Result::<_, Status>::Ok(item)).await {
Ok(_) => {
// item (server response) was queued to be send to client
}
Err(_item) => {
// output_stream was build from rx and both are dropped
break;
}
}
}
}
println!("\tclient disconnected");
});

let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(ClientDisconnect(tx)) as Self::ServerStreamingEchoStream
Box::pin(output_stream) as Self::ServerStreamingEchoStream
))
}

Expand All @@ -65,9 +95,50 @@ impl pb::echo_server::Echo for EchoServer {

async fn bidirectional_streaming_echo(
&self,
_: Request<Streaming<EchoRequest>>,
req: Request<Streaming<EchoRequest>>,
) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
Err(Status::unimplemented("not implemented"))
println!("EchoServer::bidirectional_streaming_echo");

let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);

// this spawn here is required if you want to handle connection error.
// If we just map `in_stream` and write it back as `out_stream` the `out_stream`
// will be drooped when connection error occurs and error will never be propagated
// to mapped version of `in_stream`.
tokio::spawn(async move {
while let Some(result) = in_stream.next().await {
match result {
Ok(v) => tx
.send(Ok(EchoResponse { message: v.message }))
.await
.expect("working rx"),
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
Comment thread
xoac marked this conversation as resolved.
if io_err.kind() == ErrorKind::BrokenPipe {
// here you can handle special case when client
// disconnected in unexpected way
eprintln!("\tclient disconnected: broken pipe");
break;
}
}

match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was droped
}
}
}
}
println!("\tstream ended");
});

// echo just write the same data that was received
let out_stream = ReceiverStream::new(rx);

Ok(Response::new(
Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
))
}
}

Expand Down