Skip to content

remove duplicate RUN_FINISHED events#4789

Draft
jiapingzeng wants to merge 2 commits intoopensearch-project:mainfrom
jiapingzeng:remove-duplicate-agui-event
Draft

remove duplicate RUN_FINISHED events#4789
jiapingzeng wants to merge 2 commits intoopensearch-project:mainfrom
jiapingzeng:remove-duplicate-agui-event

Conversation

@jiapingzeng
Copy link
Copy Markdown
Contributor

Description

remove duplicate RUN_FINISHED events

Related Issues

Resolves #4736

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 10, 2026

PR Reviewer Guide 🔍

(Review updated until commit 82243dd)

Here are some key observations to aid the review process:

🧪 No relevant tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Remove RUN_FINISHED emission from agent/streaming layers

Relevant files:

  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/StreamingWrapper.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/streaming/HttpStreamingHandler.java

Sub-PR theme: Centralize RUN_FINISHED emission in RestMLExecuteStreamAction with AGUIEventResult

Relevant files:

  • plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteStreamAction.java

⚡ Recommended focus areas for review

Mutable Parameter

In convertToAGUIEvent, the isLast parameter is reassigned inside the method body (when a run error occurs), but the updated value is used to construct the AGUIEventResult chunk. However, the hasRunError flag is returned separately and the caller uses contentResult.hasRunError() to decide whether to emit RUN_FINISHED. If isLast is set to true inside the method due to an error but the caller's isLast is still false, the final chunk flag on the content chunk and the outer chunk may be inconsistent. This could result in the stream not being properly closed.

boolean hasRunError = false;

if (content != null && !content.isEmpty()) {
    log.debug("RestMLExecuteStreamAction: Processing content: '{}'", content);

    try {
        if (StringUtils.isJson(content)) {
            JsonElement element = JsonParser.parseString(content);
            sseResponse.append("data: ").append(element).append("\n\n");
            log.debug("RestMLExecuteStreamAction: Processing json element: '{}'", element);
        } else {
            log.warn("Unexpected content received - not valid JSON: {}", content);
            BaseEvent runErrorEvent = new RunErrorEvent("Unexpected chunk: " + content, null);
            sseResponse.append("data: ").append(runErrorEvent.toJsonString()).append("\n\n");
            isLast = true;
            hasRunError = true;
        }
    } catch (Exception e) {
        log.error("Failed to process AG-UI events chunk content {}", content, e);
        BaseEvent runErrorEvent = new RunErrorEvent("Unexpected error: " + e.getMessage(), null);
        sseResponse.append("data: ").append(runErrorEvent.toJsonString()).append("\n\n");
        isLast = true;
        hasRunError = true;
    }
} else {
    log.warn("Received null or empty AG-UI content chunk");
}

String finalSse = sseResponse.toString();
log.debug("RestMLExecuteStreamAction: Returning chunk - length={}", finalSse.length());
return new AGUIEventResult(createHttpChunk(finalSse, isLast), hasRunError);
Missing RUN_FINISHED

When isLast is true and hasRunError is true, the RUN_FINISHED event is skipped. However, the AG-UI protocol may require a terminal event even after a RUN_ERROR. Verify that the frontend correctly handles a stream that ends with only a RUN_ERROR and no RUN_FINISHED, to avoid hanging or incomplete state on the client side.

if (isLast && !contentResult.hasRunError()) {
    BaseEvent runFinishedEvent = new RunFinishedEvent(threadId, runId, null);
    combinedSse.append("data: ").append(runFinishedEvent.toJsonString()).append("\n\n");
}
Silent Failure

The sendRunFinishedAndCloseStream method now only calls sendCompletionChunk without any error handling. If sendCompletionChunk throws an exception, it will propagate uncaught. The previous implementation had a try-catch block. Ensure that sendCompletionChunk handles its own exceptions or add error handling here.

public void sendRunFinishedAndCloseStream(String sessionId, String parentInteractionId) {
    // Send an empty completion chunk with is_last=true.
    // RestMLExecuteStreamAction will emit RUN_FINISHED when it sees the final chunk.
    sendCompletionChunk(sessionId, parentInteractionId);
}

@github-actions
Copy link
Copy Markdown

Persistent review updated to latest commit 82243dd

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 10, 2026

PR Code Suggestions ✨

Latest suggestions up to 82243dd
Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Specify charset when converting bytes to string

BytesReference.toBytes() and new String(...) without specifying a charset may
produce incorrect results on platforms where the default charset is not UTF-8. Since
SSE data is expected to be UTF-8, the charset should be explicitly specified to
avoid potential encoding issues.

plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteStreamAction.java [490]

-combinedSse.append(new String(BytesReference.toBytes(contentResult.chunk().content())));
+combinedSse.append(new String(BytesReference.toBytes(contentResult.chunk().content()), java.nio.charset.StandardCharsets.UTF_8));
Suggestion importance[1-10]: 5

__

Why: Explicitly specifying StandardCharsets.UTF_8 when converting bytes to a String is a valid best practice to avoid platform-dependent encoding issues with SSE data. However, this is a minor style/safety improvement with low practical impact in most JVM environments.

Low
Clarify terminal event handling on error

The isLast parameter is a method parameter being reassigned inside the method, but
this local reassignment doesn't affect the caller's logic for determining if it's
the last chunk. Since hasRunError is now returned via AGUIEventResult, the isLast =
true assignment inside convertToAGUIEvent is redundant and misleading — the isLast
flag passed to createHttpChunk will mark the chunk as last, but the outer
convertToHttpChunk caller still controls whether RUN_FINISHED is emitted based on
its own isLast and contentResult.hasRunError(). This could cause premature stream
termination on error chunks that aren't truly the last chunk from the caller's
perspective.

plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteStreamAction.java [601-602]

-isLast = true;
 hasRunError = true;
+isLast = true; // Mark as last since RUN_ERROR is a terminal event
Suggestion importance[1-10]: 2

__

Why: The suggestion only reorders hasRunError = true and isLast = true and adds a comment, without changing any logic. The isLast = true assignment is intentional to mark the chunk as terminal when a RUN_ERROR occurs, and the hasRunError flag correctly prevents RUN_FINISHED from being emitted in the caller. The improved code is functionally identical to the existing code.

Low

Previous suggestions

Suggestions up to commit 82243dd
CategorySuggestion                                                                                                                                    Impact
Possible issue
Ensure stream closes properly on run error

When contentResult.hasRunError() is true, the stream is closed via the inner chunk
(due to isLast = true inside convertToAGUIEvent), but the outer isLast flag may
still be false, meaning the outer chunk won't be marked as the last chunk. This
could leave the stream in an inconsistent state. Ensure the outer chunk is also
marked as last when a run error occurs.

plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteStreamAction.java [494-497]

-if (isLast && !contentResult.hasRunError()) {
+boolean effectiveIsLast = isLast || contentResult.hasRunError();
+if (effectiveIsLast && !contentResult.hasRunError()) {
     BaseEvent runFinishedEvent = new RunFinishedEvent(threadId, runId, null);
     combinedSse.append("data: ").append(runFinishedEvent.toJsonString()).append("\n\n");
 }
+// use effectiveIsLast when creating the combined chunk
Suggestion importance[1-10]: 5

__

Why: This is a valid concern — when hasRunError is true, the inner chunk is marked as last but the outer combined chunk may not be, potentially leaving the stream open. However, the improved_code only partially addresses this with a comment placeholder for the combined chunk creation, making the fix incomplete.

Low
General
Avoid reassigning method parameter for clarity

The isLast parameter is a method parameter being reassigned locally, but since
convertToAGUIEvent is now called with false for isLast in the combined SSE path,
mutating it has no effect on the outer isLast check. However, setting isLast = true
inside the method still affects createHttpChunk(finalSse, isLast) at the end, which
may cause the stream to close prematurely on error before the outer caller has a
chance to emit a RUN_FINISHED or handle cleanup. Verify this is the intended
behavior and consider using a separate local variable instead of reassigning the
parameter to make the intent clearer.

plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteStreamAction.java [601-602]

-isLast = true;
+boolean forceClose = true;
 hasRunError = true;
+// use forceClose instead of isLast when calling createHttpChunk
Suggestion importance[1-10]: 3

__

Why: The suggestion raises a valid concern about reassigning the isLast parameter, but the behavior is intentional — when an error occurs, isLast=true ensures createHttpChunk marks the chunk as final. The improved_code introduces a comment placeholder rather than a complete solution, making it incomplete and harder to evaluate as a concrete fix.

Low

@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval April 10, 2026 00:33 — with GitHub Actions Error
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval April 10, 2026 00:33 — with GitHub Actions Error
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval April 10, 2026 00:33 — with GitHub Actions Failure
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval April 10, 2026 00:33 — with GitHub Actions Failure
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] AG-UI agent returning extra event when using OpenAI as model provider

1 participant