Skip to content

Commit d861129

Browse files
committed
feat: Gracefully shutdown draft!
1 parent cad021a commit d861129

File tree

1 file changed

+60
-18
lines changed

1 file changed

+60
-18
lines changed

src/main.rs

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use serde::{Deserialize, Serialize};
55
use std::env;
66
use std::sync::Arc;
77
use tokio::signal::unix::{signal, SignalKind};
8+
use tokio::sync::mpsc;
89
use tokio::sync::Mutex;
9-
use tracing::info;
10+
use tracing::{error, info};
1011

1112
mod api;
1213
mod error;
@@ -99,31 +100,72 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
99100

100101
info!("Listening on http://{}", addr);
101102

102-
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
103+
let (tx, mut rx) = mpsc::channel(1);
104+
103105
let graceful = server.with_graceful_shutdown(async {
104-
rx.await.ok();
106+
rx.recv().await;
107+
info!("Shutting down hyper server");
105108
});
106109

110+
listen_for_signal(tx.clone(), SignalKind::interrupt());
111+
listen_for_signal(tx.clone(), SignalKind::terminate());
112+
113+
graceful.await?;
114+
115+
let state = state_ptr.lock().await;
116+
for v in state.vms.iter() {
117+
info!("Terminating [{}]", v.name);
118+
119+
if let Err(e) = vm::terminate(&v.name).await {
120+
error!("Error on termination [{}, {}]", v.name, e.to_string());
121+
}
122+
}
123+
124+
// Holds the process open for last machines
125+
for v in state.vms.iter() {
126+
loop {
127+
match get_process(v.pid).await {
128+
Ok(value) => match value {
129+
false => break,
130+
true => {}
131+
},
132+
Err(e) => {
133+
error!("Error on checking process [{}, {}]", v.name, e.to_string());
134+
break;
135+
}
136+
}
137+
}
138+
}
139+
140+
Ok(())
141+
}
142+
143+
async fn get_process(pid: u32) -> Result<bool, std::io::Error> {
144+
match tokio::process::Command::new("ls")
145+
.arg("/proc")
146+
.output()
147+
.await
148+
{
149+
Err(e) => Err(e),
150+
Ok(output) => {
151+
let output = String::from_utf8_lossy(&output.stdout);
152+
let output: Vec<&str> = output.split("\n").collect();
153+
154+
Ok(output.contains(&pid.to_string().as_str()))
155+
}
156+
}
157+
}
158+
159+
fn listen_for_signal(mut tx: tokio::sync::mpsc::Sender<SignalKind>, kind: SignalKind) {
107160
tokio::task::spawn(async move {
108-
let kind = SignalKind::interrupt();
109-
let mut stream = signal(kind).expect("error opening signal stream");
161+
let mut stream = signal(kind).expect(&format!("Error opening signal stream [{:?}]", kind));
110162

111163
loop {
112164
stream.recv().await;
113-
info!("Termination initiated");
165+
info!("Termination signal received");
114166
break;
115167
}
116-
tx.send(()).expect("error sending shutdown signal");
117-
});
118-
119-
graceful.await?;
120168

121-
//TODO: function to clear state
122-
// let vms = state.vms.lock().unwrap();
123-
// for a in vms.iter() {
124-
// println!("{}", a.name);
125-
// vm::terminate(&a.name).await?;
126-
// }
127-
128-
Ok(())
169+
tx.send(kind).await.unwrap();
170+
});
129171
}

0 commit comments

Comments
 (0)