Skip to content

Commit 7ba7635

Browse files
authored
Flow Control for HTTP/1.1 and HTTP/2 (#714)
1 parent decec94 commit 7ba7635

7 files changed

Lines changed: 624 additions & 17 deletions

File tree

awscrt/aio/http.py

Lines changed: 88 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ async def new(cls,
3838
bootstrap: Optional[ClientBootstrap] = None,
3939
socket_options: Optional[SocketOptions] = None,
4040
tls_connection_options: Optional[TlsConnectionOptions] = None,
41-
proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnectionUnified":
41+
proxy_options: Optional[HttpProxyOptions] = None,
42+
manual_window_management: bool = False,
43+
initial_window_size: Optional[int] = None) -> "AIOHttpClientConnectionUnified":
4244
"""
4345
Asynchronously establish a new AIOHttpClientConnectionUnified.
4446
@@ -60,6 +62,23 @@ async def new(cls,
6062
proxy_options (Optional[HttpProxyOptions]): Optional proxy options.
6163
If None is provided then a proxy is not used.
6264
65+
manual_window_management (bool): Set to True to manually manage the flow-control window
66+
of each stream. If False, the connection maintains flow-control windows such that
67+
no back-pressure is applied and data arrives as fast as possible. If True, the
68+
flow-control window of each stream shrinks as body data is received (headers,
69+
padding, and other metadata do not affect the window). `initial_window_size`
70+
determines the starting size of each stream's window. When a stream's window
71+
reaches 0, no further data is received until `update_window()` is called.
72+
For HTTP/2, this only controls stream windows; connection window is controlled
73+
by `conn_manual_window_management`. Default is False.
74+
75+
initial_window_size (Optional[int]): The starting size of each stream's flow-control
76+
window. Required if `manual_window_management` is True, ignored otherwise.
77+
For HTTP/2, this becomes the `INITIAL_WINDOW_SIZE` setting and can be overridden
78+
by `initial_settings`. Must be <= 2^31-1 or connection fails. If set to 0 with
79+
`manual_window_management` True, streams start with zero window.
80+
Required if manual_window_management is True, ignored otherwise.
81+
6382
Returns:
6483
AIOHttpClientConnectionUnified: A new unified HTTP client connection.
6584
"""
@@ -70,7 +89,9 @@ async def new(cls,
7089
socket_options,
7190
tls_connection_options,
7291
proxy_options,
73-
asyncio_connection=True)
92+
asyncio_connection=True,
93+
manual_window_management=manual_window_management,
94+
initial_window_size=initial_window_size)
7495
return await asyncio.wrap_future(future)
7596

7697
async def close(self) -> None:
@@ -118,7 +139,10 @@ async def new(cls,
118139
bootstrap: Optional[ClientBootstrap] = None,
119140
socket_options: Optional[SocketOptions] = None,
120141
tls_connection_options: Optional[TlsConnectionOptions] = None,
121-
proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnection":
142+
proxy_options: Optional[HttpProxyOptions] = None,
143+
manual_window_management: bool = False,
144+
initial_window_size: Optional[int] = None,
145+
read_buffer_capacity: Optional[int] = None) -> "AIOHttpClientConnection":
122146
"""
123147
Asynchronously establish a new AIOHttpClientConnection.
124148
@@ -140,6 +164,18 @@ async def new(cls,
140164
proxy_options (Optional[HttpProxyOptions]): Optional proxy options.
141165
If None is provided then a proxy is not used.
142166
167+
manual_window_management (bool): If True, enables manual flow control window management.
168+
Default is False.
169+
170+
initial_window_size (Optional[int]): Initial window size for flow control.
171+
Required if manual_window_management is True, ignored otherwise.
172+
173+
read_buffer_capacity (Optional[int]): Capacity in bytes of the HTTP/1.1 connection's
174+
read buffer. The buffer grows when the flow-control window of the incoming stream
175+
reaches zero. Ignored if `manual_window_management` is False. A capacity that is
176+
too small may hinder throughput. A capacity that is too large may waste memory
177+
without improving throughput. If None or zero, a default value is used.
178+
143179
Returns:
144180
AIOHttpClientConnection: A new HTTP client connection.
145181
"""
@@ -151,7 +187,10 @@ async def new(cls,
151187
tls_connection_options,
152188
proxy_options,
153189
expected_version=HttpVersion.Http1_1,
154-
asyncio_connection=True)
190+
asyncio_connection=True,
191+
manual_window_management=manual_window_management,
192+
initial_window_size=initial_window_size,
193+
read_buffer_capacity=read_buffer_capacity)
155194
return await asyncio.wrap_future(future)
156195

157196
def request(self,
@@ -189,8 +228,12 @@ async def new(cls,
189228
tls_connection_options: Optional[TlsConnectionOptions] = None,
190229
proxy_options: Optional[HttpProxyOptions] = None,
191230
initial_settings: Optional[List[Http2Setting]] = None,
192-
on_remote_settings_changed: Optional[Callable[[List[Http2Setting]],
193-
None]] = None) -> "AIOHttp2ClientConnection":
231+
on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None,
232+
manual_window_management: bool = False,
233+
initial_window_size: Optional[int] = None,
234+
conn_manual_window_management: bool = False,
235+
conn_window_size_threshold: Optional[int] = None,
236+
stream_window_size_threshold: Optional[int] = None) -> "AIOHttp2ClientConnection":
194237
"""
195238
Asynchronously establish an HTTP/2 client connection.
196239
Notes: to set up the connection, the server must support HTTP/2 and TlsConnectionOptions
@@ -205,6 +248,30 @@ async def new(cls,
205248
The function should take the following arguments and return nothing:
206249
207250
* `settings` (List[Http2Setting]): List of settings that were changed.
251+
252+
manual_window_management (bool): If True, enables manual flow control window management.
253+
Default is False.
254+
255+
initial_window_size (Optional[int]): Initial window size for flow control.
256+
Required if manual_window_management is True, ignored otherwise.
257+
258+
conn_manual_window_management (bool): If True, enables manual connection-level flow control
259+
for the entire HTTP/2 connection. When enabled, the connection's flow-control window
260+
shrinks as body data is received across all streams. The initial connection window is
261+
65,535 bytes. When the window reaches 0, all streams stop receiving data until
262+
`update_window()` is called to increment the connection's window.
263+
Note: Padding in DATA frames counts against the window, but window updates for padding
264+
are sent automatically even in manual mode. Default is False.
265+
266+
conn_window_size_threshold (Optional[int]): Threshold for sending connection-level WINDOW_UPDATE
267+
frames. Ignored if `conn_manual_window_management` is False. When the connection's window
268+
is above this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update
269+
is sent. Default is 32,767 (half of the initial 65,535 window).
270+
271+
stream_window_size_threshold (Optional[int]): Threshold for sending stream-level WINDOW_UPDATE
272+
frames. Ignored if `manual_window_management` is False. When a stream's window is above
273+
this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update is sent.
274+
Default is half of `initial_window_size`.
208275
"""
209276
future = cls._generic_new(
210277
host_name,
@@ -216,7 +283,12 @@ async def new(cls,
216283
expected_version=HttpVersion.Http2,
217284
initial_settings=initial_settings,
218285
on_remote_settings_changed=on_remote_settings_changed,
219-
asyncio_connection=True)
286+
asyncio_connection=True,
287+
manual_window_management=manual_window_management,
288+
initial_window_size=initial_window_size,
289+
conn_manual_window_management=conn_manual_window_management,
290+
conn_window_size_threshold=conn_window_size_threshold,
291+
stream_window_size_threshold=stream_window_size_threshold)
220292
return await asyncio.wrap_future(future)
221293

222294
def request(self,
@@ -237,6 +309,15 @@ def request(self,
237309
"""
238310
return AIOHttp2ClientStream(self, request, request_body_generator, loop)
239311

312+
def update_window(self, increment_size: int) -> None:
313+
"""
314+
Update the connection's flow control window.
315+
316+
Args:
317+
increment_size (int): Number of bytes to increment the window by.
318+
"""
319+
_awscrt.http2_connection_update_window(self._binding, increment_size)
320+
240321

241322
class AIOHttpClientStreamUnified(HttpClientStreamBase):
242323
__slots__ = (

awscrt/http.py

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,13 @@ def _generic_new(
131131
expected_version: Optional[HttpVersion] = None,
132132
initial_settings: Optional[List[Http2Setting]] = None,
133133
on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None,
134-
asyncio_connection=False) -> "concurrent.futures.Future":
134+
asyncio_connection=False,
135+
manual_window_management: bool = False,
136+
initial_window_size: Optional[int] = None,
137+
read_buffer_capacity: Optional[int] = None,
138+
conn_manual_window_management: bool = False,
139+
conn_window_size_threshold: Optional[int] = None,
140+
stream_window_size_threshold: Optional[int] = None) -> "concurrent.futures.Future":
135141
"""
136142
Initialize the generic part of the HttpClientConnection class.
137143
"""
@@ -141,6 +147,7 @@ def _generic_new(
141147
assert isinstance(tls_connection_options, TlsConnectionOptions) or tls_connection_options is None
142148
assert isinstance(socket_options, SocketOptions) or socket_options is None
143149
assert isinstance(proxy_options, HttpProxyOptions) or proxy_options is None
150+
assert not manual_window_management or initial_window_size is not None
144151

145152
future = Future()
146153

@@ -170,7 +177,13 @@ def _generic_new(
170177
proxy_options,
171178
initial_settings,
172179
on_remote_settings_changed,
173-
connection_core)
180+
connection_core,
181+
manual_window_management,
182+
initial_window_size,
183+
read_buffer_capacity,
184+
conn_manual_window_management,
185+
conn_window_size_threshold,
186+
stream_window_size_threshold)
174187

175188
except Exception as e:
176189
future.set_exception(e)
@@ -203,7 +216,10 @@ def new(cls,
203216
bootstrap: Optional[ClientBootstrap] = None,
204217
socket_options: Optional[SocketOptions] = None,
205218
tls_connection_options: Optional[TlsConnectionOptions] = None,
206-
proxy_options: Optional['HttpProxyOptions'] = None) -> "concurrent.futures.Future":
219+
proxy_options: Optional['HttpProxyOptions'] = None,
220+
manual_window_management: bool = False,
221+
initial_window_size: Optional[int] = None,
222+
read_buffer_capacity: Optional[int] = None) -> "concurrent.futures.Future":
207223
"""
208224
Asynchronously establish a new HttpClientConnection.
209225
@@ -225,6 +241,27 @@ def new(cls,
225241
proxy_options (Optional[HttpProxyOptions]): Optional proxy options.
226242
If None is provided then a proxy is not used.
227243
244+
manual_window_management (bool): Set to True to manually manage the flow-control window
245+
of each stream. If False, the connection maintains flow-control windows such that
246+
no back-pressure is applied and data arrives as fast as possible. If True, the
247+
flow-control window of each stream shrinks as body data is received (headers,
248+
padding, and other metadata do not affect the window). `initial_window_size`
249+
determines the starting size of each stream's window. When a stream's window
250+
reaches 0, no further data is received until `update_window()` is called.
251+
Default is False.
252+
253+
initial_window_size (Optional[int]): The starting size of each stream's flow-control
254+
window. Required if `manual_window_management` is True, ignored otherwise.
255+
Must be <= 2^31-1 or connection fails. If set to 0 with `manual_window_management`
256+
True, streams start with zero window.
257+
Required if manual_window_management is True, ignored otherwise.
258+
259+
read_buffer_capacity (Optional[int]): Capacity in bytes of the HTTP/1.1 connection's
260+
read buffer. The buffer grows when the flow-control window of the incoming stream
261+
reaches zero. Ignored if `manual_window_management` is False. A capacity that is
262+
too small may hinder throughput. A capacity that is too large may waste memory
263+
without improving throughput. If None or zero, a default value is used.
264+
228265
Returns:
229266
concurrent.futures.Future: A Future which completes when connection succeeds or fails.
230267
If successful, the Future will contain a new :class:`HttpClientConnection`.
@@ -236,7 +273,10 @@ def new(cls,
236273
bootstrap,
237274
socket_options,
238275
tls_connection_options,
239-
proxy_options)
276+
proxy_options,
277+
manual_window_management=manual_window_management,
278+
initial_window_size=initial_window_size,
279+
read_buffer_capacity=read_buffer_capacity)
240280

241281
def request(self,
242282
request: 'HttpRequest',
@@ -311,7 +351,12 @@ def new(cls,
311351
proxy_options: Optional['HttpProxyOptions'] = None,
312352
initial_settings: Optional[List[Http2Setting]] = None,
313353
on_remote_settings_changed: Optional[Callable[[List[Http2Setting]],
314-
None]] = None) -> "concurrent.futures.Future":
354+
None]] = None,
355+
manual_window_management: bool = False,
356+
initial_window_size: Optional[int] = None,
357+
conn_manual_window_management: bool = False,
358+
conn_window_size_threshold: Optional[int] = None,
359+
stream_window_size_threshold: Optional[int] = None) -> "concurrent.futures.Future":
315360
"""
316361
Asynchronously establish an HTTP/2 client connection.
317362
Notes: to set up the connection, the server must support HTTP/2 and TlsConnectionOptions
@@ -326,6 +371,30 @@ def new(cls,
326371
The function should take the following arguments and return nothing:
327372
328373
* `settings` (List[Http2Setting]): List of settings that were changed.
374+
375+
manual_window_management (bool): If True, enables manual flow control window management.
376+
Default is False.
377+
378+
initial_window_size (Optional[int]): Initial window size for flow control.
379+
Required if manual_window_management is True, ignored otherwise.
380+
381+
conn_manual_window_management (bool): If True, enables manual connection-level flow control
382+
for the entire HTTP/2 connection. When enabled, the connection's flow-control window
383+
shrinks as body data is received across all streams. The initial connection window is
384+
65,535 bytes. When the window reaches 0, all streams stop receiving data until
385+
`update_window()` is called to increment the connection's window.
386+
Note: Padding in DATA frames counts against the window, but window updates for padding
387+
are sent automatically even in manual mode. Default is False.
388+
389+
conn_window_size_threshold (Optional[int]): Threshold for sending connection-level WINDOW_UPDATE
390+
frames. Ignored if `conn_manual_window_management` is False. When the connection's window
391+
is above this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update
392+
is sent. Default is 32,767 (half of the initial 65,535 window).
393+
394+
stream_window_size_threshold (Optional[int]): Threshold for sending stream-level WINDOW_UPDATE
395+
frames. Ignored if `manual_window_management` is False. When a stream's window is above
396+
this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update is sent.
397+
Default is half of `initial_window_size`.
329398
"""
330399
return cls._generic_new(
331400
host_name,
@@ -336,7 +405,12 @@ def new(cls,
336405
proxy_options,
337406
HttpVersion.Http2,
338407
initial_settings,
339-
on_remote_settings_changed)
408+
on_remote_settings_changed,
409+
manual_window_management=manual_window_management,
410+
initial_window_size=initial_window_size,
411+
conn_manual_window_management=conn_manual_window_management,
412+
conn_window_size_threshold=conn_window_size_threshold,
413+
stream_window_size_threshold=stream_window_size_threshold)
340414

341415
def request(self,
342416
request: 'HttpRequest',
@@ -397,6 +471,15 @@ def close(self) -> "concurrent.futures.Future":
397471
_awscrt.http_connection_close(self._binding)
398472
return self.shutdown_future
399473

474+
def update_window(self, increment_size: int) -> None:
475+
"""
476+
Update the connection's flow control window.
477+
478+
Args:
479+
increment_size (int): Number of bytes to increment the window by.
480+
"""
481+
_awscrt.http2_connection_update_window(self._binding, increment_size)
482+
400483

401484
class HttpStreamBase(NativeResource):
402485
"""Base for HTTP stream classes.
@@ -486,6 +569,15 @@ def _on_complete(self, error_code: int) -> None:
486569
else:
487570
self._completion_future.set_exception(awscrt.exceptions.from_code(error_code))
488571

572+
def update_window(self, increment_size: int) -> None:
573+
"""
574+
Update the stream's flow control window.
575+
576+
Args:
577+
increment_size (int): Number of bytes to increment the window by.
578+
"""
579+
_awscrt.http_stream_update_window(self, increment_size)
580+
489581

490582
class HttpClientStream(HttpClientStreamBase):
491583
"""HTTP stream that sends a request and receives a response.

source/http.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ PyObject *aws_py_http_connection_close(PyObject *self, PyObject *args);
3030
*/
3131
PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args);
3232

33+
/**
34+
* Update HTTP/2 connection window size.
35+
*/
36+
PyObject *aws_py_http2_connection_update_window(PyObject *self, PyObject *args);
37+
38+
/**
39+
* Update HTTP stream window size.
40+
*/
41+
PyObject *aws_py_http_stream_update_window(PyObject *self, PyObject *args);
42+
3343
/**
3444
* Create a new connection. returns void. The on_setup callback will be invoked
3545
* upon either success or failure of the connection.

0 commit comments

Comments
 (0)