Skip to content

Commit 54dce46

Browse files
committed
Review protocols IO with iter responses
1 parent 0cdba74 commit 54dce46

4 files changed

Lines changed: 60 additions & 65 deletions

File tree

src/asgi/io.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub(crate) struct ASGIHTTPProtocol {
4444
response_started: atomic::AtomicBool,
4545
response_chunked: atomic::AtomicBool,
4646
response_intent: Mutex<Option<(u16, HeaderMap)>>,
47-
body_tx: Mutex<Option<mpsc::Sender<Result<body::Bytes>>>>,
47+
body_tx: Mutex<Option<mpsc::UnboundedSender<body::Bytes>>>,
4848
flow_rx_exhausted: Arc<atomic::AtomicBool>,
4949
flow_rx_closed: Arc<atomic::AtomicBool>,
5050
flow_tx_waiter: Arc<tokio::sync::Notify>,
@@ -89,28 +89,25 @@ impl ASGIHTTPProtocol {
8989
fn send_body<'p>(
9090
&self,
9191
py: Python<'p>,
92-
tx: mpsc::Sender<Result<body::Bytes>>,
92+
tx: &mpsc::UnboundedSender<body::Bytes>,
9393
body: Box<[u8]>,
9494
close: bool,
9595
) -> PyResult<Bound<'p, PyAny>> {
96-
let guard = self.flow_tx_waiter.clone();
97-
let disconnected = self.flow_rx_closed.clone();
98-
future_into_py_futlike(self.rt.clone(), py, async move {
99-
match tx.send(Ok(body.into())).await {
100-
Ok(()) => {
101-
if close {
102-
guard.notify_one();
103-
}
96+
match tx.send(body.into()) {
97+
Ok(()) => {
98+
if close {
99+
self.flow_tx_waiter.notify_one();
104100
}
105-
Err(err) => {
106-
if !disconnected.load(atomic::Ordering::Acquire) {
107-
log::info!("ASGI transport error: {err:?}");
108-
}
109-
guard.notify_one();
101+
}
102+
Err(err) => {
103+
if !self.flow_rx_closed.load(atomic::Ordering::Acquire) {
104+
log::info!("ASGI transport error: {err:?}");
110105
}
106+
self.flow_tx_waiter.notify_one();
111107
}
112-
FutureResultToPy::None
113-
})
108+
}
109+
110+
empty_future_into_py(py)
114111
}
115112

116113
pub fn tx(&self) -> Option<oneshot::Sender<HTTPResponse>> {
@@ -204,7 +201,7 @@ impl ASGIHTTPProtocol {
204201
status,
205202
headers,
206203
http_body_util::Full::new(body::Bytes::from(body))
207-
.map_err(|e| match e {})
204+
.map_err(std::convert::Into::into)
208205
.boxed(),
209206
);
210207
self.flow_tx_waiter.notify_one();
@@ -213,24 +210,23 @@ impl ASGIHTTPProtocol {
213210
(true, true, false) => {
214211
self.response_chunked.store(true, atomic::Ordering::Relaxed);
215212
let (status, headers) = self.response_intent.lock().unwrap().take().unwrap();
216-
let (body_tx, body_rx) = mpsc::channel::<Result<body::Bytes>>(1);
213+
let (body_tx, body_rx) = mpsc::unbounded_channel::<body::Bytes>();
217214
let body_stream = http_body_util::StreamBody::new(
218-
tokio_stream::wrappers::ReceiverStream::new(body_rx).map_ok(body::Frame::data),
215+
tokio_stream::wrappers::UnboundedReceiverStream::new(body_rx)
216+
.map(body::Frame::data)
217+
.map(Result::Ok),
219218
);
220219
*self.body_tx.lock().unwrap() = Some(body_tx.clone());
221220
self.send_response(status, headers, BodyExt::boxed(body_stream));
222-
self.send_body(py, body_tx, body, false)
221+
self.send_body(py, &body_tx, body, false)
223222
}
224223
(true, true, true) => match &*self.body_tx.lock().unwrap() {
225-
Some(tx) => {
226-
let tx = tx.clone();
227-
self.send_body(py, tx, body, false)
228-
}
224+
Some(tx) => self.send_body(py, tx, body, false),
229225
_ => error_flow!(),
230226
},
231227
(true, false, true) => match self.body_tx.lock().unwrap().take() {
232228
Some(tx) => match body.is_empty() {
233-
false => self.send_body(py, tx, body, true),
229+
false => self.send_body(py, &tx, body, true),
234230
true => {
235231
self.flow_tx_waiter.notify_one();
236232
empty_future_into_py(py)

src/rsgi/io.rs

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures::{sink::SinkExt, StreamExt, TryStreamExt};
1+
use futures::{sink::SinkExt, StreamExt};
22
use http_body_util::BodyExt;
33
use hyper::body;
44
use pyo3::{prelude::*, pybacked::PyBackedStr};
@@ -15,47 +15,40 @@ use super::{
1515
};
1616
use crate::{
1717
conversion::FutureResultToPy,
18-
runtime::{empty_future_into_py, future_into_py_futlike, RuntimeRef},
18+
runtime::{empty_future_into_py, err_future_into_py, future_into_py_futlike, RuntimeRef},
1919
ws::{HyperWebsocket, UpgradeData, WSRxStream, WSTxStream},
2020
};
2121

2222
pub(crate) type WebsocketDetachedTransport = (i32, bool, Option<WSTxStream>);
2323

2424
#[pyclass(frozen, module = "granian._granian")]
2525
pub(crate) struct RSGIHTTPStreamTransport {
26-
rt: RuntimeRef,
27-
tx: mpsc::Sender<Result<body::Bytes, anyhow::Error>>,
26+
tx: mpsc::UnboundedSender<body::Bytes>,
2827
}
2928

3029
impl RSGIHTTPStreamTransport {
31-
pub fn new(rt: RuntimeRef, transport: mpsc::Sender<Result<body::Bytes, anyhow::Error>>) -> Self {
32-
Self { rt, tx: transport }
30+
pub fn new(transport: mpsc::UnboundedSender<body::Bytes>) -> Self {
31+
Self { tx: transport }
3332
}
3433
}
3534

35+
// NOTE: the interface doesn't need to be async anymore.
36+
// This would be a breaking change though; probably requires a major version bump in RSGI
3637
#[pymethods]
3738
impl RSGIHTTPStreamTransport {
3839
fn send_bytes<'p>(&self, py: Python<'p>, data: Cow<[u8]>) -> PyResult<Bound<'p, PyAny>> {
39-
let transport = self.tx.clone();
40-
let bdata: Box<[u8]> = data.into();
41-
42-
future_into_py_futlike(self.rt.clone(), py, async move {
43-
match transport.send(Ok(body::Bytes::from(bdata))).await {
44-
Ok(()) => FutureResultToPy::None,
45-
_ => FutureResultToPy::Err(error_stream!()),
46-
}
47-
})
40+
let bdata = body::Bytes::from(std::convert::Into::<Box<[u8]>>::into(data));
41+
match self.tx.send(bdata) {
42+
Ok(()) => empty_future_into_py(py),
43+
_ => err_future_into_py(py, error_stream!()),
44+
}
4845
}
4946

5047
fn send_str<'p>(&self, py: Python<'p>, data: String) -> PyResult<Bound<'p, PyAny>> {
51-
let transport = self.tx.clone();
52-
53-
future_into_py_futlike(self.rt.clone(), py, async move {
54-
match transport.send(Ok(body::Bytes::from(data))).await {
55-
Ok(()) => FutureResultToPy::None,
56-
_ => FutureResultToPy::Err(error_stream!()),
57-
}
58-
})
48+
match self.tx.send(body::Bytes::from(data)) {
49+
Ok(()) => empty_future_into_py(py),
50+
_ => err_future_into_py(py, error_stream!()),
51+
}
5952
}
6053
}
6154

@@ -162,7 +155,11 @@ impl RSGIHTTPProtocol {
162155
#[pyo3(signature = (status=200, headers=vec![], body=vec![].into()))]
163156
fn response_bytes(&self, status: u16, headers: Vec<(PyBackedStr, PyBackedStr)>, body: Cow<[u8]>) {
164157
if let Some(tx) = self.tx.lock().unwrap().take() {
165-
_ = tx.send(PyResponse::Body(PyResponseBody::from_bytes(status, headers, body)));
158+
_ = tx.send(PyResponse::Body(PyResponseBody::from_bytes(
159+
status,
160+
headers,
161+
body.into(),
162+
)));
166163
}
167164
}
168165

@@ -188,16 +185,18 @@ impl RSGIHTTPProtocol {
188185
headers: Vec<(PyBackedStr, PyBackedStr)>,
189186
) -> PyResult<Bound<'p, RSGIHTTPStreamTransport>> {
190187
if let Some(tx) = self.tx.lock().unwrap().take() {
191-
let (body_tx, body_rx) = mpsc::channel::<Result<body::Bytes, anyhow::Error>>(1);
188+
let (body_tx, body_rx) = mpsc::unbounded_channel::<body::Bytes>();
192189
let body_stream = http_body_util::StreamBody::new(
193-
tokio_stream::wrappers::ReceiverStream::new(body_rx).map_ok(body::Frame::data),
190+
tokio_stream::wrappers::UnboundedReceiverStream::new(body_rx)
191+
.map(body::Frame::data)
192+
.map(Result::Ok),
194193
);
195194
_ = tx.send(PyResponse::Body(PyResponseBody::new(
196195
status,
197196
headers,
198-
BodyExt::boxed(BodyExt::map_err(body_stream, std::convert::Into::into)),
197+
BodyExt::boxed(body_stream),
199198
)));
200-
let trx = Py::new(py, RSGIHTTPStreamTransport::new(self.rt.clone(), body_tx))?;
199+
let trx = Py::new(py, RSGIHTTPStreamTransport::new(body_tx))?;
201200
return Ok(trx.into_bound(py));
202201
}
203202
error_proto!()

src/rsgi/types.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -247,13 +247,12 @@ impl PyResponseBody {
247247
}
248248
}
249249

250-
pub fn from_bytes(status: u16, headers: Vec<(PyBackedStr, PyBackedStr)>, body: Cow<[u8]>) -> Self {
251-
let rbody: Box<[u8]> = body.into();
250+
pub fn from_bytes(status: u16, headers: Vec<(PyBackedStr, PyBackedStr)>, body: Box<[u8]>) -> Self {
252251
Self {
253252
status: status.try_into().unwrap(),
254253
headers: headers_from_py!(headers),
255-
body: http_body_util::Full::new(Bytes::from(rbody))
256-
.map_err(|e| match e {})
254+
body: http_body_util::Full::new(Bytes::from(body))
255+
.map_err(std::convert::Into::into)
257256
.boxed(),
258257
}
259258
}
@@ -263,7 +262,7 @@ impl PyResponseBody {
263262
status: status.try_into().unwrap(),
264263
headers: headers_from_py!(headers),
265264
body: http_body_util::Full::new(Bytes::from(body))
266-
.map_err(|e| match e {})
265+
.map_err(std::convert::Into::into)
267266
.boxed(),
268267
}
269268
}

src/wsgi/io.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures::TryStreamExt;
1+
use futures::StreamExt;
22
use http_body_util::BodyExt;
33
use hyper::{
44
body,
@@ -8,7 +8,6 @@ use pyo3::{prelude::*, pybacked::PyBackedStr};
88
use std::{borrow::Cow, sync::Mutex};
99
use tokio::sync::{mpsc, oneshot};
1010

11-
use super::utils::py_allow_threads;
1211
use crate::{
1312
http::{HTTPResponseBody, HV_SERVER},
1413
utils::log_application_callable_exception,
@@ -61,12 +60,14 @@ impl WSGIProtocol {
6160

6261
fn response_iter(&self, py: Python, status: u16, headers: Vec<(PyBackedStr, PyBackedStr)>, body: Bound<PyAny>) {
6362
if let Some(tx) = self.tx.lock().map_or(None, |mut v| v.take()) {
64-
let (body_tx, body_rx) = mpsc::channel::<Result<body::Bytes, anyhow::Error>>(1);
63+
let (body_tx, body_rx) = mpsc::unbounded_channel::<body::Bytes>();
6564

6665
let body_stream = http_body_util::StreamBody::new(
67-
tokio_stream::wrappers::ReceiverStream::new(body_rx).map_ok(body::Frame::data),
66+
tokio_stream::wrappers::UnboundedReceiverStream::new(body_rx)
67+
.map(body::Frame::data)
68+
.map(Result::Ok),
6869
);
69-
let txbody = BodyExt::boxed(BodyExt::map_err(body_stream, std::convert::Into::into));
70+
let txbody = BodyExt::boxed(body_stream);
7071
let _ = tx.send((status, headers_from_py!(headers), txbody));
7172

7273
let mut closed = false;
@@ -92,7 +93,7 @@ impl WSGIProtocol {
9293
None
9394
}
9495
} {
95-
if py_allow_threads!(py, { body_tx.blocking_send(Ok(frame)) }).is_ok() {
96+
if body_tx.send(frame).is_ok() {
9697
continue;
9798
}
9899
}

0 commit comments

Comments
 (0)