Skip to content

Commit a36f439

Browse files
authored
Wait for inflight requests to complete on worker shutdown (#520)
1 parent 66a9acb commit a36f439

4 files changed

Lines changed: 40 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 9 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ tls-listener = { version = "=0.11", features = ["rustls-ring"] }
5353
tokio = { version = "1.40", features = ["full"] }
5454
tokio-stream = "0.1"
5555
tokio-tungstenite = "=0.26"
56-
tokio-util = { version = "0.7", features = ["codec"] }
56+
tokio-util = { version = "0.7", features = ["codec", "rt"] }
5757

5858
[target.'cfg(not(any(target_env = "musl", target_os = "freebsd", target_os = "openbsd", target_os = "windows")))'.dependencies]
5959
tikv-jemallocator = { version = "0.6.0", default-features = false, features = ["disable_initial_exec_tls"] }

src/workers.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,7 @@ macro_rules! serve_mtr {
631631
ret
632632
});
633633
let rth = rt.handler();
634+
let tasks = tokio_util::task::TaskTracker::new();
634635
let mut srx = signal.get().rx.lock().unwrap().take().unwrap();
635636

636637
let main_loop = crate::runtime::run_until_complete(rt, event_loop.clone(), async move {
@@ -642,7 +643,7 @@ macro_rules! serve_mtr {
642643
backpressure,
643644
rth,
644645
callback_wrapper,
645-
tokio::spawn,
646+
|task| tasks.spawn(task),
646647
hyper_util::rt::TokioExecutor::new,
647648
http1_opts,
648649
http2_opts,
@@ -652,6 +653,9 @@ macro_rules! serve_mtr {
652653

653654
log::info!("Stopping worker-{}", worker_id);
654655

656+
tasks.close();
657+
tasks.wait().await;
658+
655659
Python::with_gil(|_| drop(callback_wrapper));
656660
Ok(())
657661
});
@@ -699,6 +703,7 @@ macro_rules! serve_mtr_ssl {
699703
ret
700704
});
701705
let rth = rt.handler();
706+
let tasks = tokio_util::task::TaskTracker::new();
702707
let mut srx = signal.get().rx.lock().unwrap().take().unwrap();
703708

704709
let main_loop = crate::runtime::run_until_complete(rt, event_loop.clone(), async move {
@@ -711,7 +716,7 @@ macro_rules! serve_mtr_ssl {
711716
backpressure,
712717
rth,
713718
callback_wrapper,
714-
tokio::spawn,
719+
|task| tasks.spawn(task),
715720
hyper_util::rt::TokioExecutor::new,
716721
http1_opts,
717722
http2_opts,
@@ -721,6 +726,9 @@ macro_rules! serve_mtr_ssl {
721726

722727
log::info!("Stopping worker-{}", worker_id);
723728

729+
tasks.close();
730+
tasks.wait().await;
731+
724732
Python::with_gil(|_| drop(callback_wrapper));
725733
Ok(())
726734
});
@@ -759,6 +767,7 @@ macro_rules! serve_str_inner {
759767
crate::runtime::init_runtime_st(blocking_threads, py_threads, py_threads_idle_timeout, py_loop);
760768
let rth = rt.handler();
761769
let local = tokio::task::LocalSet::new();
770+
let tasks = tokio_util::task::TaskTracker::new();
762771

763772
crate::runtime::block_on_local(&rt, local, async move {
764773
crate::workers::loop_match!(
@@ -769,7 +778,7 @@ macro_rules! serve_str_inner {
769778
backpressure,
770779
rth,
771780
callback_wrapper,
772-
tokio::task::spawn_local,
781+
|task| tasks.spawn_local(task),
773782
crate::workers::WorkerExecutor::new,
774783
http1_opts,
775784
http2_opts,
@@ -779,6 +788,9 @@ macro_rules! serve_str_inner {
779788

780789
log::info!("Stopping worker-{} runtime-{}", $wid, thread_id + 1);
781790

791+
tasks.close();
792+
tasks.wait().await;
793+
782794
Python::with_gil(|_| drop(callback_wrapper));
783795
});
784796

@@ -852,6 +864,7 @@ macro_rules! serve_str_ssl_inner {
852864
crate::runtime::init_runtime_st(blocking_threads, py_threads, py_threads_idle_timeout, py_loop);
853865
let rth = rt.handler();
854866
let local = tokio::task::LocalSet::new();
867+
let tasks = tokio_util::task::TaskTracker::new();
855868

856869
crate::runtime::block_on_local(&rt, local, async move {
857870
crate::workers::loop_match_tls!(
@@ -863,7 +876,7 @@ macro_rules! serve_str_ssl_inner {
863876
backpressure,
864877
rth,
865878
callback_wrapper,
866-
tokio::task::spawn_local,
879+
|task| tasks.spawn_local(task),
867880
crate::workers::WorkerExecutor::new,
868881
http1_opts,
869882
http2_opts,
@@ -873,6 +886,9 @@ macro_rules! serve_str_ssl_inner {
873886

874887
log::info!("Stopping worker-{} runtime-{}", $wid, thread_id + 1);
875888

889+
tasks.close();
890+
tasks.wait().await;
891+
876892
Python::with_gil(|_| drop(callback_wrapper));
877893
});
878894

src/wsgi/serve.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ impl WSGIWorker {
4343
)
4444
});
4545
let rth = rt.handler();
46+
let tasks = tokio_util::task::TaskTracker::new();
4647

4748
let (stx, mut srx) = tokio::sync::watch::channel(false);
4849
let main_loop = rt.inner.spawn(async move {
@@ -54,7 +55,7 @@ impl WSGIWorker {
5455
backpressure,
5556
rth,
5657
callback_wrapper,
57-
tokio::spawn,
58+
|task| tasks.spawn(task),
5859
hyper_util::rt::TokioExecutor::new,
5960
http1_opts,
6061
http2_opts,
@@ -64,6 +65,9 @@ impl WSGIWorker {
6465

6566
log::info!("Stopping worker-{}", worker_id);
6667

68+
tasks.close();
69+
tasks.wait().await;
70+
6771
Python::with_gil(|_| drop(callback_wrapper));
6872
});
6973

@@ -153,6 +157,7 @@ impl WSGIWorker {
153157
)
154158
});
155159
let rth = rt.handler();
160+
let tasks = tokio_util::task::TaskTracker::new();
156161

157162
let (stx, mut srx) = tokio::sync::watch::channel(false);
158163
rt.inner.spawn(async move {
@@ -165,7 +170,7 @@ impl WSGIWorker {
165170
backpressure,
166171
rth,
167172
callback_wrapper,
168-
tokio::spawn,
173+
|task| tasks.spawn(task),
169174
hyper_util::rt::TokioExecutor::new,
170175
http1_opts,
171176
http2_opts,
@@ -175,6 +180,9 @@ impl WSGIWorker {
175180

176181
log::info!("Stopping worker-{}", worker_id);
177182

183+
tasks.close();
184+
tasks.wait().await;
185+
178186
Python::with_gil(|_| drop(callback_wrapper));
179187
});
180188

0 commit comments

Comments
 (0)