Skip to content

Commit 736518e

Browse files
committed
Add graceful shutdown to wasmtime serve, fix flaky CI
This commit fixes the spurious CI failure found [here][fail]. The problem here is that the infrastructure for testing `wasmtime serve` was not properly waiting for all output in `finish`. There's some infrastructure in the tests for spawning a subprocess and managing it, but prior to this commit there was no way to shut down the server gracefully and thus read all pending output from the child tasks. Specifically, an error message was expected in the logs after a request had been processed, but the `finish` method wasn't reading the message. The reason for this is that `finish` had to resort to `kill -9` on the child process as there was no other means of shutting it down. This meant that the print, which happened after request completion, might be killed and never happen. The solution that ended up in this commit is to (a) add a `--shutdown-addr` CLI flag to `wasmtime serve` and (b) beef up handling of graceful shutdown. Previously ctrl-c was used to exit the server but it didn't do anything but drop all in-progress work. Instead graceful shutdown is now handled by breaking out of the `accept` loop and then waiting for all child tasks to complete, meaning that no HTTP requests, once received, are cancelled. In addition to ctrl-c, the `--shutdown-addr` flag is used to listen for a TCP connection, which is a second means of cancellation. The reason for this is that sending ctrl-c to a process is not nearly as trivial on Windows as it is on Unix, so I didn't want to deal with all the console bits necessary to get that aligned. [fail]: https://github.com/bytecodealliance/wasmtime/actions/runs/13833291117/job/38702374924?pr=10390
1 parent 9da52ed commit 736518e

File tree

2 files changed

+161
-52
lines changed

2 files changed

+161
-52
lines changed

src/commands/serve.rs

Lines changed: 112 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ use std::{
66
path::PathBuf,
77
sync::{
88
atomic::{AtomicBool, AtomicU64, Ordering},
9-
Arc,
9+
Arc, Mutex,
1010
},
1111
};
12+
use tokio::sync::Notify;
1213
use wasmtime::component::Linker;
1314
use wasmtime::{Engine, Store, StoreLimits};
1415
use wasmtime_wasi::{IoView, StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
@@ -85,12 +86,19 @@ pub struct ServeCommand {
8586
run: RunCommon,
8687

8788
/// Socket address for the web server to bind to.
88-
#[arg(long = "addr", value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
89+
#[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
8990
addr: SocketAddr,
9091

92+
/// Socket address where, when connected to, will initiate a graceful
93+
/// shutdown.
94+
///
95+
/// Note that graceful shutdown is also supported on ctrl-c.
96+
#[arg(long, value_name = "SOCKADDR")]
97+
shutdown_addr: Option<SocketAddr>,
98+
9199
/// Disable log prefixes of wasi-http handlers.
92100
/// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
93-
#[arg(long = "no-logging-prefix")]
101+
#[arg(long)]
94102
no_logging_prefix: bool,
95103

96104
/// The WebAssembly component to run.
@@ -134,17 +142,7 @@ impl ServeCommand {
134142
.enable_io()
135143
.build()?;
136144

137-
runtime.block_on(async move {
138-
tokio::select! {
139-
_ = tokio::signal::ctrl_c() => {
140-
Ok::<_, anyhow::Error>(())
141-
}
142-
143-
res = self.serve() => {
144-
res
145-
}
146-
}
147-
})?;
145+
runtime.block_on(self.serve())?;
148146

149147
Ok(())
150148
}
@@ -371,6 +369,30 @@ impl ServeCommand {
371369
let instance = linker.instantiate_pre(&component)?;
372370
let instance = ProxyPre::new(instance)?;
373371

372+
// Spawn background task(s) waiting for graceful shutdown signals. This
373+
// always listens for ctrl-c but additionally can listen for a TCP
374+
// connection to the specified address.
375+
let shutdown = Arc::new(GracefulShutdown::default());
376+
tokio::task::spawn({
377+
let shutdown = shutdown.clone();
378+
async move {
379+
tokio::signal::ctrl_c().await.unwrap();
380+
shutdown.requested.notify_one();
381+
}
382+
});
383+
if let Some(addr) = self.shutdown_addr {
384+
let listener = tokio::net::TcpListener::bind(addr).await?;
385+
eprintln!(
386+
"Listening for shutdown on tcp://{}/",
387+
listener.local_addr()?
388+
);
389+
let shutdown = shutdown.clone();
390+
tokio::task::spawn(async move {
391+
let _ = listener.accept().await;
392+
shutdown.requested.notify_one();
393+
});
394+
}
395+
374396
let socket = match &self.addr {
375397
SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
376398
SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
@@ -403,9 +425,16 @@ impl ServeCommand {
403425
let handler = ProxyHandler::new(self, engine, instance);
404426

405427
loop {
406-
let (stream, _) = listener.accept().await?;
428+
// Wait for a socket, but also "race" against shutdown to break out
429+
// of this loop. Once the graceful shutdown signal is received then
430+
// this loop exits immediately.
431+
let (stream, _) = tokio::select! {
432+
_ = shutdown.requested.notified() => break,
433+
v = listener.accept() => v?,
434+
};
407435
let stream = TokioIo::new(stream);
408436
let h = handler.clone();
437+
let shutdown_guard = shutdown.clone().increment();
409438
tokio::task::spawn(async {
410439
if let Err(e) = http1::Builder::new()
411440
.keep_alive(true)
@@ -417,8 +446,75 @@ impl ServeCommand {
417446
{
418447
eprintln!("error: {e:?}");
419448
}
449+
drop(shutdown_guard);
420450
});
421451
}
452+
453+
// Upon exiting the loop we'll no longer process any more incoming
454+
// connections but there may still be outstanding connections
455+
// processing in child tasks. If there are, wait for those to complete
456+
// before shutting down completely. Also enable short-circuiting this
457+
// wait with a second ctrl-c signal.
458+
if shutdown.close() {
459+
return Ok(());
460+
}
461+
eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
462+
tokio::select! {
463+
_ = tokio::signal::ctrl_c() => {}
464+
_ = shutdown.complete.notified() => {}
465+
}
466+
467+
Ok(())
468+
}
469+
}
470+
471+
/// Helper structure to manage graceful shutdown in the accept loop above.
472+
#[derive(Default)]
473+
struct GracefulShutdown {
474+
/// Async notification that shutdown has been requested.
475+
requested: Notify,
476+
/// Async notification that shutdown has completed, signaled when
477+
/// `notify_when_done` is `true` and `active_tasks` reaches 0.
478+
complete: Notify,
479+
/// Internal state related to what's in progress when shutdown is requested.
480+
state: Mutex<GracefulShutdownState>,
481+
}
482+
483+
#[derive(Default)]
484+
struct GracefulShutdownState {
485+
active_tasks: u32,
486+
notify_when_done: bool,
487+
}
488+
489+
impl GracefulShutdown {
490+
/// Increments the number of active tasks and returns a guard indicating
491+
fn increment(self: Arc<Self>) -> impl Drop {
492+
struct Guard(Arc<GracefulShutdown>);
493+
494+
let mut state = self.state.lock().unwrap();
495+
assert!(!state.notify_when_done);
496+
state.active_tasks += 1;
497+
drop(state);
498+
499+
return Guard(self);
500+
501+
impl Drop for Guard {
502+
fn drop(&mut self) {
503+
let mut state = self.0.state.lock().unwrap();
504+
state.active_tasks -= 1;
505+
if state.notify_when_done && state.active_tasks == 0 {
506+
self.0.complete.notify_one();
507+
}
508+
}
509+
}
510+
}
511+
512+
/// Flags this state as done spawning tasks and returns whether there are no
513+
/// more child tasks remaining.
514+
fn close(&self) -> bool {
515+
let mut state = self.state.lock().unwrap();
516+
state.notify_when_done = true;
517+
state.active_tasks == 0
422518
}
423519
}
424520

@@ -533,7 +629,7 @@ async fn handle_request(
533629
// which should more clearly tell the user what went wrong. Note
534630
// that we assume the task has already exited at this point so the
535631
// `await` should resolve immediately.
536-
let e = match task.await {
632+
let e = match dbg!(task.await) {
537633
Ok(Ok(())) => {
538634
bail!("guest never invoked `response-outparam::set` method")
539635
}

tests/all/cli_tests.rs

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1530,6 +1530,7 @@ mod test_programs {
15301530
struct WasmtimeServe {
15311531
child: Option<Child>,
15321532
addr: SocketAddr,
1533+
shutdown_addr: SocketAddr,
15331534
}
15341535

15351536
impl WasmtimeServe {
@@ -1548,61 +1549,73 @@ mod test_programs {
15481549
}
15491550

15501551
fn spawn(cmd: &mut Command) -> Result<WasmtimeServe> {
1552+
cmd.arg("--shutdown-addr=127.0.0.1:0");
15511553
cmd.stdin(Stdio::null());
15521554
cmd.stdout(Stdio::piped());
15531555
cmd.stderr(Stdio::piped());
15541556
let mut child = cmd.spawn()?;
15551557

1556-
// Read the first line of stderr which will say which address it's
1557-
// listening on.
1558-
//
1559-
// NB: this intentionally discards any extra buffered data in the
1560-
// `BufReader` once the newline is found. The server shouldn't print
1561-
// anything interesting other than the address so once we get a line
1562-
// all remaining output is left to be captured by future requests
1563-
// sent to the server.
1558+
// Read the first few lines of stderr which will say which address
1559+
// it's listening on. The first line is the shutdown line (with
1560+
// `--shutdown-addr`) and the second is what `--addr` was bound to.
1561+
// This is done to figure out what `:0` was bound to in the child
1562+
// process.
15641563
let mut line = String::new();
15651564
let mut reader = BufReader::new(child.stderr.take().unwrap());
1566-
reader.read_line(&mut line)?;
1567-
1568-
match line.find("127.0.0.1").and_then(|addr_start| {
1569-
let addr = &line[addr_start..];
1570-
let addr_end = addr.find("/")?;
1571-
addr[..addr_end].parse().ok()
1572-
}) {
1573-
Some(addr) => {
1574-
assert!(reader.buffer().is_empty());
1575-
child.stderr = Some(reader.into_inner());
1576-
Ok(WasmtimeServe {
1577-
child: Some(child),
1578-
addr,
1579-
})
1565+
let mut read_addr_from_line = |prefix: &str| -> Result<SocketAddr> {
1566+
reader.read_line(&mut line)?;
1567+
1568+
if !line.starts_with(prefix) {
1569+
bail!("input line `{line}` didn't start with `{prefix}`");
1570+
}
1571+
match line.find("127.0.0.1").and_then(|addr_start| {
1572+
let addr = &line[addr_start..];
1573+
let addr_end = addr.find("/")?;
1574+
addr[..addr_end].parse().ok()
1575+
}) {
1576+
Some(addr) => {
1577+
line.truncate(0);
1578+
Ok(addr)
1579+
}
1580+
None => bail!("failed to address from: {line}"),
15801581
}
1581-
None => {
1582+
};
1583+
let shutdown_addr = read_addr_from_line("Listening for shutdown");
1584+
let addr = read_addr_from_line("Serving HTTP on");
1585+
let (shutdown_addr, addr) = match (shutdown_addr, addr) {
1586+
(Ok(a), Ok(b)) => (a, b),
1587+
// If either failed kill the child and otherwise try to shepherd
1588+
// along any contextual information we have.
1589+
(Err(a), _) | (_, Err(a)) => {
15821590
child.kill()?;
15831591
child.wait()?;
15841592
reader.read_to_string(&mut line)?;
1585-
bail!("failed to start child: {line}")
1593+
return Err(a.context(line));
15861594
}
1587-
}
1595+
};
1596+
assert!(reader.buffer().is_empty());
1597+
child.stderr = Some(reader.into_inner());
1598+
Ok(WasmtimeServe {
1599+
child: Some(child),
1600+
addr,
1601+
shutdown_addr,
1602+
})
15881603
}
15891604

15901605
/// Completes this server gracefully by printing the output on failure.
15911606
fn finish(mut self) -> Result<(String, String)> {
15921607
let mut child = self.child.take().unwrap();
15931608

1594-
// If the child process has already exited then collect the output
1595-
// and test if it succeeded. Otherwise it's still running so kill it
1596-
// and then reap it. Assume that if it's still running then the test
1597-
// has otherwise passed so no need to print the output.
1598-
let known_failure = if child.try_wait()?.is_some() {
1599-
false
1600-
} else {
1601-
child.kill()?;
1602-
true
1603-
};
1609+
// If the child process has already exited, then great! Otherwise
1610+
// the server is still running so initiate a graceful shutdown by
1611+
// connecting to the `--shutdown-addr` listener it was spawned with. The
1612+
// process is then expected to exit successfully on its own.
1613+
if child.try_wait()?.is_none() {
1614+
std::net::TcpStream::connect(&self.shutdown_addr)
1615+
.context("failed to initiate graceful shutdown")?;
1616+
}
16041617
let output = child.wait_with_output()?;
1605-
if !known_failure && !output.status.success() {
1618+
if !output.status.success() {
16061619
bail!("child failed {output:?}");
16071620
}
16081621

0 commit comments

Comments
 (0)