Skip to content

Commit 160d978

Browse files
Simplify Rust HTTP client response streaming and limiting (#19510)
*As suggested by @sandhose in #19498 (comment) Simplify Rust HTTP client response streaming and limiting ### Dev notes Synapse's Rust HTTP client was introduced in #18357 ### Pull Request Checklist <!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request --> * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
1 parent c3af443 commit 160d978

6 files changed

Lines changed: 45 additions & 55 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

changelog.d/19510.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Simplify Rust HTTP client response streaming and limiting.

rust/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ pyo3 = { version = "0.27.2", features = [
3535
"anyhow",
3636
"abi3",
3737
"abi3-py310",
38+
# So we can pass `bytes::Bytes` directly back to Python efficiently,
39+
# https://docs.rs/pyo3/latest/pyo3/bytes/index.html
40+
"bytes",
3841
] }
3942
pyo3-log = "0.13.1"
4043
pythonize = "0.27.0"

rust/src/errors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl NotFoundError {
6262
import_exception!(synapse.api.errors, HttpResponseException);
6363

6464
impl HttpResponseException {
65-
pub fn new(status: StatusCode, bytes: Vec<u8>) -> pyo3::PyErr {
65+
pub fn new(status: StatusCode, bytes: bytes::Bytes) -> pyo3::PyErr {
6666
HttpResponseException::new_err((
6767
status.as_u16(),
6868
status.canonical_reason().unwrap_or_default(),

rust/src/http_client.rs

Lines changed: 21 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
use std::{collections::HashMap, future::Future, sync::OnceLock};
1616

1717
use anyhow::Context;
18-
use futures::TryStreamExt;
19-
use headers::HeaderMapExt;
18+
use http_body_util::BodyExt;
2019
use once_cell::sync::OnceCell;
2120
use pyo3::{create_exception, exceptions::PyException, prelude::*};
2221
use reqwest::RequestBuilder;
@@ -236,62 +235,30 @@ impl HttpClient {
236235

237236
let status = response.status();
238237

239-
// Find the expected `Content-Length` so we can pre-allocate the buffer
240-
// necessary to read the response. It's expected that not every request will
241-
// have a `Content-Length` header.
242-
//
243-
// `response.content_length()` does exist but the "value does not directly
244-
// represents the value of the `Content-Length` header, but rather the size
245-
// of the response’s body"
246-
// (https://docs.rs/reqwest/latest/reqwest/struct.Response.html#method.content_length)
247-
// and we want to avoid reading the entire body at this point because we
248-
// purposely stream it below until the `response_limit`.
249-
let content_length = {
250-
let content_length = response
251-
.headers()
252-
.typed_get::<headers::ContentLength>()
253-
// We need a `usize` for the `Vec::with_capacity(...)` usage below
254-
.and_then(|content_length| content_length.0.try_into().ok());
255-
256-
// Sanity check that the request isn't too large from the information
257-
// they told us (may be inaccurate so we also check below as we actually
258-
// read the bytes)
259-
if let Some(content_length_bytes) = content_length {
260-
if content_length_bytes > response_limit {
261-
Err(anyhow::anyhow!(
262-
"Response size (defined by `Content-Length`) too large"
263-
))?;
264-
}
265-
}
266-
267-
content_length
268-
};
269-
270-
// Stream the response to avoid allocating a giant object on the server
271-
// above our expected `response_limit`.
272-
let mut stream = response.bytes_stream();
273-
// Pre-allocate the buffer based on the expected `Content-Length`
274-
let mut buffer = Vec::with_capacity(
275-
content_length
276-
// Default to pre-allocating nothing when the request doesn't have a
277-
// `Content-Length` header
278-
.unwrap_or(0),
279-
);
280-
while let Some(chunk) = stream.try_next().await.context("reading body")? {
281-
if buffer.len() + chunk.len() > response_limit {
282-
Err(anyhow::anyhow!("Response size too large"))?;
283-
}
284-
285-
buffer.extend_from_slice(&chunk);
286-
}
238+
// A light-weight way to read the response up until the `response_limit`. We
239+
// want to avoid allocating a giant response object on the server above our
240+
// expected `response_limit` to avoid out-of-memory DOS problems.
241+
let body = reqwest::Body::from(response);
242+
let limited_body = http_body_util::Limited::new(body, response_limit);
243+
let collected = limited_body
244+
.collect()
245+
.await
246+
.map_err(anyhow::Error::from_boxed)
247+
.with_context(|| {
248+
format!(
249+
"Response body exceeded response limit ({} bytes)",
250+
response_limit
251+
)
252+
})?;
253+
let bytes: bytes::Bytes = collected.to_bytes();
287254

288255
if !status.is_success() {
289-
return Err(HttpResponseException::new(status, buffer));
256+
return Err(HttpResponseException::new(status, bytes));
290257
}
291258

292-
let r = Python::attach(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?;
293-
294-
Ok(r)
259+
// Because of the `pyo3` `bytes` feature, we can pass this back to Python
260+
// land efficiently
261+
Ok(bytes)
295262
})
296263
}
297264
}

tests/synapse_rust/test_http_client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,24 @@ async def do_request() -> None:
171171
self.get_success(self.till_deferred_has_result(do_request()))
172172
self.assertEqual(self.server.calls, 1)
173173

174+
def test_request_response_limit_exceeded(self) -> None:
175+
"""
176+
Test to make sure we handle the response limit being exceeded
177+
"""
178+
179+
async def do_request() -> None:
180+
await self._rust_http_client.get(
181+
url=self.server.endpoint,
182+
# Small limit so we hit the limit
183+
response_limit=1,
184+
)
185+
186+
self.assertFailure(
187+
self.till_deferred_has_result(do_request()),
188+
RuntimeError,
189+
)
190+
self.assertEqual(self.server.calls, 1)
191+
174192
async def test_logging_context(self) -> None:
175193
"""
176194
Test to make sure the `LoggingContext` (logcontext) is handled correctly

0 commit comments

Comments
 (0)