Skip to content

Commit f6904e6

Browse files
committed
feat: enable concurrent HTTP requests with improved async coroutine interpreter and new example.
1 parent bc3fb8e commit f6904e6

File tree

9 files changed

+117
-32
lines changed

9 files changed

+117
-32
lines changed

docs/index.html

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ <h1>Chen Lang <span>Playground</span></h1>
3737
<option value="closures">Closures</option>
3838
<option value="async_task">Coroutines</option>
3939
<option value="async_http">Async HTTP Request</option>
40+
<option value="concurrent_http">Concurrent HTTP</option>
4041
</select>
4142
</div>
4243
<button id="run">

docs/index.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,43 @@ let resp = http.request("GET", url)
327327
println("Status: " + resp.status)
328328
let data = json.parse(resp.body)
329329
println("Response JSON origin: " + data.origin)
330+
`,
331+
concurrent_http: `# Feature: Concurrent HTTP Requests
332+
let http = import "stdlib/http"
333+
let json = import "stdlib/json"
334+
let println = import "stdlib/io".println
335+
336+
println("Starting concurrent HTTP requests...")
337+
338+
# Helper function to fetch URL and return status
339+
def fetch_status(url) {
340+
let resp = http.request("GET", url)
341+
return resp.status
342+
}
343+
344+
# Create coroutines for parallel requests
345+
let co1 = coroutine.create(def() { fetch_status("https://httpbin.org/delay/1") })
346+
let co2 = coroutine.create(def() { fetch_status("https://httpbin.org/delay/1") })
347+
let co3 = coroutine.create(def() {
348+
let resp = http.request("GET", "https://httpbin.org/uuid")
349+
let data = json.parse(resp.body)
350+
return data.uuid
351+
})
352+
353+
# Spawn all coroutines (non-blocking)
354+
coroutine.spawn(co1)
355+
coroutine.spawn(co2)
356+
coroutine.spawn(co3)
357+
358+
println("All requests started, waiting for completion...")
359+
360+
# Wait for all to complete
361+
let results = coroutine.await_all([co1, co2, co3])
362+
363+
println("All requests completed!")
364+
println("Request 1 status: " + results[0])
365+
println("Request 2 status: " + results[1])
366+
println("Request 3 UUID: " + results[2])
330367
`
331368
};
332369

docs/pkg/chen_lang.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ export interface InitOutput {
99
readonly memory: WebAssembly.Memory;
1010
readonly run: (a: number, b: number) => void;
1111
readonly run_wasm: (a: number, b: number) => any;
12-
readonly wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke______: (a: number, b: number) => void;
13-
readonly wasm_bindgen_47816a2d43ec07ce___closure__destroy___dyn_core_3cd24c61823388c3___ops__function__FnMut_____Output_______: (a: number, b: number) => void;
1412
readonly wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke___wasm_bindgen_47816a2d43ec07ce___JsValue_____: (a: number, b: number, c: any) => void;
1513
readonly wasm_bindgen_47816a2d43ec07ce___closure__destroy___dyn_core_3cd24c61823388c3___ops__function__FnMut__wasm_bindgen_47816a2d43ec07ce___JsValue____Output_______: (a: number, b: number) => void;
14+
readonly wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke______: (a: number, b: number) => void;
15+
readonly wasm_bindgen_47816a2d43ec07ce___closure__destroy___dyn_core_3cd24c61823388c3___ops__function__FnMut_____Output_______: (a: number, b: number) => void;
1616
readonly wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke___wasm_bindgen_47816a2d43ec07ce___JsValue__wasm_bindgen_47816a2d43ec07ce___JsValue_____: (a: number, b: number, c: any, d: any) => void;
1717
readonly __wbindgen_malloc: (a: number, b: number) => number;
1818
readonly __wbindgen_realloc: (a: number, b: number, c: number, d: number) => number;

docs/pkg/chen_lang.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,14 @@ if (!('encodeInto' in cachedTextEncoder)) {
208208

209209
let WASM_VECTOR_LEN = 0;
210210

211-
function wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke______(arg0, arg1) {
212-
wasm.wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke______(arg0, arg1);
213-
}
214-
215211
function wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke___wasm_bindgen_47816a2d43ec07ce___JsValue_____(arg0, arg1, arg2) {
216212
wasm.wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke___wasm_bindgen_47816a2d43ec07ce___JsValue_____(arg0, arg1, arg2);
217213
}
218214

215+
function wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke______(arg0, arg1) {
216+
wasm.wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke______(arg0, arg1);
217+
}
218+
219219
function wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke___wasm_bindgen_47816a2d43ec07ce___JsValue__wasm_bindgen_47816a2d43ec07ce___JsValue_____(arg0, arg1, arg2, arg3) {
220220
wasm.wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke___wasm_bindgen_47816a2d43ec07ce___JsValue__wasm_bindgen_47816a2d43ec07ce___JsValue_____(arg0, arg1, arg2, arg3);
221221
}

docs/pkg/chen_lang_bg.wasm

-417 Bytes
Binary file not shown.

docs/pkg/chen_lang_bg.wasm.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
export const memory: WebAssembly.Memory;
44
export const run: (a: number, b: number) => void;
55
export const run_wasm: (a: number, b: number) => any;
6-
export const wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke______: (a: number, b: number) => void;
7-
export const wasm_bindgen_47816a2d43ec07ce___closure__destroy___dyn_core_3cd24c61823388c3___ops__function__FnMut_____Output_______: (a: number, b: number) => void;
86
export const wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke___wasm_bindgen_47816a2d43ec07ce___JsValue_____: (a: number, b: number, c: any) => void;
97
export const wasm_bindgen_47816a2d43ec07ce___closure__destroy___dyn_core_3cd24c61823388c3___ops__function__FnMut__wasm_bindgen_47816a2d43ec07ce___JsValue____Output_______: (a: number, b: number) => void;
8+
export const wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke______: (a: number, b: number) => void;
9+
export const wasm_bindgen_47816a2d43ec07ce___closure__destroy___dyn_core_3cd24c61823388c3___ops__function__FnMut_____Output_______: (a: number, b: number) => void;
1010
export const wasm_bindgen_47816a2d43ec07ce___convert__closures_____invoke___wasm_bindgen_47816a2d43ec07ce___JsValue__wasm_bindgen_47816a2d43ec07ce___JsValue_____: (a: number, b: number, c: any, d: any) => void;
1111
export const __wbindgen_malloc: (a: number, b: number) => number;
1212
export const __wbindgen_realloc: (a: number, b: number, c: number, d: number) => number;

src/tests/async_timer_test.rs

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,6 @@ fn run_code(code: &str) -> String {
1616
fn test_timer_sleep() {
1717
// Tests that we can sleep for a duration
1818
// And that the VM waits for it.
19-
let code = r#"
20-
let timer = import "stdlib/timer"
21-
let start = stdlib.date.now()
22-
timer.sleep(200)
23-
let end = stdlib.date.now()
24-
25-
# Check if time passed is at least 200ms
26-
# Using a small margin for error (Rust test env might be slow or fast, but delta should be positive)
27-
if (end - start) >= 200 {
28-
return "OK"
29-
} else {
30-
return "Too fast: " + (end - start)
31-
}
32-
"#;
3319

3420
// NOTE: stdlib.date.now() returns milliseconds.
3521
// importing stdlib/date implicitly? No, `stdlib.date` is not standard.
@@ -70,3 +56,42 @@ fn test_async_interleaving() {
7056
"#;
7157
assert_eq!(run_code(code), "Done");
7258
}
59+
60+
#[test]
61+
fn test_spawn_closure_with_sleep() {
62+
let code = r#"
63+
let timer = import "stdlib/timer"
64+
let co = coroutine.create(def() {
65+
# 匿名函数直接调用 native async,这会触发 Yield
66+
timer.sleep(50)
67+
return "WakeUp"
68+
})
69+
70+
coroutine.spawn(co)
71+
let results = coroutine.await_all([co])
72+
73+
return results[0]
74+
"#;
75+
76+
assert_eq!(run_code(code), "WakeUp");
77+
}
78+
79+
#[test]
80+
fn test_spawn_closure_captures_and_sleep() {
81+
let code = r#"
82+
let timer = import "stdlib/timer"
83+
let msg = "Capturing"
84+
85+
let co = coroutine.create(def() {
86+
timer.sleep(10)
87+
return msg + " Works"
88+
})
89+
90+
coroutine.spawn(co)
91+
let results = coroutine.await_all([co])
92+
93+
return results[0]
94+
"#;
95+
96+
assert_eq!(run_code(code), "Capturing Works");
97+
}

src/vm/interpreter.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,38 @@ impl VM {
6464
last_res = self.execute_from(self.pc);
6565

6666
// 4. Check fiber completion and save result
67-
{
67+
// Only if execution finished successfully (not yielded)
68+
if let Ok(ref result) = last_res {
6869
let mut f = fiber.borrow_mut();
69-
if f.state == FiberState::Dead || f.call_stack.is_empty() {
70-
if let Ok(ref result) = last_res {
71-
f.result = Some(result.clone());
72-
}
70+
71+
// We consider the fiber finished if:
72+
// 1. It is explicitly marked Dead (by Return instruction)
73+
// 2. It is still Running but call stack is empty (ran off end of script)
74+
// IMPORTANT: If it is Suspended, it yielded (e.g. async I/O), so we must NOT mark it dead.
75+
let is_finished =
76+
f.state == FiberState::Dead || (f.state == FiberState::Running && f.call_stack.is_empty());
77+
78+
if is_finished {
79+
f.result = Some(result.clone());
7380
f.state = FiberState::Dead;
7481
if f.is_spawned {
75-
*self.async_state.pending_tasks.borrow_mut() -= 1;
82+
let mut pt = self.async_state.pending_tasks.borrow_mut();
83+
// println!("DEBUG: Fiber finished. Decrementing pending: {} -> {}", *pt, *pt - 1);
84+
*pt -= 1;
7685
}
7786
self.async_state.notify.notify_waiters();
7887
}
7988
}
8089

8190
// 5. Check if we need to propagate error
82-
if let Err(_) = last_res {
83-
return last_res;
91+
if let Err(e) = &last_res {
92+
// If it's just a Yield, we don't propagate it as a VM error
93+
// The fiber is already suspended.
94+
if matches!(e.error, VMRuntimeError::Yield) {
95+
// Continue loop
96+
} else {
97+
return last_res;
98+
}
8499
}
85100
}
86101
}
@@ -1061,8 +1076,11 @@ impl VM {
10611076
.checked_sub(*arg_count)
10621077
.ok_or(VMRuntimeError::StackUnderflow("CallStack native: missing args".into()))?;
10631078
let args: Vec<Value> = self.stack.drain(start_index..).collect();
1064-
let result = native_fn(self, args)?;
1065-
self.stack.push(result);
1079+
1080+
let result = native_fn(self, args);
1081+
let val = result?;
1082+
1083+
self.stack.push(val);
10661084
Ok(true)
10671085
}
10681086
_ => Err(VMRuntimeError::ValueError(ValueError::InvalidOperation {

src/vm/native_coroutine.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,12 @@ fn native_coroutine_spawn(vm: &mut VM, args: Vec<Value>) -> Result<Value, VMRunt
445445
.ready_queue
446446
.borrow_mut()
447447
.push_back((fiber_rc.clone(), Value::Null));
448+
448449
*vm.async_state.pending_tasks.borrow_mut() += 1;
449450

451+
// 通知事件循环有新任务
452+
vm.async_state.notify.notify_waiters();
453+
450454
// 立即返回协程对象(不阻塞)
451455
Ok(args[0].clone())
452456
}

0 commit comments

Comments
 (0)