Skip to content

Commit b3cd2fe

Browse files
committed
cm-async: Fix leaking thread data into a store on cancel
This commit fixes an issue where cancelling a subtask while it's in the `STARTING` state would leak data within the store. The fix is to call a dedicated cleanup function which handles all the fields of a thread appropriately.
1 parent 1ac6e1b commit b3cd2fe

File tree

3 files changed

+139
-17
lines changed

3 files changed

+139
-17
lines changed

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2121,11 +2121,17 @@ impl Instance {
21212121
) -> Result<()> {
21222122
let state = store.concurrent_state_mut();
21232123
let thread_data = state.get_mut(guest_thread.thread)?;
2124-
let guest_id = match thread_data.instance_rep {
2125-
Some(id) => id,
2126-
None => bail_bug!("thread must have instance_rep set by now"),
2127-
};
21282124
let sync_call_set = thread_data.sync_call_set;
2125+
if let Some(guest_id) = thread_data.instance_rep {
2126+
store
2127+
.instance_state(RuntimeInstance {
2128+
instance: self.id().instance(),
2129+
index: runtime_instance,
2130+
})
2131+
.thread_handle_table()
2132+
.guest_thread_remove(guest_id)?;
2133+
}
2134+
let state = store.concurrent_state_mut();
21292135

21302136
// Clean up any pending subtasks in the sync_call_set
21312137
for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
@@ -2137,14 +2143,6 @@ impl Instance {
21372143
}
21382144
}
21392145

2140-
store
2141-
.instance_state(RuntimeInstance {
2142-
instance: self.id().instance(),
2143-
index: runtime_instance,
2144-
})
2145-
.thread_handle_table()
2146-
.guest_thread_remove(guest_id)?;
2147-
21482146
store.concurrent_state_mut().delete(guest_thread.thread)?;
21492147
store.concurrent_state_mut().delete(sync_call_set)?;
21502148
let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
@@ -3672,14 +3670,20 @@ impl Instance {
36723670
task.lower_params = None;
36733671
task.lift_result = None;
36743672
task.exited = true;
3675-
36763673
let instance = task.instance;
36773674

3675+
// Clean up the thread within this task as it's now never going
3676+
// to run.
36783677
assert_eq!(1, task.threads.len());
3679-
let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3680-
let concurrent_state = store.concurrent_state_mut();
3681-
concurrent_state.delete(thread)?;
3682-
assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3678+
let thread = *task.threads.iter().next().unwrap();
3679+
self.cleanup_thread(
3680+
store,
3681+
QualifiedThreadId {
3682+
task: guest_task,
3683+
thread,
3684+
},
3685+
caller_instance,
3686+
)?;
36833687

36843688
// Not yet started; cancel and remove from pending
36853689
let pending = &mut store.instance_state(instance).concurrent_state().pending;

crates/wast/src/wast.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,21 @@ impl WastContext {
244244
caller.gc(None)?;
245245
Ok(())
246246
})?;
247+
#[cfg(feature = "component-model")]
248+
{
249+
let mut i = self.component_linker.instance("wasmtime")?;
250+
i.func_wrap(
251+
"set-max-table-capacity",
252+
|mut store, (capacity,): (u32,)| {
253+
store
254+
.as_context_mut()
255+
.concurrent_resource_table()
256+
.expect("table must be present")
257+
.set_max_capacity(capacity.try_into().unwrap());
258+
Ok(())
259+
},
260+
)?;
261+
}
247262
Ok(())
248263
}
249264

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
;;! component_model_async = true
2+
;;! reference_types = true
3+
4+
;; This exposes a historical bug in wasmtime where when a guest subtask was
5+
;; dropped in the `STARTING` state it leaked resources within the store. Here
6+
;; this is done in a loop N times after setting the store's table capacity much
7+
;; lower than the loop iterations.
8+
9+
(component
10+
(import "wasmtime" (instance $wasmtime
11+
(export "set-max-table-capacity" (func (param "max" u32)))
12+
))
13+
14+
(component $A
15+
(core module $m
16+
(import "" "backpressure.inc" (func $backpressure.inc))
17+
18+
(func (export "set-backpressure") (call $backpressure.inc))
19+
(func (export "hi"))
20+
)
21+
(core func $backpressure.inc (canon backpressure.inc))
22+
(core instance $i (instantiate $m
23+
(with "" (instance
24+
(export "backpressure.inc" (func $backpressure.inc))
25+
))
26+
))
27+
28+
(func (export "set-backpressure") (canon lift (core func $i "set-backpressure")))
29+
(func (export "hi") async (canon lift (core func $i "hi")))
30+
)
31+
(instance $a (instantiate $A))
32+
33+
(component $B
34+
(import "wasmtime" (instance $wasmtime
35+
(export "set-max-table-capacity" (func (param "max" u32)))
36+
))
37+
(import "a" (instance $a
38+
(export "set-backpressure" (func))
39+
(export "hi" (func async))
40+
))
41+
42+
(core func $set-backpressure (canon lower (func $a "set-backpressure")))
43+
(core func $hi (canon lower (func $a "hi") async))
44+
(core func $set-max-table-capacity (canon lower (func $wasmtime "set-max-table-capacity")))
45+
(core func $subtask.cancel (canon subtask.cancel))
46+
(core func $subtask.drop (canon subtask.drop))
47+
48+
(core module $m
49+
(import "" "set-backpressure" (func $set-backpressure))
50+
(import "" "hi" (func $hi (result i32)))
51+
(import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32)))
52+
(import "" "subtask.drop" (func $subtask.drop (param i32)))
53+
(import "" "set-max-table-capacity" (func $set-max-table-capacity (param i32)))
54+
55+
(func (export "run")
56+
(local $rc i32)
57+
(local $task i32)
58+
(local $cnt i32)
59+
call $set-backpressure
60+
61+
(call $set-max-table-capacity (i32.const 100))
62+
63+
(local.set $cnt (i32.const 1000))
64+
65+
loop $l
66+
(local.set $rc (call $hi))
67+
(if (i32.ne (i32.and (local.get $rc) (i32.const 0xf)) (i32.const 0 (; STARTING ;)))
68+
(then unreachable))
69+
(local.set $task (i32.shr_u (local.get $rc) (i32.const 4)))
70+
(local.set $rc (call $subtask.cancel (local.get $task)))
71+
(if (i32.ne (i32.and (local.get $rc) (i32.const 0xf)) (i32.const 3 (; START_CANCELLED ;)))
72+
(then unreachable))
73+
74+
(call $subtask.drop (local.get $task))
75+
76+
(local.set $cnt (i32.sub (local.get $cnt) (i32.const 1)))
77+
(if (local.get $cnt)
78+
(then (br $l)))
79+
end
80+
)
81+
)
82+
83+
(core instance $i (instantiate $m
84+
(with "" (instance
85+
(export "set-backpressure" (func $set-backpressure))
86+
(export "hi" (func $hi))
87+
(export "subtask.cancel" (func $subtask.cancel))
88+
(export "subtask.drop" (func $subtask.drop))
89+
(export "set-max-table-capacity" (func $set-max-table-capacity))
90+
))
91+
))
92+
93+
(func (export "run") async (canon lift (core func $i "run")))
94+
)
95+
96+
(instance $b (instantiate $B
97+
(with "a" (instance $a))
98+
(with "wasmtime" (instance $wasmtime))
99+
))
100+
(export "run" (func $b "run"))
101+
)
102+
103+
(assert_return (invoke "run"))

0 commit comments

Comments
 (0)