Skip to content

feat: pure-stage add Clock/Wait/Call effects#221

Merged
abailly merged 1 commit into
mainfrom
rk/pure-stage-step-2
May 19, 2025
Merged

feat: pure-stage add Clock/Wait/Call effects#221
abailly merged 1 commit into
mainfrom
rk/pure-stage-step-2

Conversation

@rkuhn

@rkuhn rkuhn commented May 18, 2025

Copy link
Copy Markdown
Contributor

Also refactor the implementation to make it safer, more regular, and fully tested. Tokio now also uses the effects, but with its own interpreter for managing time, mailboxes, and task concurrency.

Signed-off-by: Roland Kuhn rk@rkuhn.info

Summary by CodeRabbit

  • New Features

    • Introduced a comprehensive effect system for asynchronous stage-based simulations, including structured message passing, timing, call/response, and interruption primitives.
    • Added a new time abstraction with an Instant type and utilities for time manipulation and formatting.
    • Enabled effectful asynchronous operations within stage handlers via an Effects interface.
    • Added support for call/response patterns with unique call identifiers and response channels.
  • Refactor

    • Refactored simulation and Tokio runtime to use explicit effect handling, improving control over asynchronous stage execution and timing.
    • Simplified and streamlined stage references, removing direct message sending from StageRef.
  • Bug Fixes

    • Improved error handling and correctness checks in simulation runtime, including robust state transitions and effect validation.
  • Tests

    • Enhanced and expanded the test suite to cover timing, call/response, and new effect features.
  • Chores

    • Updated dependencies and workspace configuration.

Also refactor the implementation to make it safer, more regular, and fully tested. Tokio now also uses the effects, but with its own interpreter for managing time, mailboxes, and task concurrency.

Signed-off-by: Roland Kuhn <rk@rkuhn.info>
@coderabbitai

coderabbitai Bot commented May 18, 2025

Copy link
Copy Markdown
Contributor

Walkthrough

Righto, here's the skinny: this update gives the pure-stage crate a massive refactor, introducing a fully-fledged effect system with enums like StageEffect and Effect. The simulation and stagegraph modules now handle asynchronous effects, time, and messaging in a more structured way. Old effect logic is tossed, new timing and call/response features are in, and the tests are all spruced up to match.

Changes

Files/Paths Change Summary
Cargo.toml, crates/pure-stage/Cargo.toml Added the either crate as a dependency; reordered and updated some dependencies; enabled the "time" feature for tokio.
crates/pure-stage/src/effect.rs, crates/pure-stage/src/time.rs Added new modules for effect handling and time abstraction, with enums and structs for representing effects, responses, and instants, plus related methods for effect splitting, assertion, and time calculations.
crates/pure-stage/src/lib.rs Publicly re-exported new effect and time modules; expanded exports from stagegraph and types; added a Clippy lint allowance.
crates/pure-stage/src/simulation.rs, crates/pure-stage/src/simulation/state.rs Refactored simulation builder and state: added effect boxes, atomic clocks, explicit effect handling, and new fields to StageData for waiting effects and sender queues; updated stage construction and output wiring to use effectful handlers.
crates/pure-stage/src/simulation/running.rs Major overhaul of simulation runtime: added time management, effect tracking, sleeping/wakeup logic, new resume methods for each effect type, and improved error handling.
crates/pure-stage/src/stage.rs Removed internal send logic from StageRef and StageBuildRef, replaced with phantom types and a new Void enum; simplified handles to be nominal only; updated methods accordingly.
crates/pure-stage/src/stagegraph.rs Introduced the Effects struct for effectful operations; added CallId and CallRef for call/response patterns; changed the StageGraph trait to pass effects into stage handlers.
crates/pure-stage/src/tokio.rs Refactored Tokio backend to support effect-driven stage execution; added effect interpreter, new error types, and mailbox management; updated task spawning and handler signatures for effectful processing.
crates/pure-stage/src/types.rs Added cast_msg_ref for safe message downcasting; made Name comparable with ordering traits.
crates/pure-stage/src/simulation/effect.rs, crates/pure-stage/src/simulation/interrupt.rs Removed the old effect and interrupter logic, making way for the new, unified effect system.
crates/pure-stage/tests/simulation.rs Updated all tests to use the new effect interface, improved error handling, added tests for time and call/response features, and made API usage more ergonomic.

Sequence Diagram(s)

sequenceDiagram
    participant Stage as Stage Handler
    participant Effects as Effects Interface
    participant Sim as Simulation Runtime
    participant Other as Other Stage

    Stage->>Effects: send(target, msg)
    Effects->>Sim: airlock_effect(Send, ...)
    Sim-->>Effects: Effect::Send emitted
    Sim->>Other: Deliver message to mailbox
    Sim-->>Stage: StageEffect::Receive or response

    Stage->>Effects: call(target, msg, timeout)
    Effects->>Sim: airlock_effect(Call, ...)
    Sim-->>Effects: Effect::Call emitted
    Sim->>Other: Deliver call message
    Other->>Effects: respond(call_ref, response)
    Effects->>Sim: airlock_effect(Respond, ...)
    Sim-->>Effects: Effect::Respond emitted
    Sim-->>Stage: StageEffect::Receive or response

    Stage->>Effects: wait(duration)
    Effects->>Sim: airlock_effect(Wait, ...)
    Sim-->>Effects: Effect::Wait emitted
    Sim-->>Stage: StageEffect::WaitResponse after duration

    Stage->>Effects: clock()
    Effects->>Sim: airlock_effect(Clock, ...)
    Sim-->>Effects: Effect::Clock emitted
    Sim-->>Stage: StageEffect::ClockResponse(now)
Loading

Possibly related PRs

  • pragma-org/amaru#206: This earlier PR introduced the foundational simulation and effect system, which the current PR overhauls with a more advanced and extensible effect architecture.

Suggested reviewers

  • stevana

Poem

G'day to the new effect parade,
Where stages dance and messages wade.
Time ticks on in simulated style,
Async calls and waits compile.
Old logic's gone, new magic's spun—
Like Mario with a power-up, mate,
Pure-stage just leveled up, job done!

⏰🦘✨

Note

⚡️ AI Code Reviews for VS Code, Cursor, Windsurf

CodeRabbit now has a plugin for VS Code, Cursor and Windsurf. This brings AI code reviews directly in the code editor. Each commit is reviewed immediately, finding bugs before the PR is raised. Seamless context handoff to your AI code agent ensures that you can easily incorporate review feedback.
Learn more here.


Note

⚡️ Faster reviews with caching

CodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 16th. To opt out, configure Review - Disable Cache at either the organization or repository level. If you prefer to disable all data retention across your organization, simply turn off the Data Retention setting under your Organization Settings.
Enjoy the performance boost—your workflow just got faster.

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🧹 Nitpick comments (6)
crates/pure-stage/src/types.rs (1)

48-63: Trim the fat: de-duplicate the cast logic

cast_msg_ref repeats the same is()/downcast_ref() two-step that we already perform in the neighbouring helpers.
You can make the code leaner (and skip the expect) by trying the down-cast first and bailing only if it fails—same outcome, fewer instructions:

-pub fn cast_msg_ref<T: Message>(this: &dyn Message) -> anyhow::Result<&T> {
-    if (this as &dyn Any).is::<T>() {
-        #[allow(clippy::expect_used)]
-        Ok((this as &dyn Any)
-            .downcast_ref::<T>()
-            .expect("checked above"))
-    } else {
-        anyhow::bail!(
-            "message type error: expected {}, got {:?} ({})",
-            std::any::type_name::<T>(),
-            this,
-            this.type_name()
-        )
-    }
+pub fn cast_msg_ref<T: Message>(this: &dyn Message) -> anyhow::Result<&T> {
+    (this as &dyn Any)
+        .downcast_ref::<T>()
+        .ok_or_else(|| anyhow::anyhow!(
+            "message type error: expected {}, got {:?} ({})",
+            std::any::type_name::<T>(),
+            this,
+            this.type_name()
+        ))
 }

Keeps the intent crystal clear and avoids the second trait-object cast.
Feel free to ignore if you prefer symmetry with cast_msg.

crates/pure-stage/src/stagegraph.rs (1)

140-145: Potential u64 wrap-around in CallId::new()

AtomicU64::fetch_add will happily overflow back to zero after 2⁶⁴ − 1 calls. It’ll take yonks, but wrap-around IDs can break equality & hashing assumptions. Worth at least a comment or a saturating add.

crates/pure-stage/src/stage.rs (1)

70-75: Hard-coded "noop" name could collide with a legit stage 😬

If someone cleverly names their real stage "noop", the placeholder handle and the real deal will share an identity, leading to very confusing logs. Maybe prefix with "__noop_" plus a counter/UUID to keep things unique?

crates/pure-stage/src/tokio.rs (1)

68-69: Risk of zero-capacity mailboxes

mpsc::channel(self.inner.mailbox_size / 2) will create a zero-capacity channel when mailbox_size is 1. That accidentally turns every send into a synchronous back-pressure point—probably not what you were after.

-let (tx, rx) = mpsc::channel(self.inner.mailbox_size / 2);
+let cap = (self.inner.mailbox_size / 2).max(1);
+let (tx, rx) = mpsc::channel(cap);
crates/pure-stage/src/effect.rs (1)

45-58: Minor clone-happy section – cheap micro-optimisation available

Cloning name only to immediately hand the owned version to Effect::Send
is a tad wasteful (albeit cheap). You can hand over the original and leave the
clone for the retained StageEffect:

-(
-    StageEffect::Send(name.clone(), (), call_param),
+(
+    StageEffect::Send(name.clone(), (), call_param),
@@
-    Effect::Send {
-        from: at_name,
-        to: name,
+    Effect::Send {
+        from: at_name,
+        to: name.clone(),

Not a blocker, but keeps allocations lean.

crates/pure-stage/src/simulation.rs (1)

98-107: Docs mention Interrupter::interrupt, code now uses eff.interrupt

The example in the doc-comment still references the pre-refactor API.
Update the snippet so newcomers don’t feel like they walked onto the wrong movie set.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b277e58 and 6a0861c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (15)
  • Cargo.toml (1 hunks)
  • crates/pure-stage/Cargo.toml (1 hunks)
  • crates/pure-stage/src/effect.rs (1 hunks)
  • crates/pure-stage/src/lib.rs (1 hunks)
  • crates/pure-stage/src/simulation.rs (11 hunks)
  • crates/pure-stage/src/simulation/effect.rs (0 hunks)
  • crates/pure-stage/src/simulation/interrupt.rs (0 hunks)
  • crates/pure-stage/src/simulation/running.rs (5 hunks)
  • crates/pure-stage/src/simulation/state.rs (2 hunks)
  • crates/pure-stage/src/stage.rs (1 hunks)
  • crates/pure-stage/src/stagegraph.rs (3 hunks)
  • crates/pure-stage/src/time.rs (1 hunks)
  • crates/pure-stage/src/tokio.rs (4 hunks)
  • crates/pure-stage/src/types.rs (2 hunks)
  • crates/pure-stage/tests/simulation.rs (6 hunks)
💤 Files with no reviewable changes (2)
  • crates/pure-stage/src/simulation/interrupt.rs
  • crates/pure-stage/src/simulation/effect.rs
🧰 Additional context used
🧬 Code Graph Analysis (3)
crates/pure-stage/src/lib.rs (5)
crates/pure-stage/src/simulation/running.rs (2)
  • effect (233-235)
  • cast_msg_ref (987-987)
crates/pure-stage/src/simulation.rs (2)
  • stage (220-261)
  • cast_msg (239-239)
crates/pure-stage/src/stagegraph.rs (2)
  • stage (259-267)
  • cast_msg (113-113)
crates/pure-stage/src/tokio.rs (1)
  • stage (54-79)
crates/pure-stage/src/types.rs (3)
  • cast_msg (29-46)
  • cast_msg_ref (49-63)
  • cast_state (86-103)
crates/pure-stage/src/types.rs (2)
crates/pure-stage/src/simulation/running.rs (1)
  • cast_msg_ref (987-987)
crates/amaru-ledger/src/rules/block.rs (2)
  • anyhow (67-72)
  • bail (63-65)
crates/pure-stage/src/effect.rs (4)
crates/pure-stage/src/simulation.rs (1)
  • cast_msg (239-239)
crates/pure-stage/src/stagegraph.rs (5)
  • cast_msg (113-113)
  • fmt (36-41)
  • fmt (170-176)
  • call (87-119)
  • eq (164-166)
crates/pure-stage/src/types.rs (7)
  • cast_msg (29-46)
  • std (24-24)
  • std (77-77)
  • fmt (127-129)
  • from (121-123)
  • eq (13-13)
  • eq (17-22)
crates/pure-stage/tests/simulation.rs (5)
  • StageRef (18-18)
  • StageRef (50-50)
  • StageRef (106-106)
  • StageRef (212-212)
  • call (195-257)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Snapshots (preprod, 10.1.4)
  • GitHub Check: Build on windows-latest with target x86_64-pc-windows-msvc
  • GitHub Check: Build on ubuntu-24.04 with target aarch64-unknown-linux-musl
  • GitHub Check: Build on ubuntu-latest with target riscv32im-risc0-zkvm-elf
  • GitHub Check: Build on macos-latest with target aarch64-apple-darwin
  • GitHub Check: Build on ubuntu-latest with target x86_64-unknown-linux-gnu
  • GitHub Check: Coverage
🔇 Additional comments (11)
Cargo.toml (1)

23-23: Lookin' good! Dependency added for the new effect system.

The addition of the either crate makes perfect sense for the new effect system. It's like picking the right tool for the job - reminds me of how Batman always has the right gadget in his utility belt.

crates/pure-stage/Cargo.toml (1)

15-18: Spot on with these dependency updates, mate!

Adding the either crate and including the "time" feature for tokio is exactly what you need for the new Clock, Wait, and Call effects. The tokio time feature will be crucial for those timing abstractions - it's like giving your DeLorean the flux capacitor it needs to time travel!

crates/pure-stage/src/simulation/state.rs (2)

1-2: Good on ya for updating the imports!

The imports are properly updated to bring in the new StageEffect and Name types that are needed for the extended stage state functionality. Clean as a whistle!


49-55: Brilliant extension of the StageData struct!

The new fields in StageData provide the essential backbone for the effect system:

  • name gives each stage a proper identity (like naming your character in Skyrim)
  • waiting lets the stage track its current pending effect (smart!)
  • senders queue gives you proper message handling with backpressure

These changes align perfectly with the PR objective of introducing Clock, Wait, and Call effects. The structure now has everything it needs to participate in the effect system.

crates/pure-stage/src/lib.rs (3)

1-2: Good call disabling those Clippy lints!

Disabling the panic and expect_used lints at the crate level makes sense for an effect system. Like in The Matrix when Neo has to break some rules to save Morpheus - sometimes you need those assertions to keep the system safe and sound.


3-9: Sweet module organization, mate!

Adding the dedicated effect and time modules is a top-notch way to organize the new functionality. It's like Marie Kondo came in and helped tidy up the codebase - everything has its proper place now!


11-15: Deadly exports setup!

The public exports are spot on, exposing just what's needed for users of the crate:

  • Effect, CallId, CallRef, and Effects for the effect system
  • Instant for the time abstractions
  • cast_msg_ref for safer message handling

This gives users all the tools they need without exposing the internal plumbing. It's like a good API in a video game - gives you all the controls you need without overwhelming you with the details.

crates/pure-stage/src/types.rs (1)

105-105: Name now orders itself—sweet as!

Deriving PartialOrd and Ord is spot-on for heap & map usage. No concerns here, mate.

crates/pure-stage/tests/simulation.rs (1)

236-245: Verify CallRef juggling in manual flow

Here we stash let cr2 = cr.dummy(); and later assert/respond with &cr2, even though the original response inside the stage used msg.1.
If dummy() fabricates a fresh handle, we may be mismatching call-IDs, which could hide a logic error the tests should catch.

Double-check that:

  1. cr.dummy() indeed aliases the same call-id, not a brand-new one.
  2. The tuple passed via resume_send matches the closure (msg + 1, cr) used in the call effect, otherwise the callee receives the wrong payload.

Shout if you’d like a hand whipping up an extra assertion around the call-id equality.

crates/pure-stage/src/stagegraph.rs (1)

111-116: Type-cast failure will silently panic—surface it to the caller instead 🎭

cast_msg::<Resp>(resp).expect("internal messaging type error") will trip a panic if the remote stage responds with the wrong concrete type. That’s a foot-gun worthy of a Dark Souls boss fight. Consider bubbling an error up the future instead, so callers can decide how to react.

Would you like a helper that converts the mismatch into anyhow::Error (or a custom error type) so the stage can ? it rather than crash?

crates/pure-stage/src/simulation/running.rs (1)

734-756: resume_call_internal swallows recv errors but never cancels timeout

When the oneshot receiver errors out you treat it as a timeout, yet the
scheduled wake-up remains queued (and will re-fire later). This can trigger the
wakeup twice, potentially re-running the CallTimeout branch and breaking
invariants.

Consider:

 let msg = recv
     .try_recv()
     .ok()
     .map(StageResponse::CallResponse)
     .unwrap_or(StageResponse::CallTimeout);

-// missing: cancel any pending timeout wake-ups for this `id`
+// SAFETY: prevent double-fire by removing the matching wake-up.
+sim.sleeping.retain(|s| !matches!(&s.wakeup, w if w.matches_call(id)));

(You’ll obviously need some bookkeeping helper to identify the wake-up.)

Comment on lines +26 to +29
pub fn saturating_since(&self, other: Self) -> Duration {
self.0.duration_since(other.0)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Mind the panic: saturating_since isn’t saturating

tokio::time::Instant::duration_since panics when self < other, which means saturating_since can blow up instead of “saturating”.
Swap to a checked variant and clamp manually:

-    pub fn saturating_since(&self, other: Self) -> Duration {
-        self.0.duration_since(other.0)
+    pub fn saturating_since(&self, other: Self) -> Duration {
+        match self.0.checked_duration_since(other.0) {
+            Some(d) => d,
+            None => Duration::ZERO,
+        }
     }

A quick unit test comparing an earlier.saturating_since(later) to Duration::ZERO will keep future bugs at bay.

🤖 Prompt for AI Agents
In crates/pure-stage/src/time.rs around lines 26 to 29, the saturating_since
function currently calls duration_since which panics if self is earlier than
other. To fix this, replace duration_since with a checked method that returns an
Option, then return Duration::ZERO if self is earlier than other to ensure
saturation. Also, add a unit test that verifies saturating_since returns
Duration::ZERO when called with an earlier instant compared to a later one to
prevent regressions.

Comment on lines +94 to +99
let now = (self.now)();
let deadline = now.checked_add(timeout).expect("timeout too long");
let target = target.name();
let me = self.me.name();
let id = CallId::new();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid panicking on extra-long timeouts 🕰️

Using checked_add and then expect("timeout too long") will bring the whole shebang down if someone accidentally passes Duration::MAX (or anything that overflows an Instant). A runaway user input shouldn’t be able to nuke the stage graph—let’s fail gracefully instead of doing the ol’ “ya blew it up!” routine.

-let deadline = now.checked_add(timeout).expect("timeout too long");
+let deadline = match now.checked_add(timeout) {
+    Some(d) => d,
+    None => {
+        tracing::warn!(
+            "call timeout {:?} is too large – capping at Instant::MAX instead",
+            timeout
+        );
+        Instant::MAX
+    }
+};

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In crates/pure-stage/src/stagegraph.rs around lines 94 to 99, the use of
checked_add followed by expect causes a panic if the timeout duration is too
long and overflows. To fix this, replace the expect call with graceful error
handling by checking if checked_add returns None, and then handle that case
without panicking, such as returning a Result or an Option to indicate failure
instead of crashing.

Comment on lines +28 to +30
// A StageRef itself is just a name allowing the creation of thread-local effects.
unsafe impl<Msg: Send, State: Send> Sync for StageRef<Msg, State> {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

unsafe impl Sync looks unnecessary—let the compiler derive it ☘️

Manually declaring unsafe impl Sync is like bringing a bazooka to a barbie: powerful but dangerous.
StageRef only holds a Name (which is Send + Sync) and a PhantomData, so the auto-trait should kick in just fine. Dropping the unsafe keeps the codebase safer and easier on future maintainers.

-// A StageRef itself is just a name allowing the creation of thread-local effects.
-unsafe impl<Msg: Send, State: Send> Sync for StageRef<Msg, State> {}
+// Auto-traits already make this `Sync` when `Msg` and `State` are `Send`.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// A StageRef itself is just a name allowing the creation of thread-local effects.
unsafe impl<Msg: Send, State: Send> Sync for StageRef<Msg, State> {}
// Auto-traits already make this `Sync` when `Msg` and `State` are `Send`.
🤖 Prompt for AI Agents
In crates/pure-stage/src/stage.rs around lines 28 to 30, remove the manual
unsafe impl Sync for StageRef since the compiler can automatically derive Sync
safely here. This avoids unnecessary unsafe code and relies on the compiler's
auto-trait implementation because StageRef only contains types that are already
Send + Sync.

Comment on lines +142 to +149
loop {
let poll = stage.as_mut().poll(&mut Context::from_waker(Waker::noop()));
if let Poll::Ready(state) = poll {
return state;
}
let Some(Left(eff)) = effect.lock().take() else {
panic!("stage `{name}` used .await on something that was not a stage effect");
};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Busy-loop potential in the interpreter’s polling cycle 🔄

Polling the stage future with Waker::noop() inside a tight loop means we rely on our sleeps (Wait, sleep, etc.) to yield. If a future immediately returns Poll::Pending again (no effect, no timer), we’ll spin like Sonic on a chilli-dog bender and hog a CPU core.

Consider yielding when nothing was produced:

-        let poll = stage.as_mut().poll(&mut Context::from_waker(Waker::noop()));
+        let poll = stage.as_mut().poll(&mut Context::from_waker(Waker::noop()));
         if let Poll::Ready(state) = poll {
             return state;
         }
+        if effect.lock().is_none() {
+            // No effect produced – give Tokio a chance to schedule others.
+            tokio::task::yield_now().await;
+            continue;
+        }

That one-liner prevents busy-waiting without messing with the overall interpreter logic.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
loop {
let poll = stage.as_mut().poll(&mut Context::from_waker(Waker::noop()));
if let Poll::Ready(state) = poll {
return state;
}
let Some(Left(eff)) = effect.lock().take() else {
panic!("stage `{name}` used .await on something that was not a stage effect");
};
loop {
let poll = stage.as_mut().poll(&mut Context::from_waker(Waker::noop()));
if let Poll::Ready(state) = poll {
return state;
}
// If the stage didn’t emit an effect, yield to Tokio before polling again
if effect.lock().is_none() {
// No effect produced – give Tokio a chance to schedule others.
tokio::task::yield_now().await;
continue;
}
let Some(Left(eff)) = effect.lock().take() else {
panic!("stage `{name}` used .await on something that was not a stage effect");
};
// …process `eff`…
}
🤖 Prompt for AI Agents
In crates/pure-stage/src/tokio.rs around lines 142 to 149, the current loop
polls the stage future with Waker::noop() repeatedly without yielding, causing a
busy-wait that hogs the CPU if the future returns Poll::Pending immediately. To
fix this, insert a yield point such as calling tokio::task::yield_now().await or
an equivalent async yield inside the loop when no effect is produced before
continuing the next poll. This prevents busy-waiting while preserving the
interpreter's logic.

Comment on lines +198 to +213
pub fn assert_respond<Msg, St, Msg2: Message>(
&self,
at_stage: &StageRef<Msg, St>,
cr: &CallRef<Msg2>,
msg: Msg2,
) {
match self {
Effect::Respond {
at_stage: a,
target: _,
id: i,
msg: m,
} if a == &at_stage.name && *i == cr.id && msg.eq(&**m) => {}
_ => panic!("unexpected effect {self:?}\n looking for Respond at {at_stage:?} with id {cr:?} and msg {msg:?}"),
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add PartialEq bound to silence a hard compile-time error

assert_respond invokes msg.eq(&**m) but eq lives on PartialEq.
Without the bound the compiler will bail out faster than Mario in an underwater level.

-pub fn assert_respond<Msg, St, Msg2: Message>(
+pub fn assert_respond<Msg, St, Msg2: Message + PartialEq>(

Once this lands the whole test-support API compiles again.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn assert_respond<Msg, St, Msg2: Message>(
&self,
at_stage: &StageRef<Msg, St>,
cr: &CallRef<Msg2>,
msg: Msg2,
) {
match self {
Effect::Respond {
at_stage: a,
target: _,
id: i,
msg: m,
} if a == &at_stage.name && *i == cr.id && msg.eq(&**m) => {}
_ => panic!("unexpected effect {self:?}\n looking for Respond at {at_stage:?} with id {cr:?} and msg {msg:?}"),
}
}
pub fn assert_respond<Msg, St, Msg2: Message + PartialEq>(
&self,
at_stage: &StageRef<Msg, St>,
cr: &CallRef<Msg2>,
msg: Msg2,
) {
match self {
Effect::Respond {
at_stage: a,
target: _,
id: i,
msg: m,
} if a == &at_stage.name && *i == cr.id && msg.eq(&**m) => {}
_ => panic!("unexpected effect {self:?}\n looking for Respond at {at_stage:?} with id {cr:?} and msg {msg:?}"),
}
}
🤖 Prompt for AI Agents
In crates/pure-stage/src/effect.rs around lines 198 to 213, the method
assert_respond calls msg.eq(&**m), which requires the Msg2 type to implement
PartialEq. To fix the compile error, add a PartialEq trait bound to the Msg2
generic parameter in the function signature, ensuring Msg2: Message + PartialEq.

Comment on lines +216 to +264
impl PartialEq for Effect {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(
Effect::Receive { at_stage },
Effect::Receive {
at_stage: other_at_stage,
},
) => at_stage == other_at_stage,
(
Effect::Send {
from,
to,
msg,
call,
},
Effect::Send {
from: other_from,
to: other_to,
msg: other_msg,
call: other_call,
},
) => {
from == other_from && to == other_to && msg.eq(&**other_msg) && *call == *other_call
}
(
Effect::Clock { at_stage },
Effect::Clock {
at_stage: other_at_stage,
},
) => at_stage == other_at_stage,
(
Effect::Wait { at_stage, duration },
Effect::Wait {
at_stage: other_at_stage,
duration: other_duration,
},
) => at_stage == other_at_stage && duration == other_duration,
(
Effect::Failure { at_stage, error },
Effect::Failure {
at_stage: other_at_stage,
error: other_error,
},
) => at_stage == other_at_stage && error.to_string() == other_error.to_string(),
_ => false,
}
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

PartialEq implementation ignores variants & mis-uses trait objects

  1. Respond and Interrupt are never compared, so effect_a == effect_b will be false even when the two effects are identical.
  2. msg.eq(&**other_msg) requires the underlying dyn Message to implement PartialEq, but that is not enforced at the trait level—this will not even compile on stable Rust today.

Patch suggestion:

@@
                 Effect::Send {
                     from == other_from && to == other_to &&
-                    msg.eq(&**other_msg) && *call == *other_call
+                    // down-cast so we only rely on `PartialEq` for the concrete type
+                    cast_msg::<Box<dyn Any>>(msg.clone())
+                        .and_then(|boxed| {
+                            boxed.downcast::<Box<dyn Message + PartialEq>>()
+                                .ok()
+                                .map(|lhs| {
+                                    let rhs = cast_msg::<Box<dyn Any>>(other_msg.clone())
+                                        .unwrap()
+                                        .downcast::<Box<dyn Message + PartialEq>>()
+                                        .unwrap();
+                                    lhs == rhs
+                                })
+                        })
+                        .unwrap_or(false)
+                        && *call == *other_call
                 }
@@
+            (
+                Effect::Respond { at_stage, target, id, msg },
+                Effect::Respond { at_stage: other_stage, target: other_target, id: other_id, msg: other_msg },
+            ) => at_stage == other_stage
+                && target == other_target
+                && id == other_id
+                && msg.eq(&**other_msg),
+            (
+                Effect::Interrupt { at_stage },
+                Effect::Interrupt { at_stage: other_stage },
+            ) => at_stage == other_stage,

(You’ll want to extract the down-cast helper to keep things tidy.)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In crates/pure-stage/src/effect.rs between lines 216 and 264, the PartialEq
implementation for Effect is incomplete and incorrect: it does not compare
Respond and Interrupt variants, and it attempts to call eq on trait objects
without ensuring PartialEq is implemented for them. To fix this, add matching
arms for Respond and Interrupt variants to compare them properly, and replace
the msg.eq call with a safe downcasting helper function that checks for equality
only if the underlying types support it. Extract this downcast helper function
to keep the code clean and maintainable.

Comment on lines +1 to +3
use super::{EffectBox, Instant, StageData, StageEffect, StageResponse, StageState};
use crate::{cast_state, stagegraph::CallRef, CallId, Effect, Message, Name, StageRef, State};
use either::Either::{Left, Right};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Wrong module path & missing generic parameter blow up the import

StageEffect and StageResponse live in crate::effect, not in super.
Pulling them in via super::{ … } makes the compiler hurl an unresolved import error.

-use super::{EffectBox, Instant, StageData, StageEffect, StageResponse, StageState};
+use super::{EffectBox, Instant, StageData, StageState};
+use crate::effect::{StageEffect, StageResponse};

This also keeps the generic parameter of StageEffect inferable in the rest
of the file, so no other code changes are necessary.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
use super::{EffectBox, Instant, StageData, StageEffect, StageResponse, StageState};
use crate::{cast_state, stagegraph::CallRef, CallId, Effect, Message, Name, StageRef, State};
use either::Either::{Left, Right};
use super::{EffectBox, Instant, StageData, StageState};
use crate::effect::{StageEffect, StageResponse};
use crate::{cast_state, stagegraph::CallRef, CallId, Effect, Message, Name, StageRef, State};
use either::Either::{Left, Right};
🤖 Prompt for AI Agents
In crates/pure-stage/src/simulation/running.rs lines 1 to 3, the imports for
StageEffect and StageResponse are incorrectly referenced from super, causing
unresolved import errors. Change the import path to bring StageEffect and
StageResponse from crate::effect instead of super, and ensure that the generic
parameter for StageEffect is included in the import statement to keep it
inferable throughout the file.

Comment on lines +106 to +112
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.time.cmp(&other.time).reverse().then_with(|| {
let left = self.wakeup.as_ref() as *const _ as *const u8;
let right = other.wakeup.as_ref() as *const _ as *const u8;
left.addr().cmp(&right.addr())
})
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

ptr.addr() is still nightly-only – use raw pointers instead

*const T::addr() is an unstable API behind the ptr_addr_of feature gate.
Switch to casting to usize, it’s 100 % safe for ordering purposes here.

-            let left = self.wakeup.as_ref() as *const _ as *const u8;
-            let right = other.wakeup.as_ref() as *const _ as *const u8;
-            left.addr().cmp(&right.addr())
+            let left = self.wakeup.as_ref() as *const _ as usize;
+            let right = other.wakeup.as_ref() as *const _ as usize;
+            left.cmp(&right)

That’ll get you compiling on stable without any feature flags.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.time.cmp(&other.time).reverse().then_with(|| {
let left = self.wakeup.as_ref() as *const _ as *const u8;
let right = other.wakeup.as_ref() as *const _ as *const u8;
left.addr().cmp(&right.addr())
})
}
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.time.cmp(&other.time).reverse().then_with(|| {
let left = self.wakeup.as_ref() as *const _ as usize;
let right = other.wakeup.as_ref() as *const _ as usize;
left.cmp(&right)
})
}
🤖 Prompt for AI Agents
In crates/pure-stage/src/simulation/running.rs around lines 106 to 112, the code
uses the nightly-only ptr.addr() method on raw pointers, which is unstable.
Replace the use of ptr.addr() with casting the raw pointers to usize for
comparison, as this is stable and safe for ordering purposes. Update the
comparison to use left as usize and right as usize instead of left.addr() and
right.addr().

@codecov

codecov Bot commented May 18, 2025

Copy link
Copy Markdown

Codecov Report

Attention: Patch coverage is 73.16858% with 304 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
crates/pure-stage/src/simulation/running.rs 84.34% 103 Missing ⚠️
crates/pure-stage/src/tokio.rs 0.00% 92 Missing ⚠️
crates/pure-stage/src/effect.rs 60.38% 61 Missing ⚠️
crates/pure-stage/src/stagegraph.rs 83.89% 19 Missing ⚠️
crates/pure-stage/src/time.rs 48.00% 13 Missing ⚠️
crates/pure-stage/src/simulation.rs 81.03% 11 Missing ⚠️
crates/pure-stage/src/types.rs 54.54% 5 Missing ⚠️
Files with missing lines Coverage Δ
crates/pure-stage/src/simulation/state.rs 0.00% <ø> (ø)
crates/pure-stage/src/stage.rs 84.37% <100.00%> (+5.20%) ⬆️
crates/pure-stage/src/types.rs 92.30% <54.54%> (+4.24%) ⬆️
crates/pure-stage/src/simulation.rs 82.55% <81.03%> (+2.44%) ⬆️
crates/pure-stage/src/time.rs 48.00% <48.00%> (ø)
crates/pure-stage/src/stagegraph.rs 83.89% <83.89%> (ø)
crates/pure-stage/src/effect.rs 60.38% <60.38%> (ø)
crates/pure-stage/src/tokio.rs 0.00% <0.00%> (ø)
crates/pure-stage/src/simulation/running.rs 84.81% <84.34%> (+4.72%) ⬆️

... and 1 file with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@abailly abailly left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's quite a lot of code to digest 😅 Given it's pretty much self-contained in single crate and we'll only figure out whether it really works when rewiring the pipeline stages, I suggest we just merge and this and proceed with a concrete example to both run simulation and actual tokio-based stages. Seems like @stevana's echo service would be useful here?

Comment on lines +10 to +15
Send(
Name,
T,
// this is present in case the send is the first part of a call effect
Option<(Duration, oneshot::Receiver<Box<dyn Message>>, CallId)>,
),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems to me having named fields is clearer as it's probably the case we'll quickly forget exactly what those tuple values are

Suggested change
Send(
Name,
T,
// this is present in case the send is the first part of a call effect
Option<(Duration, oneshot::Receiver<Box<dyn Message>>, CallId)>,
),
Send {
name: Name,
msg: T,
// this is present in case the send is the first part of a call effect
call_param: Option<(Duration, oneshot::Receiver<Box<dyn Message>>, CallId)>,
},

),
Clock,
Wait(Duration),
Call(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue here?

@stevana

stevana commented May 19, 2025

Copy link
Copy Markdown
Contributor

I suggest we just merge and this and proceed with a concrete example to both run simulation and actual tokio-based stages.

Agreed!

Seems like @stevana's echo service would be useful here?

I already have a branch (stevan/pure-stage-echo) where I've started porting echo to the pure-stage API, will try to get it working with the changes in this PR asap. We could use Maelstrom itself to test the "production"/tokio deployment and the simulator to test the "simulation" deployment.

(Now that we got RPC and time another small self-contained example is the broadcast example from Maelstrom. This has retries, unlike echo, which give us something small and concrete to test that we can simulate network faults (e.g. delaying / dropping messages) properly.)

@abailly abailly merged commit 4b60faf into main May 19, 2025
13 checks passed
@abailly abailly deleted the rk/pure-stage-step-2 branch May 19, 2025 09:16
This was referenced Aug 20, 2025
@coderabbitai coderabbitai Bot mentioned this pull request Apr 9, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants