Skip to content

Propagate stream error#4792

Open
jiapingzeng wants to merge 3 commits intoopensearch-project:mainfrom
jiapingzeng:propagate-stream-error
Open

Propagate stream error#4792
jiapingzeng wants to merge 3 commits intoopensearch-project:mainfrom
jiapingzeng:propagate-stream-error

Conversation

@jiapingzeng
Copy link
Copy Markdown
Contributor

Description

Propagate stream error.

Tested that when there is an underlying error (e.g. agentic memory error), the error is propagated properly.

data: {"type":"RUN_STARTED","timestamp":1776197477955,"threadId":"6930a356-dd3b-4620-8b5f-d41512d10453","runId":"ec330a5e-e247-46e3-bc27-dd920cf2b6af"}

data: {"type":"RUN_ERROR","timestamp":1776197478098,"message":"Memory container not found"}

Related Issues

Resolves #4749

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
@github-actions
Copy link
Copy Markdown

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Unwrap TransportException root cause before sending response

Relevant files:

  • plugin/src/main/java/org/opensearch/ml/action/execute/TransportExecuteStreamTaskAction.java
  • plugin/src/test/java/org/opensearch/ml/action/execute/TransportExecuteStreamTaskActionTests.java

Sub-PR theme: Build structured stream error chunk with AG-UI support

Relevant files:

  • plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteStreamAction.java
  • plugin/src/test/java/org/opensearch/ml/rest/RestMLExecuteStreamActionTests.java

⚡ Recommended focus areas for review

Partial Unwrap

In buildStreamErrorChunk, the unwrap logic only goes one level deep (ex.getCause()). If the exception chain is deeper (e.g., TransportException wrapping RuntimeException wrapping OpenSearchStatusException), the OpenSearchStatusException branch will be missed and the fallback message may be less informative. Consider recursively unwrapping to the root cause.

Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
String errorMessage;
if (ex instanceof IOException) {
    errorMessage = "Failed to parse request: " + ex.getMessage();
} else if (cause instanceof OpenSearchStatusException statusEx) {
    errorMessage = statusEx.getMessage();
} else {
    errorMessage = cause.getMessage() != null ? cause.getMessage() : ex.getMessage();
}
IOException Check Order

In buildStreamErrorChunk, the IOException check uses ex (the original exception) but the OpenSearchStatusException check uses cause (the unwrapped exception). If the cause is an IOException, it will not be caught by the first branch. The ordering and which variable is checked should be made consistent and intentional.

if (ex instanceof IOException) {
    errorMessage = "Failed to parse request: " + ex.getMessage();
} else if (cause instanceof OpenSearchStatusException statusEx) {
    errorMessage = statusEx.getMessage();
} else {
    errorMessage = cause.getMessage() != null ? cause.getMessage() : ex.getMessage();
}
Cast Safety

exp.unwrapCause() returns a Throwable, and it is cast directly to Exception. If the root cause is an Error (e.g., OutOfMemoryError), this cast will throw a ClassCastException at runtime. Consider checking the type before casting or using a safer approach.

channel.sendResponse((Exception) exp.unwrapCause());

@jiapingzeng jiapingzeng requested a deployment to ml-commons-cicd-env-require-approval April 14, 2026 20:14 — with GitHub Actions Waiting
@jiapingzeng jiapingzeng requested a deployment to ml-commons-cicd-env-require-approval April 14, 2026 20:14 — with GitHub Actions Waiting
@jiapingzeng jiapingzeng requested a deployment to ml-commons-cicd-env-require-approval April 14, 2026 20:14 — with GitHub Actions Waiting
@jiapingzeng jiapingzeng requested a deployment to ml-commons-cicd-env-require-approval April 14, 2026 20:14 — with GitHub Actions Waiting
@github-actions
Copy link
Copy Markdown

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Guard against unsafe cast of unwrapped cause

exp.unwrapCause() returns a Throwable, not necessarily an Exception. If the root
cause is an Error or another non-Exception Throwable, this cast will throw a
ClassCastException at runtime. You should handle this case safely by wrapping
non-Exception throwables.

plugin/src/main/java/org/opensearch/ml/action/execute/TransportExecuteStreamTaskAction.java [94]

-channel.sendResponse((Exception) exp.unwrapCause());
+Throwable cause = exp.unwrapCause();
+channel.sendResponse(cause instanceof Exception ? (Exception) cause : new RuntimeException(cause));
Suggestion importance[1-10]: 7

__

Why: exp.unwrapCause() returns a Throwable, and casting it directly to Exception could throw a ClassCastException if the root cause is an Error. The suggested fix safely wraps non-Exception throwables in a RuntimeException, which is a valid defensive improvement.

Medium
General
Fix incomplete JSON string escaping

The manual JSON escaping using replace(""", "\"") is incomplete and fragile — it
doesn't handle other special characters like backslashes, newlines, or control
characters, which could produce malformed JSON. Consider using a proper JSON
serialization approach to safely encode the error message.

plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteStreamAction.java [645]

-return createHttpChunk("data: {\"error\": \"" + errorMessage.replace("\"", "\\\"") + "\"}\n\n", true);
+String safeMessage = errorMessage.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r");
+return createHttpChunk("data: {\"error\": \"" + safeMessage + "\"}\n\n", true);
Suggestion importance[1-10]: 4

__

Why: The manual JSON escaping in buildStreamErrorChunk is indeed incomplete and could produce malformed JSON for messages containing backslashes or newlines. However, this is a pre-existing pattern and the improvement is moderate in impact.

Low
Fix exception type check on wrapped cause

The IOException check uses ex (the wrapper) rather than cause, so if an IOException
is wrapped inside another exception, it won't be detected. Additionally, the
unwrapping only goes one level deep — if the OpenSearchStatusException is nested
more than one level, it won't be found. Consider using a recursive unwrap or
checking cause for IOException as well.

plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteStreamAction.java [631-639]

 Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
 String errorMessage;
-if (ex instanceof IOException) {
-    errorMessage = "Failed to parse request: " + ex.getMessage();
+if (ex instanceof IOException || cause instanceof IOException) {
+    Throwable ioEx = ex instanceof IOException ? ex : cause;
+    errorMessage = "Failed to parse request: " + ioEx.getMessage();
 } else if (cause instanceof OpenSearchStatusException statusEx) {
     errorMessage = statusEx.getMessage();
 } else {
     errorMessage = cause.getMessage() != null ? cause.getMessage() : ex.getMessage();
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that the IOException check only looks at the top-level exception and misses wrapped IOException instances. However, this is a minor edge case and the single-level unwrap covers the most common scenarios in this context.

Low

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] stream errors not propagated properly

1 participant