Skip to content

Commit e985d36

Browse files
committed
wip
1 parent bce7f20 commit e985d36

File tree

3 files changed

+107
-27
lines changed

3 files changed

+107
-27
lines changed

Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ tokio = { workspace = true, optional = true, features = [ "signal", "macros" ] }
8181
hyper = { workspace = true, optional = true }
8282
http = { workspace = true, optional = true }
8383
http-body-util = { workspace = true, optional = true }
84+
tikv-jemallocator = "0.6.0"
8485

8586
[target.'cfg(unix)'.dependencies]
8687
rustix = { workspace = true, features = ["mm", "param", "process"] }

src/commands/serve.rs

Lines changed: 85 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ use std::{
66
path::PathBuf,
77
sync::{
88
atomic::{AtomicBool, AtomicU64, Ordering},
9-
Arc,
9+
Arc, Mutex,
1010
},
1111
};
1212
use wasmtime::component::Linker;
1313
use wasmtime::{Engine, Store, StoreLimits};
1414
use wasmtime_wasi::{IoView, StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
1515
use wasmtime_wasi_http::bindings::http::types::Scheme;
16-
use wasmtime_wasi_http::bindings::ProxyPre;
16+
use wasmtime_wasi_http::bindings::{Proxy, ProxyPre};
1717
use wasmtime_wasi_http::io::TokioIo;
1818
use wasmtime_wasi_http::{
1919
body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView, DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS,
@@ -27,6 +27,9 @@ use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuild
2727
#[cfg(feature = "wasi-nn")]
2828
use wasmtime_wasi_nn::wit::WasiNnCtx;
2929

30+
#[global_allocator]
31+
static A: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
32+
3033
struct Host {
3134
table: wasmtime::component::ResourceTable,
3235
ctx: WasiCtx,
@@ -96,6 +99,14 @@ pub struct ServeCommand {
9699
/// The WebAssembly component to run.
97100
#[arg(value_name = "WASM", required = true)]
98101
component: PathBuf,
102+
103+
/// TODO
104+
#[arg(long)]
105+
requests_per_instance: Option<u32>,
106+
107+
/// TODO
108+
#[arg(long)]
109+
max_cached_instances: Option<usize>,
99110
}
100111

101112
impl ServeCommand {
@@ -149,27 +160,10 @@ impl ServeCommand {
149160
Ok(())
150161
}
151162

152-
fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
153-
let mut builder = WasiCtxBuilder::new();
154-
self.run.configure_wasip2(&mut builder)?;
155-
156-
builder.env("REQUEST_ID", req_id.to_string());
157-
158-
let stdout_prefix: String;
159-
let stderr_prefix: String;
160-
if self.no_logging_prefix {
161-
stdout_prefix = "".to_string();
162-
stderr_prefix = "".to_string();
163-
} else {
164-
stdout_prefix = format!("stdout [{req_id}] :: ");
165-
stderr_prefix = format!("stderr [{req_id}] :: ");
166-
}
167-
builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
168-
builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
169-
163+
fn new_store(&self, engine: &Engine) -> Result<Store<Host>> {
170164
let mut host = Host {
171165
table: wasmtime::component::ResourceTable::new(),
172-
ctx: builder.build(),
166+
ctx: WasiCtxBuilder::new().build(),
173167
http: WasiHttpCtx::new(),
174168
http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
175169
http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
@@ -465,12 +459,70 @@ struct ProxyHandlerInner {
465459
engine: Engine,
466460
instance_pre: ProxyPre<Host>,
467461
next_id: AtomicU64,
462+
instances: Mutex<Vec<Instance>>,
463+
}
464+
465+
struct Instance {
466+
store: Store<Host>,
467+
handled: u32,
468+
proxy: Proxy,
468469
}
469470

470471
impl ProxyHandlerInner {
471472
fn next_req_id(&self) -> u64 {
472473
self.next_id.fetch_add(1, Ordering::Relaxed)
473474
}
475+
476+
async fn pop_instance(&self, req_id: u64) -> Result<Instance> {
477+
let mut instance = self._pop_instance().await?;
478+
let mut builder = WasiCtxBuilder::new();
479+
self.cmd.run.configure_wasip2(&mut builder)?;
480+
builder.env("REQUEST_ID", req_id.to_string());
481+
482+
let stdout_prefix: String;
483+
let stderr_prefix: String;
484+
if self.cmd.no_logging_prefix {
485+
stdout_prefix = "".to_string();
486+
stderr_prefix = "".to_string();
487+
} else {
488+
stdout_prefix = format!("stdout [{req_id}] :: ");
489+
stderr_prefix = format!("stderr [{req_id}] :: ");
490+
}
491+
builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
492+
builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
493+
494+
instance.store.data_mut().ctx = builder.build();
495+
Ok(instance)
496+
}
497+
498+
async fn _pop_instance(&self) -> Result<Instance> {
499+
{
500+
let mut instances = self.instances.lock().unwrap();
501+
if let Some(instance) = instances.pop() {
502+
return Ok(instance);
503+
}
504+
}
505+
506+
let mut store = self.cmd.new_store(&self.engine)?;
507+
let proxy = self.instance_pre.instantiate_async(&mut store).await?;
508+
Ok(Instance {
509+
store,
510+
handled: 0,
511+
proxy,
512+
})
513+
}
514+
515+
fn push_instance(&self, mut instance: Instance) {
516+
instance.handled += 1;
517+
let requests_per_instance = self.cmd.requests_per_instance.unwrap_or(1 << 20);
518+
let max_cached_instances = self.cmd.max_cached_instances.unwrap_or(1000);
519+
if instance.handled < requests_per_instance {
520+
let mut instances = self.instances.lock().unwrap();
521+
if instances.len() < max_cached_instances {
522+
instances.push(instance);
523+
}
524+
}
525+
}
474526
}
475527

476528
#[derive(Clone)]
@@ -483,6 +535,7 @@ impl ProxyHandler {
483535
engine,
484536
instance_pre,
485537
next_id: AtomicU64::from(0),
538+
instances: Mutex::default(),
486539
}))
487540
}
488541
}
@@ -503,22 +556,27 @@ async fn handle_request(
503556
req.uri()
504557
);
505558

506-
let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
559+
let mut instance = inner.pop_instance(req_id).await?;
507560

508-
let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
509-
let out = store.data_mut().new_response_outparam(sender)?;
510-
let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
561+
let req = instance
562+
.store
563+
.data_mut()
564+
.new_incoming_request(Scheme::Http, req)?;
565+
let out = instance.store.data_mut().new_response_outparam(sender)?;
511566

512567
let task = tokio::task::spawn(async move {
513-
if let Err(e) = proxy
568+
if let Err(e) = instance
569+
.proxy
514570
.wasi_http_incoming_handler()
515-
.call_handle(store, req, out)
571+
.call_handle(&mut instance.store, req, out)
516572
.await
517573
{
518574
log::error!("[{req_id}] :: {:?}", e);
519575
return Err(e);
520576
}
521577

578+
inner.push_instance(instance);
579+
522580
Ok(())
523581
});
524582

0 commit comments

Comments
 (0)