
Feat/transports #32

Merged
EggerMarc merged 7 commits into main from feat/transports on Apr 12, 2026

Conversation

Owner

@EggerMarc EggerMarc commented Apr 12, 2026

Summary by CodeRabbit

  • New Features

    • Pluggable transport layer with a provided Reqwest-based transport; builders default to it and can accept custom transports via with_transport()
    • Streaming surfaces normalized transport events and includes an SSE parsing utility
  • Breaking Changes

    • Embeddings embed() now requires &mut self
    • Clients/providers are now generic over transports (affects public type signatures)
  • Documentation

    • Roadmaps, READMEs, and examples updated for transport extensibility, WebSocket plans, WASM and gRPC targets
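The embed() breaking change above in miniature: a hedged sketch of why &mut self forces shared embedders behind a Mutex, as the updated router example does. Trait and type names here are simplified stand-ins for the chat-core API, and the real trait is async.

```rust
use std::sync::Mutex;

// Sketch of the breaking change: embed() now takes &mut self, so a
// shared embedder needs interior mutability. FakeEmbedder is a
// hypothetical stand-in for a real provider.
trait EmbeddingsProvider {
    fn embed(&mut self, input: &str) -> Vec<f32>;
}

struct FakeEmbedder {
    calls: usize, // mutable state is why &mut self is now required
}

impl EmbeddingsProvider for FakeEmbedder {
    fn embed(&mut self, input: &str) -> Vec<f32> {
        self.calls += 1;
        vec![input.len() as f32]
    }
}

fn main() {
    // Shared across call sites: wrap in a Mutex so each caller can
    // take the &mut borrow the new signature requires.
    let shared = Mutex::new(FakeEmbedder { calls: 0 });
    let v = shared.lock().unwrap().embed("hello");
    assert_eq!(v, vec![5.0]);
    assert_eq!(shared.lock().unwrap().calls, 1);
}
```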

Contributor

coderabbitai Bot commented Apr 12, 2026

Warning

Rate limit exceeded

@EggerMarc has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 14 minutes and 22 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 14 minutes and 22 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 67f4a528-7499-426f-9c2e-f2e162066f3a

📥 Commits

Reviewing files that changed from the base of the PR and between 3cd335f and 9d9ae2b.

📒 Files selected for processing (5)
  • core/src/transport/traits.rs
  • examples/router/embeddings.rs
  • providers/gemini/src/api/embedding.rs
  • providers/openai/src/api/embedding.rs
  • transports/reqwest/src/lib.rs
📝 Walkthrough

Walkthrough

Adds a pluggable Transport abstraction to chat-core, centralizes SSE parsing into core transport, makes providers and builders generic over T: Transport with a default ReqwestTransport crate, changes embedding APIs to require &mut self, and maps transport errors into chat error types.

Changes

  • Core Transport API (core/src/transport/mod.rs, core/src/transport/traits.rs, core/src/transport/types.rs, core/src/lib.rs)
    Introduces a public transport module with the Transport trait (send/stream) and transport-agnostic types (Request, Response, Event, EventStream, TransportError).
  • SSE Parser (core/src/transport/sse.rs, core/src/types/response.rs)
    New SseParser in core/src/transport/sse.rs (push/next_event); the original in-file parser is removed and re-exported from types/response.rs.
  • Reqwest Transport Crate (transports/reqwest/Cargo.toml, transports/reqwest/src/lib.rs)
    New workspace crate transport-reqwest implementing ReqwestTransport: Transport with send and stream (SSE decoding via SseParser).
  • Providers: transport-generic refactor (providers/openai/..., providers/gemini/..., providers/claude/...: src/client.rs, src/lib.rs, src/api/*, src/api/types/*)
    Providers and builders are now generic over T: Transport; direct reqwest usage is replaced with chat_core::transport::Request/Response flows; streaming switches to consuming EventStream and event tuples.
  • Client/Builder updates (providers/*/src/lib.rs, providers/*/src/client.rs)
    Builders gain a T: Transport generic (default ReqwestTransport) and a with_transport(...) method; build() returns Client<T> storing scheme/host/base_path and the transport instead of reqwest::Client.
  • Embeddings API change & examples (core/src/traits.rs, examples/router/embeddings.rs)
    EmbeddingsProvider::embed changed to &mut self; the example is updated to pass &mut and use Mutex<...> for a shared mutable embedder.
  • Error conversions (core/src/error.rs)
    Adds impl From<TransportError> for ChatError, mapping transport errors to ChatError variants (Network/RateLimited/Provider).
  • Top-level re-exports (src/lib.rs)
    Adds pub mod transport { pub use chat_core::transport::*; } and re-exports Transport at the crate root.
  • Workspace & manifests (Cargo.toml, core/Cargo.toml, providers/*/Cargo.toml, transports/reqwest/Cargo.toml)
    Adds the transports/reqwest workspace member; providers depend on transport-reqwest; adjusts futures/feature wiring in core and provider manifests.
  • Docs & Roadmap (ROADMAP.md, README.md, core/AGENTS.md, providers/*/README.md)
    Documents the pluggable transport plan, OpenAI WS details, WASM/gRPC long-term transports, and builder with_transport usage; updates examples and the architecture diagram.
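The SseParser relocation above centers on a push/next_event shape. A minimal sketch of that shape, assuming the described API; the real core parser will differ, and this only illustrates incremental buffering of event:/data: lines split across arbitrary byte chunks:

```rust
// Hypothetical sketch of a push/next_event SSE parser.
#[derive(Default)]
struct SseParser {
    buf: String,
}

impl SseParser {
    // Feed a raw chunk from the transport; boundaries are arbitrary.
    fn push(&mut self, chunk: &[u8]) {
        self.buf.push_str(&String::from_utf8_lossy(chunk));
    }

    // Pop the next complete event (terminated by a blank line), if any.
    fn next_event(&mut self) -> Option<(String, String)> {
        let end = self.buf.find("\n\n")?;
        let raw: String = self.buf.drain(..end + 2).collect();
        let mut event = String::from("message"); // SSE default event type
        let mut data = Vec::new();
        for line in raw.lines() {
            if let Some(v) = line.strip_prefix("event:") {
                event = v.trim().to_string();
            } else if let Some(v) = line.strip_prefix("data:") {
                data.push(v.trim_start().to_string());
            }
        }
        Some((event, data.join("\n")))
    }
}

fn main() {
    let mut p = SseParser::default();
    // Chunk boundaries need not align with event boundaries.
    p.push(b"event: delta\nda");
    assert!(p.next_event().is_none()); // incomplete: no blank line yet
    p.push(b"ta: {\"text\":\"hi\"}\n\n");
    let (event, data) = p.next_event().unwrap();
    assert_eq!(event, "delta");
    assert_eq!(data, "{\"text\":\"hi\"}");
}
```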

Sequence Diagram(s)

sequenceDiagram
    participant Provider as Provider<T>
    participant Core as chat_core::transport
    participant Reqwest as ReqwestTransport
    participant Client as reqwest::Client
    participant HTTP as HTTP Server

    Note over Provider,HTTP: Unary Request Flow
    Provider->>Provider: Serialize body -> bytes
    Provider->>Core: call send(Request{scheme,host,path,headers,body})
    Core->>Reqwest: delegate to ReqwestTransport.send
    Reqwest->>Client: POST https://{host}{path} (headers, body)
    Client->>HTTP: HTTP Request
    HTTP->>Client: HTTP Response (status, headers, body bytes)
    Client->>Reqwest: return status, headers, body bytes
    Reqwest->>Core: return Response
    Core->>Provider: Result<Response, TransportError>
    Provider->>Provider: Deserialize from res.body
sequenceDiagram
    participant Provider as Provider<T>
    participant Core as chat_core::transport
    participant Reqwest as ReqwestTransport
    participant Client as reqwest::Client
    participant HTTP as HTTP Server
    participant Sse as SseParser

    Note over Provider,HTTP: Streaming (SSE) Flow
    Provider->>Provider: Serialize body -> bytes
    Provider->>Core: call stream(Request{scheme,host,path,headers,body})
    Core->>Reqwest: delegate to ReqwestTransport.stream
    Reqwest->>Client: POST for streaming response
    Client->>HTTP: HTTP Request
    HTTP->>Client: Stream bytes (SSE)
    Client->>Reqwest: yield byte chunks
    Reqwest->>Sse: push(chunk)
    Sse->>Reqwest: next_event() -> (event_type, data)
    Reqwest->>Core: yield Event or TransportError
    Core->>Provider: EventStream
    Provider->>Provider: Iterate events, parse JSON payloads
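The unary flow in the first diagram can be sketched in miniature. This is a synchronous, in-memory stand-in: the real trait is async and the real Request/Response types live in chat_core::transport, so everything here is a simplified assumption that only mirrors the field names shown in the diagram.

```rust
// Simplified Request/Response shapes following the diagram's fields.
struct Request {
    scheme: String,
    host: String,
    path: String,
    headers: Vec<(String, String)>,
    body: Vec<u8>,
}

struct Response {
    status: u16,
    body: Vec<u8>,
}

#[derive(Debug, PartialEq)]
enum TransportError {
    Network(String),
}

// Sync stand-in for the async Transport trait.
trait Transport {
    fn send(&mut self, req: Request) -> Result<Response, TransportError>;
}

// An in-memory transport: the kind of swap-in implementation the
// pluggable design is meant to allow (e.g. for tests).
struct EchoTransport;

impl Transport for EchoTransport {
    fn send(&mut self, req: Request) -> Result<Response, TransportError> {
        // Echo back the URL it would have hit, per the diagram.
        let url = format!("{}://{}{}", req.scheme, req.host, req.path);
        Ok(Response { status: 200, body: url.into_bytes() })
    }
}

fn main() {
    let mut t = EchoTransport;
    let res = t
        .send(Request {
            scheme: "https".into(),
            host: "api.example.com".into(),
            path: "/v1/chat".into(),
            headers: vec![],
            body: b"{}".to_vec(),
        })
        .unwrap();
    assert_eq!(res.status, 200);
    assert_eq!(res.body, b"https://api.example.com/v1/chat".to_vec());
}
```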

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

  • Feat/claude #27 — refactors Claude provider to the transport abstraction (client/api changes); strong code-level overlap.
  • Feature: Streaming Responses for Gemini #24 — touches SSE/stream parsing and stream handling; directly related to SseParser relocation and event-stream consumption.
  • Fix/OpenAI #26 — changes provider trait receivers to &mut self; directly related to EmbeddingsProvider signature adjustment.

Poem

🐰 I hopped through crates and traity land,

moved parsers, bytes, and headers by hand.
Reqwest carries the pool so neat,
providers now plug in transports fleet.
A rabbit smiles — modular and grand.

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 22.37%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
  • Title check (❓ Inconclusive): the title 'Feat/transports' uses vague, generic phrasing; 'transports' alone doesn't convey what was accomplished. Resolution: use a more specific title, such as 'Add pluggable Transport trait abstraction' or 'Introduce Transport layer for provider abstraction'.
✅ Passed checks (1 passed)
  • Description Check (✅ Passed): check skipped because CodeRabbit's high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
providers/openai/src/lib.rs (1)

292-306: ⚠️ Potential issue | 🟠 Major

build() still requires Default for custom transports.

Line 292 narrows the builder to T: Transport + Default, so a caller can pass any T: Transport into with_transport() and then hit a compile error at .build() unless that transport also implements Default. That makes the new transport abstraction incomplete for non-defaultable transports. The same fix as Gemini applies here: initialize the default reqwest transport in new() and let build() require only T: Transport.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@providers/openai/src/lib.rs` around lines 292 - 306, The build() impl
currently constrains T with Default (impl<U, C, T: Transport + Default>
OpenAIBuilder<WithModel, U, C, T>) which forces any custom transport passed via
with_transport() to implement Default; remove the Default bound from build() so
it becomes impl<U, C, T: Transport> OpenAIBuilder<WithModel, U, C, T> and keep
build() using the already-initialized transport option. Instead, ensure new()
initializes the builder.transport with a default ReqwestTransport (or the chosen
default) so a transport is always present unless explicitly overridden by
with_transport(); update references to OpenAIBuilder::new(), with_transport(),
and OpenAIClient construction to rely on the transport Option being set by new()
and not require T: Default in build().
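A minimal sketch of the suggested shape: seed the default transport in new() so build() needs only T: Transport and no Default bound. Names here are simplified stand-ins for the OpenAIBuilder types in the PR, not the actual code.

```rust
trait Transport {}

#[derive(Default)]
struct ReqwestTransport; // stands in for the provided default
impl Transport for ReqwestTransport {}

// A custom transport that deliberately does NOT implement Default.
struct CustomTransport(String);
impl Transport for CustomTransport {}

struct Builder<T: Transport = ReqwestTransport> {
    // Always present: seeded in new(), replaced by with_transport().
    transport: T,
}

impl Builder<ReqwestTransport> {
    fn new() -> Self {
        Builder { transport: ReqwestTransport::default() }
    }
}

// No `T: Default` bound: build() uses the transport it already holds.
impl<T: Transport> Builder<T> {
    fn with_transport<U: Transport>(self, transport: U) -> Builder<U> {
        Builder { transport }
    }
    fn build(self) -> Client<T> {
        Client { transport: self.transport }
    }
}

struct Client<T: Transport> {
    transport: T,
}

fn main() {
    // Compiles even though CustomTransport has no Default impl.
    let client = Builder::new()
        .with_transport(CustomTransport("token".into()))
        .build();
    assert_eq!(client.transport.0, "token");
}
```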
🧹 Nitpick comments (1)
transports/reqwest/src/lib.rs (1)

78-113: Well-structured stream() with proper SSE parsing.

The implementation:

  • Validates HTTP status before streaming (returns error for non-2xx)
  • Uses SseParser from chat-core for incremental SSE parsing
  • Correctly pushes byte chunks and yields parsed events

One consideration for production hardening: the underlying reqwest::Client defaults have no explicit request/read timeouts. For long-running SSE streams, this is often intentional, but for the send() path, you may want to document or provide a way to configure timeouts via reqwest::ClientBuilder when constructing ReqwestTransport::from(client).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@transports/reqwest/src/lib.rs` around lines 78 - 113, The stream()
implementation is fine but we should allow configuring request/read timeouts on
the underlying reqwest client used in send(); update the ReqwestTransport
constructor (e.g., ReqwestTransport::from or the builder that creates the
reqwest::Client) to accept a reqwest::ClientBuilder or a timeout configuration
struct and apply appropriate timeout settings via ClientBuilder::timeout /
ClientBuilder::connect_timeout (or document intentional omission), then
construct the reqwest::Client with those settings so send() calls in stream()
use the configured timeouts; reference ReqwestTransport::from, the internal
client field, and the stream() -> send() call path when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/src/transport/traits.rs`:
- Line 21: The Transport trait currently requires Sync on its trait bound (pub
trait Transport: Send + Sync) which prevents implementations that use non-Sync
browser types; remove the Sync bound so it becomes pub trait Transport: Send and
rely on &mut self for exclusive access and the Send requirement on the futures
returned by Transport::send() and Transport::stream(); update any impls or
trait-object usages that assumed Transport: Sync and run cargo to fix resulting
type errors, keeping Transport, send(), and stream() signatures unchanged aside
from the trait bound removal.
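The bound change above can be seen with a Send-but-not-Sync type: Cell<T> is Send but !Sync, so an implementation holding one compiles under Transport: Send but would be rejected by Transport: Send + Sync. This is a synchronous sketch with hypothetical names; the real trait is async.

```rust
use std::cell::Cell;

// Sketch: `Transport: Send` (no Sync) admits non-Sync interior state,
// with &mut self providing exclusive access.
trait Transport: Send {
    fn send(&mut self, body: &str) -> String;
}

// Cell<u32> is Send but !Sync; a `Send + Sync` supertrait bound
// would reject this implementor.
struct CountingTransport {
    sent: Cell<u32>,
}

impl Transport for CountingTransport {
    fn send(&mut self, body: &str) -> String {
        self.sent.set(self.sent.get() + 1);
        format!("#{}: {}", self.sent.get(), body)
    }
}

fn main() {
    let mut t = CountingTransport { sent: Cell::new(0) };
    assert_eq!(t.send("a"), "#1: a");
    assert_eq!(t.send("b"), "#2: b");
}
```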

In `@providers/claude/Cargo.toml`:
- Around line 17-18: Make the futures dependency optional and add it to the
existing stream feature: change the dependency named "futures" to be optional
(match how "async-stream" is declared) and include "futures" in the "stream"
feature alongside "chat-core/stream" and "async-stream" so the stream feature
gates all three dependencies.

In `@providers/claude/src/client.rs`:
- Around line 4-11: The ClaudeClient struct is missing the native_tools field;
add pub(crate) native_tools: Vec<Box<dyn ClaudeNativeTool>>, to the ClaudeClient
definition (same pattern used in GeminiClient and OpenAIClient), then update any
constructors/builders/factory functions that create ClaudeClient (e.g.,
new/with_* functions) to initialize native_tools (either from a parameter or as
Vec::new()), and update any Clone/Default/serialization implementations or
usages that construct or destructure ClaudeClient so they include this new
field; ensure ClaudeNativeTool is imported/available in the module.

In `@providers/gemini/Cargo.toml`:
- Line 12: Make the futures dependency optional and include it in the stream
feature to match chat-router: change the futures entry from an unconditional
workspace dependency to an optional workspace dependency (i.e., futures = {
workspace = true, optional = true }) and add "dep:futures" to the stream feature
list so stream = ["chat-core/stream", "dep:async-stream", "dep:futures"]; apply
the same updates to the chat-openai and chat-claude provider Cargo.toml files to
keep the pattern consistent.

In `@providers/gemini/src/lib.rs`:
- Around line 239-246: The impl for GeminiBuilder<WithModel, C, T> currently
requires T: Transport + Default which forces custom transports to implement
Default; change it so build only requires T: Transport (remove the Default
bound) and ensure new() seeds a default ReqwestTransport when no transport is
provided (so with_transport(MyTransport) compiles and build() can unwrap the
transport without requiring Default). Update the implementation references to
GeminiBuilder, build, new, with_transport, Transport, and the default seed to
ReqwestTransport accordingly.

In `@providers/openai/Cargo.toml`:
- Around line 18-19: The futures dependency is currently always enabled but must
be gated by the "stream" feature like chat-core/stream and async-stream; update
Cargo.toml so that futures is declared optional (e.g., futures = { workspace =
true, optional = true }) and then include it in the "stream" feature alongside
"chat-core/stream" and "async-stream" so that all three dependencies are enabled
only when the stream feature is activated.
- Line 12: The Cargo.toml currently sets transport-reqwest.workspace = true but
omits a direct reqwest dependency; add a direct dependency declaration for
reqwest including the "json" and "stream" features so the provider crate
explicitly depends on reqwest (keep version consistent with the workspace or
pinned to the workspace version), e.g., add reqwest with features
["json","stream"] in the same Cargo.toml where transport-reqwest.workspace is
set to true to satisfy provider compliance.

In `@providers/openai/src/api/completion.rs`:
- Around line 58-62: The transport error handling in
providers/openai/src/api/completion.rs currently maps every transport failure to
ChatError::Network, which makes non-transient TransportError::Request get
treated as retryable; change the mapping when awaiting
self.transport.send(req).await to preserve TransportError variants (e.g., map
TransportError::Request to a non-retryable ChatFailure via
ChatFailure::from_err(ChatError::Other or Provider) and map only actual
network-level failures to ChatError::Network), and mirror this same fix in the
Claude and Gemini completion handlers where transport.send is used. Also
implement ChatError::is_retryable() so it returns true only for
ChatError::RateLimited and ChatError::Network and false for Provider,
InvalidResponse, MaxStepsExceeded, Callback, and Other so the retry/router logic
behaves correctly.
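The mapping and is_retryable() rules above, as a hedged sketch. Variant names follow the comment, but the real ChatError and TransportError carry more data, so the shapes here are simplified assumptions.

```rust
// Simplified stand-ins for the error types discussed above.
enum TransportError {
    Request { status: u16, body: String },
    Network(String),
}

#[derive(Debug)]
enum ChatError {
    Network(String),
    RateLimited,
    Provider(String),
    InvalidResponse,
    MaxStepsExceeded,
    Callback,
    Other,
}

impl From<TransportError> for ChatError {
    fn from(e: TransportError) -> Self {
        match e {
            // Preserve HTTP semantics: 429 stays retryable.
            TransportError::Request { status: 429, .. } => ChatError::RateLimited,
            // Other request failures are non-transient provider errors.
            TransportError::Request { body, .. } => ChatError::Provider(body),
            TransportError::Network(msg) => ChatError::Network(msg),
        }
    }
}

impl ChatError {
    // Only rate limits and network failures are worth retrying.
    fn is_retryable(&self) -> bool {
        matches!(self, ChatError::RateLimited | ChatError::Network(_))
    }
}

fn main() {
    assert!(ChatError::RateLimited.is_retryable());
    assert!(ChatError::Network("timeout".into()).is_retryable());
    assert!(!ChatError::Provider("bad request".into()).is_retryable());
    assert!(ChatError::from(TransportError::Request { status: 429, body: String::new() }).is_retryable());
    assert!(!ChatError::from(TransportError::Request { status: 400, body: "bad".into() }).is_retryable());
}
```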

In `@providers/openai/src/api/types/error.rs`:
- Around line 30-33: Change the OpenAI error detail's code field from
Option<i32> to Option<String> in the OpenAIErrorDetail struct so JSON
deserialization accepts string codes; then update the error formatting in the
OpenAIErrorResponse handling (the code that builds error_msg and currently uses
openai_err.error.code.unwrap_or(status as i32)) to handle Option<String> (e.g.,
map or unwrap_or to a string fallback such as status.to_string() or a
descriptive fallback). Ensure references to OpenAIErrorDetail,
OpenAIErrorResponse, and the error_msg construction are updated consistently.
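The Option<String> change and the status fallback, sketched without serde. The struct is a simplified stand-in for OpenAIErrorDetail, and the message format is illustrative only.

```rust
// Simplified stand-in: `code` as Option<String> instead of Option<i32>,
// so string codes from the API no longer fail to deserialize.
struct OpenAIErrorDetail {
    message: String,
    code: Option<String>,
}

fn format_error(status: u16, detail: &OpenAIErrorDetail) -> String {
    // Fall back to the HTTP status when no code is present.
    let code = detail.code.clone().unwrap_or_else(|| status.to_string());
    format!("OpenAI error {code}: {}", detail.message)
}

fn main() {
    let with_code = OpenAIErrorDetail {
        message: "invalid key".into(),
        code: Some("invalid_api_key".into()),
    };
    assert_eq!(format_error(401, &with_code), "OpenAI error invalid_api_key: invalid key");

    let without_code = OpenAIErrorDetail { message: "oops".into(), code: None };
    assert_eq!(format_error(500, &without_code), "OpenAI error 500: oops");
}
```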

---

Outside diff comments:
In `@providers/openai/src/lib.rs`:
- Around line 292-306: The build() impl currently constrains T with Default
(impl<U, C, T: Transport + Default> OpenAIBuilder<WithModel, U, C, T>) which
forces any custom transport passed via with_transport() to implement Default;
remove the Default bound from build() so it becomes impl<U, C, T: Transport>
OpenAIBuilder<WithModel, U, C, T> and keep build() using the already-initialized
transport option. Instead, ensure new() initializes the builder.transport with a
default ReqwestTransport (or the chosen default) so a transport is always
present unless explicitly overridden by with_transport(); update references to
OpenAIBuilder::new(), with_transport(), and OpenAIClient construction to rely on
the transport Option being set by new() and not require T: Default in build().

---

Nitpick comments:
In `@transports/reqwest/src/lib.rs`:
- Around line 78-113: The stream() implementation is fine but we should allow
configuring request/read timeouts on the underlying reqwest client used in
send(); update the ReqwestTransport constructor (e.g., ReqwestTransport::from or
the builder that creates the reqwest::Client) to accept a reqwest::ClientBuilder
or a timeout configuration struct and apply appropriate timeout settings via
ClientBuilder::timeout / ClientBuilder::connect_timeout (or document intentional
omission), then construct the reqwest::Client with those settings so send()
calls in stream() use the configured timeouts; reference ReqwestTransport::from,
the internal client field, and the stream() -> send() call path when making the
change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: abf3a0f7-5621-4467-9f1b-c9c9a3b94a9d

📥 Commits

Reviewing files that changed from the base of the PR and between ba924bd and ffc6261.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (34)
  • Cargo.toml
  • ROADMAP.md
  • core/Cargo.toml
  • core/src/lib.rs
  • core/src/traits.rs
  • core/src/transport/mod.rs
  • core/src/transport/sse.rs
  • core/src/transport/traits.rs
  • core/src/transport/types.rs
  • core/src/types/response.rs
  • examples/router/embeddings.rs
  • providers/claude/Cargo.toml
  • providers/claude/src/api/completion.rs
  • providers/claude/src/api/stream.rs
  • providers/claude/src/api/types/error.rs
  • providers/claude/src/client.rs
  • providers/claude/src/lib.rs
  • providers/gemini/Cargo.toml
  • providers/gemini/src/api/completion.rs
  • providers/gemini/src/api/embedding.rs
  • providers/gemini/src/api/stream.rs
  • providers/gemini/src/api/types/error.rs
  • providers/gemini/src/client.rs
  • providers/gemini/src/lib.rs
  • providers/openai/Cargo.toml
  • providers/openai/src/api/completion.rs
  • providers/openai/src/api/embedding.rs
  • providers/openai/src/api/stream.rs
  • providers/openai/src/api/types/error.rs
  • providers/openai/src/client.rs
  • providers/openai/src/lib.rs
  • src/lib.rs
  • transports/reqwest/Cargo.toml
  • transports/reqwest/src/lib.rs

Comment threads (9)
  • core/src/transport/traits.rs
  • providers/claude/Cargo.toml (outdated)
  • providers/claude/src/client.rs
  • providers/gemini/Cargo.toml
  • providers/gemini/src/lib.rs (outdated)
  • providers/openai/Cargo.toml
  • providers/openai/Cargo.toml (outdated)
  • providers/openai/src/api/completion.rs (outdated)
  • providers/openai/src/api/types/error.rs (outdated)
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
providers/claude/src/api/stream.rs (1)

158-176: Terminate on message_stop instead of waiting for socket close.

Now that transport already hands this parser discrete Claude events, the stream can finish as soon as message_stop arrives. Waiting for EOF before yielding Done keeps callers blocked longer than necessary and diverges from the provider's streaming contract.

Based on learnings: Emit StreamEvent::Done with full ChatResponse on message_stop.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@providers/claude/src/api/stream.rs` around lines 158 - 176, The parser
parse_claude_event_stream currently waits for EOF before yielding a final
StreamEvent::Done; change it to terminate and emit StreamEvent::Done as soon as
a `message_stop` event is observed: inside the event loop that processes
events.next(), detect the `message_stop` event_type, build the full ChatResponse
using the accumulated parts (final_parts, final_reason, message_id, model,
input_tokens, output_tokens, tool_input_buffer), yield StreamEvent::Done with
that ChatResponse, and then break/return from the stream so callers are not
blocked waiting for socket close; ensure other cleanup/metrics still run but do
not defer emitting Done until EOF.
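The early-termination behavior being asked for, as a synchronous sketch over an iterator of (event_type, data) pairs. StreamEvent here is a simplified stand-in (the real Done variant carries a full ChatResponse), and the accumulation is reduced to plain text.

```rust
#[derive(Debug, PartialEq)]
enum StreamEvent {
    Delta(String),
    Done(String), // real code carries a full ChatResponse
}

fn collect_until_stop<I>(events: I) -> Vec<StreamEvent>
where
    I: IntoIterator<Item = (String, String)>, // (event_type, data)
{
    let mut out = Vec::new();
    let mut text = String::new();
    for (event_type, data) in events {
        match event_type.as_str() {
            "content_block_delta" => {
                text.push_str(&data);
                out.push(StreamEvent::Delta(data));
            }
            // Terminal event: yield Done and stop immediately,
            // without waiting for the socket to close.
            "message_stop" => {
                out.push(StreamEvent::Done(text.clone()));
                break;
            }
            _ => {}
        }
    }
    out
}

fn main() {
    let events = vec![
        ("content_block_delta".to_string(), "Hel".to_string()),
        ("content_block_delta".to_string(), "lo".to_string()),
        ("message_stop".to_string(), String::new()),
        // Anything after message_stop is never consumed.
        ("content_block_delta".to_string(), "ignored".to_string()),
    ];
    let out = collect_until_stop(events);
    assert_eq!(out.last(), Some(&StreamEvent::Done("Hello".to_string())));
    assert_eq!(out.len(), 3);
}
```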
providers/openai/src/api/stream.rs (1)

281-297: Emit Done from response.completed, not only at EOF.

StreamState::on_completed() already has enough data to build the final ChatResponse, but this parser waits for the transport to close before yielding StreamEvent::Done. That adds avoidable tail latency and can stall completion if the server leaves the SSE connection open briefly after the final event.

Based on learnings: Emit Done(ChatResponse) on response.completed event in streaming.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@providers/openai/src/api/stream.rs` around lines 281 - 297, The parser
currently only yields StreamEvent::Done at EOF; change
parse_transport_event_stream to also emit Done when a response.completed event
is observed by detecting the "response.completed" event_type (or the
corresponding signal from StreamState::handle_event) and immediately calling
StreamState::on_completed()/into_response() to yield
StreamEvent::Done(ChatResponse) instead of waiting for the transport to close;
update the loop in parse_transport_event_stream to check event_type (or the
handle_event result) for completion and yield Done right away using
state.on_completed()/state.into_response().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/src/error.rs`:
- Around line 48-55: The current From<TransportError> for ChatError
implementation drops HTTP semantics causing streamed 429s to become
non-retryable Provider errors; update the conversion so TransportError::Request
preserves HTTP status (or parses the status from the error string) and maps 429
-> ChatError::RateLimited while other request errors remain ChatError::Provider
(or include structured status/body if refactoring TransportError). Also add
ChatError::is_retryable() method that returns true only for
ChatError::RateLimited and ChatError::Network and false for Provider,
InvalidResponse, MaxStepsExceeded, Callback, and Other to ensure router/retry
logic treats rate limits and network failures as retryable.

---

Nitpick comments:
In `@providers/claude/src/api/stream.rs`:
- Around line 158-176: The parser parse_claude_event_stream currently waits for
EOF before yielding a final StreamEvent::Done; change it to terminate and emit
StreamEvent::Done as soon as a `message_stop` event is observed: inside the
event loop that processes events.next(), detect the `message_stop` event_type,
build the full ChatResponse using the accumulated parts (final_parts,
final_reason, message_id, model, input_tokens, output_tokens,
tool_input_buffer), yield StreamEvent::Done with that ChatResponse, and then
break/return from the stream so callers are not blocked waiting for socket
close; ensure other cleanup/metrics still run but do not defer emitting Done
until EOF.

In `@providers/openai/src/api/stream.rs`:
- Around line 281-297: The parser currently only yields StreamEvent::Done at
EOF; change parse_transport_event_stream to also emit Done when a
response.completed event is observed by detecting the "response.completed"
event_type (or the corresponding signal from StreamState::handle_event) and
immediately calling StreamState::on_completed()/into_response() to yield
StreamEvent::Done(ChatResponse) instead of waiting for the transport to close;
update the loop in parse_transport_event_stream to check event_type (or the
handle_event result) for completion and yield Done right away using
state.on_completed()/state.into_response().

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e2822889-687d-4a81-a5fc-f804fa111dec

📥 Commits

Reviewing files that changed from the base of the PR and between ffc6261 and 741193f.

📒 Files selected for processing (16)
  • core/src/error.rs
  • providers/claude/Cargo.toml
  • providers/claude/src/api/completion.rs
  • providers/claude/src/api/stream.rs
  • providers/claude/src/lib.rs
  • providers/gemini/Cargo.toml
  • providers/gemini/src/api/completion.rs
  • providers/gemini/src/api/embedding.rs
  • providers/gemini/src/api/stream.rs
  • providers/gemini/src/lib.rs
  • providers/openai/Cargo.toml
  • providers/openai/src/api/completion.rs
  • providers/openai/src/api/embedding.rs
  • providers/openai/src/api/stream.rs
  • providers/openai/src/api/types/error.rs
  • providers/openai/src/lib.rs
🚧 Files skipped from review as they are similar to previous changes (7)
  • providers/claude/Cargo.toml
  • providers/openai/Cargo.toml
  • providers/openai/src/api/types/error.rs
  • providers/gemini/src/api/completion.rs
  • providers/openai/src/api/embedding.rs
  • providers/claude/src/api/completion.rs
  • providers/claude/src/lib.rs

Comment thread core/src/error.rs
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
providers/openai/src/api/stream.rs (2)

163-175: ⚠️ Potential issue | 🟠 Major

Emit StreamEvent::Done when response.completed is processed, not on EOF.

Currently on_completed() returns Ok(None) (line 265), so the final response is only yielded when the transport closes (line 295). This means a failed or truncated stream can still appear successful if the connection closes cleanly after an unhandled event. Per the OpenAI Responses API, response.completed is the normal terminal event—emit Done(ChatResponse) there and return an error if the stream ends without seeing it.

Current behavior
    fn on_completed(&mut self, data: &str) -> Result<Option<StreamEvent>, ChatError> {
        // ... updates state ...
        Ok(None)  // Never emits Done
    }

fn parse_transport_event_stream(...) -> BoxStream<'static, ...> {
    let stream = try_stream! {
        while let Some(event_res) = events.next().await {
            // handle events...
        }
        yield StreamEvent::Done(state.into_response());  // Done only on EOF
    };
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@providers/openai/src/api/stream.rs` around lines 163 - 175, The handler
currently ignores the terminal `response.completed` event (on_completed returns
Ok(None)) and only yields StreamEvent::Done when the transport closes; change
on_completed(&mut self, data: &str) to build the final ChatResponse from the
current state and return Ok(Some(StreamEvent::Done(response))) so the terminal
event is emitted immediately in handle_event, and update
parse_transport_event_stream (the EOF path) to return an error if EOF occurs
without having received the completed event rather than yielding
Done(state.into_response()); ensure references to StreamEvent::Done,
on_completed, handle_event, and parse_transport_event_stream are used so the
final response is produced on the completed event and EOF without completion is
treated as an error.
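The two halves of the suggested fix, sketched synchronously: emit Done when response.completed is observed, and treat EOF without it as an error. Shapes are simplified stand-ins for StreamEvent/ChatError in the PR.

```rust
#[derive(Debug, PartialEq)]
enum StreamEvent {
    Done(String), // real code carries a full ChatResponse
}

fn finish_stream<I>(events: I) -> Result<StreamEvent, String>
where
    I: IntoIterator<Item = (String, String)>, // (event_type, data)
{
    for (event_type, data) in events {
        if event_type == "response.completed" {
            // Terminal event: build the final response here, not at EOF.
            return Ok(StreamEvent::Done(data));
        }
    }
    // The transport closed without the terminal event: a truncated
    // stream should surface as an error, not a clean completion.
    Err("stream ended without response.completed".to_string())
}

fn main() {
    let ok = finish_stream(vec![
        ("response.output_text.delta".to_string(), "hi".to_string()),
        ("response.completed".to_string(), "{\"id\":\"resp_1\"}".to_string()),
    ]);
    assert_eq!(ok, Ok(StreamEvent::Done("{\"id\":\"resp_1\"}".to_string())));

    let truncated = finish_stream(vec![
        ("response.output_text.delta".to_string(), "hi".to_string()),
    ]);
    assert!(truncated.is_err());
}
```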

193-200: ⚠️ Potential issue | 🟠 Major

Tool-call deltas are keyed to the wrong identifier.

The OpenAI Responses API distinguishes between item_id (the function call item's unique ID) and call_id (the function call instance ID). Line 195 seeds fc_args with call_id, but streaming argument deltas arrive with item_id (line 221). The lookup fails, falling back to .values_mut().last() (line 224), which is order-dependent and unreliable when multiple tool calls are active. Additionally, lines 196–200 emit StreamEvent::ToolCall with arguments: Value::Null, and the accumulated arguments in fc_args are never read back into the emitted event.

To fix: either capture and key by the item's id field from response.output_item.added, or map deltas by item_id and update previously emitted tool calls when arguments arrive. The proper approach per the Responses API is to correlate deltas with the output item's id field rather than call_id.

Also applies to: 217–227

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@providers/openai/src/api/stream.rs` around lines 193 - 200, The code seeds
and keys fc_args using ResponsesOutputItem::FunctionCall's call_id (fc.call_id)
but streaming argument deltas are correlated by the output item's id
(added.item.id / item_id), so lookups fail and emitted StreamEvent::ToolCall
never contains accumulated arguments; change the keying to use the output item
id from response.output_item.added (use added.item.id / item_id) when inserting
into self.fc_args, update the logic that applies deltas (currently matching on
item_id) to mutate the same fc_args entry, and when emitting
StreamEvent::ToolCall in the block that creates the initial call (function
producing FunctionCall events) populate the arguments field from the current
fc_args value (or Value::Null if empty) so later deltas update the same map and
the event reflects accumulated arguments; apply the same fix to the other
similar block around the 217–227 region.
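The keying fix can be sketched with a small accumulator, using hypothetical names (`ToolCallAccumulator`, `on_item_added`, `on_arguments_delta`) in place of the crate's actual stream handler: both the seed and the delta lookup use the output item's `id`, so concurrent tool calls no longer collide.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the crate's stream handler state; the fix is the
// same — correlate argument deltas by the output item's `id`, not `call_id`.
#[derive(Default)]
struct ToolCallAccumulator {
    // item_id -> accumulated JSON argument fragments
    fc_args: HashMap<String, String>,
}

impl ToolCallAccumulator {
    // `response.output_item.added`: seed the entry under the item's id.
    fn on_item_added(&mut self, item_id: &str) {
        self.fc_args.entry(item_id.to_string()).or_default();
    }

    // `response.function_call_arguments.delta`: deltas carry the same
    // item_id, so the lookup targets the right entry even when several
    // tool calls are active at once.
    fn on_arguments_delta(&mut self, item_id: &str, delta: &str) {
        if let Some(buf) = self.fc_args.get_mut(item_id) {
            buf.push_str(delta);
        }
    }

    fn arguments(&self, item_id: &str) -> Option<&str> {
        self.fc_args.get(item_id).map(String::as_str)
    }
}
```

With two interleaved calls, each entry accumulates independently instead of falling back to an order-dependent `.values_mut().last()`.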
core/AGENTS.md (1)

7-35: ⚠️ Potential issue | 🟡 Minor

Add a language to this fenced block.

markdownlint is already flagging this fence with MD040. Using text here is enough.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/AGENTS.md` around lines 7 - 35, The fenced code block showing the crate
file tree is missing a language tag which triggers markdownlint MD040; update
that specific fenced block (the one containing the src/ tree and filenames like
lib.rs, traits.rs, transport/mod.rs, chat/mod.rs, types/response.rs) by changing
the opening triple backticks to include a language token (use "text") so the
fence becomes ```text.
providers/claude/src/api/stream.rs (1)

174-228: ⚠️ Potential issue | 🟠 Major

Fail fast on malformed Claude transport events.

After the transport refactor, this loop is the last place that can classify bad Claude payloads. The if let Ok(...) guards on Lines 180-201 and Lines 209-224 drop both JSON decode failures and sse_event_to_part() errors, so a malformed chunk is silently skipped and the stream can end with truncated final_parts or a stale tool_input_buffer.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@providers/claude/src/api/stream.rs` around lines 174 - 228, This loop
currently swallows JSON parse errors and sse_event_to_part() errors (in the
message_start, content_block_start and content_block_delta branches) which leads
to truncated final_parts or stale tool_input_buffer; replace the `if let
Ok(...)` guards with fallible binds that propagate errors instead of skipping:
e.g., use `let data =
serde_json::from_str::<Value>(&json_str).map_err(ChatError::from)?` where you
need the parsed JSON, and similarly `let part_opt = sse_event_to_part("...",
&json_str).map_err(ChatError::from)?` before using
`final_parts.merge_chunk(part)` or mutating `tool_input_buffer`, so any
malformed Claude transport event returns an error immediately rather than being
ignored.
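The fail-fast pattern can be sketched dependency-free; a trivial integer `decode` stands in for `serde_json::from_str`, and the names here are illustrative, not the crate's:

```rust
#[derive(Debug, PartialEq)]
enum ChatError {
    Decode(String),
}

// Stand-in for serde_json::from_str: the payload must be a decimal integer
// so the example stays self-contained.
fn decode(json_str: &str) -> Result<i64, ChatError> {
    json_str
        .trim()
        .parse()
        .map_err(|_| ChatError::Decode(json_str.to_string()))
}

// Before: `if let Ok(data) = decode(chunk) { ... }` silently skipped bad
// chunks. After: `?` propagates the first malformed chunk as a stream error.
fn drain(chunks: &[&str]) -> Result<Vec<i64>, ChatError> {
    let mut out = Vec::new();
    for chunk in chunks {
        let data = decode(chunk)?; // fail fast instead of dropping the chunk
        out.push(data);
    }
    Ok(out)
}
```

The same substitution applies to the `sse_event_to_part()` guards: bind with `?` (mapping into `ChatError`) so a malformed event terminates the stream rather than leaving `final_parts` truncated.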
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@providers/openai/src/lib.rs`:
- Around line 141-150: with_custom_url currently parses the full URL but
discards the scheme, causing http vs https endpoints to be treated identically;
update with_custom_url to preserve the URL scheme (e.g., store "http" or
"https") along with host and base_path in the OpenAIBuilder so the transport can
reconstruct the full request URL correctly — modify the function that builds
OpenAIBuilder (with_custom_url) to extract parsed.scheme() and include it in the
builder struct (add a scheme field if missing) and ensure existing references
(host, base_path, OpenAIBuilder) are updated to use that scheme when the request
URL is reconstructed.
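A sketch of the builder change, with a stdlib `split_once` standing in for `url::Url::parse` to keep the example self-contained (field names here are assumptions, not the crate's actual layout):

```rust
// Hypothetical simplified builder: the point is that `scheme` is stored
// alongside host and base_path instead of being discarded at parse time.
#[derive(Debug, PartialEq)]
struct OpenAIBuilder {
    scheme: String,
    host: String,
    base_path: String,
}

// Simplified parse; the real code would use url::Url and call
// parsed.scheme() / parsed.host_str() / parsed.path().
fn with_custom_url(url: &str) -> Option<OpenAIBuilder> {
    let (scheme, rest) = url.split_once("://")?;
    let (host, path) = rest.split_once('/').unwrap_or((rest, ""));
    Some(OpenAIBuilder {
        scheme: scheme.to_string(),
        host: host.to_string(),
        base_path: format!("/{path}"),
    })
}

impl OpenAIBuilder {
    // Reconstruct the request URL with the original scheme, so http
    // endpoints are no longer treated the same as https.
    fn request_url(&self, endpoint: &str) -> String {
        format!("{}://{}{}{}", self.scheme, self.host, self.base_path, endpoint)
    }
}
```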

---

Outside diff comments:
In `@core/AGENTS.md`:
- Around line 7-35: The fenced code block showing the crate file tree is missing
a language tag which triggers markdownlint MD040; update that specific fenced
block (the one containing the src/ tree and filenames like lib.rs, traits.rs,
transport/mod.rs, chat/mod.rs, types/response.rs) by changing the opening triple
backticks to include a language token (use "text") so the fence becomes ```text.

In `@providers/claude/src/api/stream.rs`:
- Around line 174-228: This loop currently swallows JSON parse errors and
sse_event_to_part() errors (in the message_start, content_block_start and
content_block_delta branches) which leads to truncated final_parts or stale
tool_input_buffer; replace the `if let Ok(...)` guards with fallible binds that
propagate errors instead of skipping: e.g., use `let data =
serde_json::from_str::<Value>(&json_str).map_err(ChatError::from)?` where you
need the parsed JSON, and similarly `let part_opt = sse_event_to_part("...",
&json_str).map_err(ChatError::from)?` before using
`final_parts.merge_chunk(part)` or mutating `tool_input_buffer`, so any
malformed Claude transport event returns an error immediately rather than being
ignored.

In `@providers/openai/src/api/stream.rs`:
- Around line 163-175: The handler currently ignores the terminal
`response.completed` event (on_completed returns Ok(None)) and only yields
StreamEvent::Done when the transport closes; change on_completed(&mut self,
data: &str) to build the final ChatResponse from the current state and return
Ok(Some(StreamEvent::Done(response))) so the terminal event is emitted
immediately in handle_event, and update parse_transport_event_stream (the EOF
path) to return an error if EOF occurs without having received the completed
event rather than yielding Done(state.into_response()); ensure references to
StreamEvent::Done, on_completed, handle_event, and parse_transport_event_stream
are used so the final response is produced on the completed event and EOF
without completion is treated as an error.
- Around line 193-200: The code seeds and keys fc_args using
ResponsesOutputItem::FunctionCall's call_id (fc.call_id) but streaming argument
deltas are correlated by the output item's id (added.item.id / item_id), so
lookups fail and emitted StreamEvent::ToolCall never contains accumulated
arguments; change the keying to use the output item id from
response.output_item.added (use added.item.id / item_id) when inserting into
self.fc_args, update the logic that applies deltas (currently matching on
item_id) to mutate the same fc_args entry, and when emitting
StreamEvent::ToolCall in the block that creates the initial call (function
producing FunctionCall events) populate the arguments field from the current
fc_args value (or Value::Null if empty) so later deltas update the same map and
the event reflects accumulated arguments; apply the same fix to the other
similar block around the 217–227 region.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d9fb81ee-14ad-40b3-b12d-a7a9c2b4c788

📥 Commits

Reviewing files that changed from the base of the PR and between 741193f and 9465394.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (24)
  • README.md
  • core/AGENTS.md
  • core/src/error.rs
  • core/src/transport/types.rs
  • providers/AGENTS.md
  • providers/claude/README.md
  • providers/claude/src/api/completion.rs
  • providers/claude/src/api/stream.rs
  • providers/claude/src/client.rs
  • providers/claude/src/lib.rs
  • providers/gemini/README.md
  • providers/gemini/src/api/completion.rs
  • providers/gemini/src/api/embedding.rs
  • providers/gemini/src/api/stream.rs
  • providers/gemini/src/client.rs
  • providers/gemini/src/lib.rs
  • providers/openai/Cargo.toml
  • providers/openai/README.md
  • providers/openai/src/api/completion.rs
  • providers/openai/src/api/embedding.rs
  • providers/openai/src/api/stream.rs
  • providers/openai/src/client.rs
  • providers/openai/src/lib.rs
  • transports/reqwest/src/lib.rs
✅ Files skipped from review due to trivial changes (4)
  • providers/gemini/README.md
  • providers/openai/README.md
  • providers/claude/README.md
  • README.md
🚧 Files skipped from review as they are similar to previous changes (4)
  • providers/openai/Cargo.toml
  • providers/openai/src/client.rs
  • providers/claude/src/client.rs
  • transports/reqwest/src/lib.rs

Comment thread providers/openai/src/lib.rs
@EggerMarc EggerMarc merged commit 47b70be into main Apr 12, 2026
2 checks passed
@EggerMarc EggerMarc deleted the feat/transports branch April 13, 2026 00:07
@coderabbitai coderabbitai Bot mentioned this pull request Apr 13, 2026