Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion include/eventide/async/runtime/frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@

namespace eventide {

class async_node;
class sync_primitive;

namespace detail {

/// Resume a coroutine and immediately drain any deferred root-frame destruction.
void resume_and_drain(std::coroutine_handle<> handle);
/// @param restore_to The async_node* to write back into the thread-local
/// current-node slot after the resumed chain returns.
/// Callers must capture this **before** any call to
/// handle_subtask_result / deliver_deferred, which
/// overwrite the slot as part of symmetric-transfer setup.
void resume_and_drain(async_node* restore_to, std::coroutine_handle<> handle);

} // namespace detail

Expand Down Expand Up @@ -122,6 +128,14 @@ class async_node {
/// Dump the async graph reachable from this node as a DOT (graphviz) graph.
std::string dump_dot() const;

/// Returns the async_node whose coroutine body is currently executing on
/// this thread, or nullptr if no coroutine is active.
const static async_node* current() noexcept;

/// Convenience: calls dump_dot() on the current node. Returns an empty
/// string if no coroutine is active.
static std::string dump_current_dot();

private:
const static async_node* get_awaiter(const async_node* node);
const static sync_primitive* get_resource_parent(const async_node* node);
Expand Down Expand Up @@ -420,4 +434,15 @@ class system_op : public async_node {
void complete() noexcept;
};

namespace detail {

/// Thread-local pointer to the async_node whose coroutine body is currently
/// executing on this thread. Set by the tracking awaiters in task.h and
/// save/restored around every resume entry-point so that user code can always
/// call async_node::current() to obtain it.
void set_current_node(async_node* node) noexcept;
async_node* current_node() noexcept;

} // namespace detail

} // namespace eventide
23 changes: 21 additions & 2 deletions include/eventide/async/runtime/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ struct promise_result<void, E, C> {
}
};

// ============================================================================
// initial_tracking_suspend — sets current node when coroutine body first runs
// ============================================================================

struct initial_tracking_suspend {
async_node* node;

bool await_ready() const noexcept {
return false;
}

void await_suspend(std::coroutine_handle<>) const noexcept {}

void await_resume() const noexcept {
detail::set_current_node(node);
}
};
Comment on lines +68 to +84
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Tracking is lost after co_awaiting pass-through awaitables.

initial_tracking_suspend only seeds TLS on the first resume. Since task_promise_object::await_transform() still forwards arbitrary awaitables unchanged at Lines 367-371, any awaitable that later resumes this coroutine via a raw handle bypasses the new bookkeeping and async_node::current() becomes null/stale in the continued body. Either narrow the contract to Eventide-managed awaitables or route external awaits through a tracked adapter.


// ============================================================================
// promise_exception, transition_await, cancel()
// ============================================================================
Expand Down Expand Up @@ -245,6 +263,7 @@ std::coroutine_handle<> propagate_fail(async_node* child_node, async_node* paren

// Exception: let parent resume normally; await_resume will rethrow.
if(child->propagated_exception) {
detail::set_current_node(parent_node);
return parent_task->handle();
}

Expand Down Expand Up @@ -301,8 +320,8 @@ struct task_promise_object : standard_task, promise_result<T, E, void>, promise_
return coroutine_handle::from_promise(*this);
}

auto initial_suspend() const noexcept {
return std::suspend_always();
auto initial_suspend() noexcept {
return initial_tracking_suspend{static_cast<async_node*>(this)};
}

auto final_suspend() const noexcept {
Expand Down
46 changes: 40 additions & 6 deletions src/async/runtime/frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,27 @@

namespace eventide {

static thread_local async_node* current_running_node = nullptr;

void detail::set_current_node(async_node* node) noexcept {
current_running_node = node;
}

async_node* detail::current_node() noexcept {
return current_running_node;
}

const async_node* async_node::current() noexcept {
return current_running_node;
}

std::string async_node::dump_current_dot() {
if(auto* node = current_running_node) {
return node->dump_dot();
}
return {};
}

namespace {

#if ETD_WORKAROUND_MSVC_COROUTINE_ASAN_UAF
Expand Down Expand Up @@ -42,9 +63,10 @@ void drain_pending_destroys() {

} // namespace

void detail::resume_and_drain(std::coroutine_handle<> handle) {
void detail::resume_and_drain(async_node* restore_to, std::coroutine_handle<> handle) {
if(handle) {
handle.resume();
current_running_node = restore_to;
}
#if ETD_WORKAROUND_MSVC_COROUTINE_ASAN_UAF
drain_pending_destroys();
Expand All @@ -66,17 +88,22 @@ std::coroutine_handle<> aggregate_op::deliver_deferred() noexcept {
awaiter->clear_awaitee();

switch(deferred) {
case Deferred::Resume: return static_cast<standard_task*>(awaiter)->handle();
case Deferred::Resume:
current_running_node = awaiter;
return static_cast<standard_task*>(awaiter)->handle();

case Deferred::Cancel:
if(policy & InterceptCancel) {
state = Cancelled;
current_running_node = awaiter;
return static_cast<standard_task*>(awaiter)->handle();
}
awaiter->state = Cancelled;
return awaiter->final_transition();

case Deferred::Error: return static_cast<standard_task*>(awaiter)->handle();
case Deferred::Error:
current_running_node = awaiter;
return static_cast<standard_task*>(awaiter)->handle();

case Deferred::None: break;
}
Expand Down Expand Up @@ -109,8 +136,9 @@ void async_node::cancel() {
return;
}

auto* prev = current_running_node;
auto next = awaiter->handle_subtask_result(link);
detail::resume_and_drain(next);
detail::resume_and_drain(prev, next);
};

switch(kind) {
Expand Down Expand Up @@ -149,8 +177,9 @@ void async_node::cancel() {
break;
}

auto* prev = current_running_node;
auto next = self->deliver_deferred();
detail::resume_and_drain(next);
detail::resume_and_drain(prev, next);
break;
}

Expand All @@ -168,7 +197,9 @@ void async_node::cancel() {
void async_node::resume() {
if(is_standard_task()) {
if(!is_cancelled() && !is_failed()) {
auto* prev = current_running_node;
static_cast<standard_task*>(this)->handle().resume();
current_running_node = prev;
#if ETD_WORKAROUND_MSVC_COROUTINE_ASAN_UAF
drain_pending_destroys();
#endif
Expand All @@ -187,8 +218,9 @@ void system_op::complete() noexcept {
if(!parent) {
return;
}
auto* prev = current_running_node;
auto next = parent->handle_subtask_result(this);
detail::resume_and_drain(next);
detail::resume_and_drain(prev, next);
}

/// Wires this node as a child of `awaiter`. For Task nodes, sets state
Expand Down Expand Up @@ -277,6 +309,7 @@ std::coroutine_handle<> async_node::handle_subtask_result(async_node* child) {
if(child->state == Cancelled) {
if(child->policy & InterceptCancel) {
self->awaitee = nullptr;
current_running_node = self;
return self->handle();
}

Expand All @@ -297,6 +330,7 @@ std::coroutine_handle<> async_node::handle_subtask_result(async_node* child) {
return propagate(child, self);
}
}
current_running_node = self;
return self->handle();
}

Expand Down
3 changes: 2 additions & 1 deletion src/async/runtime/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ bool sync_primitive::cancel_waiter(waiter_link* link) noexcept {
// than first stashing raw waiter pointers into a temporary container.
link->state = async_node::Cancelled;
link->policy = static_cast<async_node::Policy>(link->policy | async_node::InterceptCancel);
auto* prev = detail::current_node();
auto next = awaiting->handle_subtask_result(link);
detail::resume_and_drain(next);
detail::resume_and_drain(prev, next);
return true;
}

Expand Down
Loading