Skip to content

Commit 44afe94

Browse files
authored
client handler sets max clients with next_power_of_two (#323)
* client handler sets max clients with next_power_of_two * set a default panic hook, allowing task cleanup before shutdown
1 parent ce15ce2 commit 44afe94

File tree

4 files changed

+84
-3
lines changed

4 files changed

+84
-3
lines changed

ahnlich/task-manager/src/lib.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,4 +168,82 @@ impl TaskManager {
168168
pub fn task_count(&self) -> usize {
169169
self.task_tracker.len()
170170
}
171+
172+
/// Installs custom panic hook and allows app cleanup before dying
173+
pub fn install_panic_hook(&self) {
174+
let token = self.cancellation_token();
175+
let default_hook = std::panic::take_hook();
176+
std::panic::set_hook(Box::new(move |hook| {
177+
default_hook(hook);
178+
log::error!("Thread panicked, shutting down all tasks..");
179+
token.cancel();
180+
}));
181+
}
182+
}
183+
184+
#[cfg(test)]
185+
mod tests {
186+
use super::*;
187+
use std::sync::Arc;
188+
use std::sync::atomic::{AtomicBool, Ordering};
189+
190+
struct PanicTask;
191+
192+
#[async_trait::async_trait]
193+
impl Task for PanicTask {
194+
fn task_name(&self) -> String {
195+
"panic-task".to_string()
196+
}
197+
async fn run(&self) -> TaskState {
198+
panic!("simulated papaya panic: assertion failed: len.is_power_of_two()");
199+
}
200+
}
201+
202+
struct WaitingTask {
203+
cleaned_up: Arc<AtomicBool>,
204+
}
205+
206+
#[async_trait::async_trait]
207+
impl Task for WaitingTask {
208+
fn task_name(&self) -> String {
209+
"waiting-task".to_string()
210+
}
211+
async fn run(&self) -> TaskState {
212+
// Yield back to the runtime repeatedly, simulating a long-running task
213+
std::future::pending::<()>().await;
214+
TaskState::Continue
215+
}
216+
async fn cleanup(&self) {
217+
self.cleaned_up.store(true, Ordering::SeqCst);
218+
}
219+
}
220+
221+
#[tokio::test]
222+
async fn test_panic_triggers_shutdown_of_all_tasks() {
223+
let manager = TaskManager::new();
224+
manager.install_panic_hook();
225+
226+
let cleaned_up = Arc::new(AtomicBool::new(false));
227+
228+
// Spawn a long-running task that should get cancelled when the panic fires
229+
manager
230+
.spawn_task_loop(WaitingTask {
231+
cleaned_up: cleaned_up.clone(),
232+
})
233+
.await;
234+
235+
// Spawn a task that will panic
236+
manager.spawn_task_loop(PanicTask).await;
237+
238+
// wait() should return because the panic hook cancels all tasks.
239+
// If the hook doesn't work, this will hang forever — the test runner
240+
// will kill it after its timeout.
241+
manager.wait().await;
242+
243+
// The waiting task should have run its cleanup
244+
assert!(
245+
cleaned_up.load(Ordering::SeqCst),
246+
"WaitingTask cleanup should have been called during shutdown"
247+
);
248+
}
171249
}

ahnlich/utils/src/cli.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct CommandLineConfig {
5858
pub log_level: String,
5959

6060
/// Maximum client connections allowed
61-
/// Defaults to 1000
61+
/// Defaults to 512
6262
#[arg(long, default_value_t =
6363
DEFAULT_CONFIG.get_or_init(CommandLineConfig::default).maximum_clients.clone())]
6464
pub maximum_clients: usize,
@@ -107,7 +107,7 @@ impl Default for CommandLineConfig {
107107
enable_tracing: false,
108108
otel_endpoint: None,
109109
log_level: String::from("info,hf_hub=warn"),
110-
maximum_clients: 1000,
110+
maximum_clients: 512,
111111
threadpool_size: 16,
112112
enable_auth: false,
113113
auth_config: None,

ahnlich/utils/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub struct ClientHandler {
1515
impl ClientHandler {
1616
pub fn new(maximum_clients: usize) -> Self {
1717
Self {
18-
clients: ConcurrentHashSet::with_capacity(maximum_clients),
18+
clients: ConcurrentHashSet::with_capacity(maximum_clients.next_power_of_two()),
1919
maximum_clients,
2020
}
2121
}

ahnlich/utils/src/server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ pub trait AhnlichServerUtils: BlockingTask + Sized + Send + Sync + 'static + Deb
8787
parallel::init_threadpool(self.config().threadpool_size);
8888
let task_manager = self.task_manager();
8989

90+
// install panic hook
91+
task_manager.install_panic_hook();
92+
9093
if let Some(persist_location) = self.config().persist_location {
9194
let persistence_task = Persistence::task(
9295
self.write_flag(),

0 commit comments

Comments
 (0)