Skip to content

Support messages array in all memory types + chat history in AGUI agent#4645

Merged
rithinpullela merged 16 commits intoopensearch-project:mainfrom
jiapingzeng:extend-memory-interface
Feb 27, 2026
Merged

Support messages array in all memory types + chat history in AGUI agent#4645
rithinpullela merged 16 commits intoopensearch-project:mainfrom
jiapingzeng:extend-memory-interface

Conversation

@jiapingzeng
Copy link
Copy Markdown
Contributor

@jiapingzeng jiapingzeng commented Feb 14, 2026

Description

As a part of #4552, we introduced support for messages array as input for agent execute. However, in the initial implementation, we only stored plain text in memory. Multimodal messages and tool use/results are dropped. This is inline with our current implementation of the chat agent where a new request would only have access to initial user question and final assistant response from the previous request.

With this PR, we will also store multimodal messages and tool results from previous requests so that new requests will have full context. In addition, this PR moves memory storing logic from MLAgentExecutor to the memory layer by introducing new methods in the memory interface that handles messages array, so that each memory type can handle memory according to its capabilities. i.e. ConversationIndexMemory will still only store text messages whereas AgenticConversationMemory and RemoteAgenticMemory will store all messages.

Sample request/response:

Agent here is a conversational agent with agentic memory.

POST {{endpoint}}/_plugins/_ml/agents/{{agent_id}}/_execute
{
  "input": [
    {
      "role": "user",
      "content": [
        {
          "type": "text",
          "text": "Hello, my name is Alice."
        }
      ]
    }
  ]
}

{
  "inference_results": [
    {
      "output": [
        {
          "name": "memory_id",
          "result": "NT-ccpwBA0d9txqBwBRJ"
        },
        {
          "name": "parent_interaction_id",
          "result": "Nj-ccpwBA0d9txqBwBSO"
        },
        {
          "name": "response",
          "dataAsMap": {
            "additional_info": {},
            "response": "Hello Alice! It's nice to meet you. How can I help you today? \n\nIf you have any questions about OpenSearch indices or need to search for data in your cluster, feel free to ask!"
          }
        }
      ]
    }
  ]
} 

Checking memory:

GET {{endpoint}}/_plugins/_ml/memory_containers/{{memory_container_id}}/memories/working/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "namespace.session_id": "NT-ccpwBA0d9txqBwBRJ"
          }
        },
        {
          "term": {
            "metadata.type": "structured_message"
          }
        }
      ]
    }
  },
  "size": 10,
  "sort": [
    {
      "created_time": "asc"
    }
  ]
}

{
  "took": 3,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": null,
    "hits": [
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "Nz-ccpwBA0d9txqBwBSu",
        "_version": 1,
        "_seq_no": 29,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771449401518,
          "metadata": {
            "role": "user",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "user",
              "content": [
                {
                  "text": "Hello, my name is Alice.",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771449401518,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 0,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771449401518
        ]
      },
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "OT-ccpwBA0d9txqByhQk",
        "_version": 1,
        "_seq_no": 30,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771449403940,
          "metadata": {
            "role": "assistant",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "assistant",
              "content": [
                {
                  "text": "Hello Alice! It's nice to meet you. How can I help you today? \n\nIf you have any questions about OpenSearch indices or need to search for data in your cluster, feel free to ask!",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771449403940,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 0,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771449403940
        ]
      }
    ]
  }
}

Follow-up question:

POST {{endpoint}}/_plugins/_ml/agents/{{agent_id}}/_execute
{
  "parameters": {
    "memory_id": "NT-ccpwBA0d9txqBwBRJ",
    "memory_container_id": "{{memory_container_id}}"
  },
  "input": [
    {
      "role": "user",
      "content": [
        {
          "type": "text",
          "text": "What is my name?"
        }
      ]
    }
  ]
}

{
  "inference_results": [
    {
      "output": [
        {
          "name": "memory_id",
          "result": "NT-ccpwBA0d9txqBwBRJ"
        },
        {
          "name": "parent_interaction_id",
          "result": "Pj-hcpwBA0d9txqBHxQD"
        },
        {
          "name": "response",
          "dataAsMap": {
            "additional_info": {},
            "response": "Your name is Alice, as you mentioned in your introduction. How can I assist you today?"
          }
        }
      ]
    }
  ]
}

Check memory again:

POST {{endpoint}}/_plugins/_ml/memory_containers/{{memory_container_id}}/memories/working/_search
{ /* same request body as previous memory check */ }

{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 4,
      "relation": "eq"
    },
    "max_score": null,
    "hits": [
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "Nz-ccpwBA0d9txqBwBSu",
        "_version": 1,
        "_seq_no": 29,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771449401518,
          "metadata": {
            "role": "user",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "user",
              "content": [
                {
                  "text": "Hello, my name is Alice.",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771449401518,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 0,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771449401518
        ]
      },
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "OT-ccpwBA0d9txqByhQk",
        "_version": 1,
        "_seq_no": 30,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771449403940,
          "metadata": {
            "role": "assistant",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "assistant",
              "content": [
                {
                  "text": "Hello Alice! It's nice to meet you. How can I help you today? \n\nIf you have any questions about OpenSearch indices or need to search for data in your cluster, feel free to ask!",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771449403940,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 0,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771449403940
        ]
      },
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "Pz-hcpwBA0d9txqBHxRA",
        "_version": 1,
        "_seq_no": 37,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771449687872,
          "metadata": {
            "role": "user",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "user",
              "content": [
                {
                  "text": "What is my name?",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771449687872,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 0,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771449687872
        ]
      },
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "QD-hcpwBA0d9txqBJxTM",
        "_version": 1,
        "_seq_no": 38,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771449690060,
          "metadata": {
            "role": "assistant",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "assistant",
              "content": [
                {
                  "text": "Your name is Alice, as you mentioned in your introduction. How can I assist you today?",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771449690060,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 0,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771449690060
        ]
      }
    ]
  }
}

With tool result:

POST {{endpoint}}/_plugins/_ml/memory_containers/{{memory_container_id}}/memories/working/_search
{
  "took": 3,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 8,
      "relation": "eq"
    },
    "max_score": null,
    "hits": [
      { /* same as previous */ },
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "QD-hcpwBA0d9txqBJxTM",
        "_version": 1,
        "_seq_no": 38,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771449690060,
          "metadata": {
            "role": "assistant",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "assistant",
              "content": [
                {
                  "text": "Your name is Alice, as you mentioned in your introduction. How can I assist you today?",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771449690060,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 0,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771449690060
        ]
      },
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "Qj-pcpwBA0d9txqBeRRL",
        "_version": 1,
        "_seq_no": 41,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771450235210,
          "metadata": {
            "role": "user",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "user",
              "content": [
                {
                  "text": "list indices",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771450235210,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 0,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771450235210
        ]
      },
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "RT-pcpwBA0d9txqBmRQw",
        "_version": 1,
        "_seq_no": 44,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771450243375,
          "metadata": {
            "role": "assistant",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "assistant",
              "toolCalls": [
                {
                  "function": {
                    "name": "ListIndexTool",
                    "arguments": "{\"indices\":[]}"
                  },
                  "id": "tooluse_zAL80DHguoqfKACcE8UPHz",
                  "type": "function"
                }
              ],
              "content": [
                {
                  "text": "I'll list all the indices in your OpenSearch cluster.",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771450243375,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 0,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771450243375
        ]
      },
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "Rj-pcpwBA0d9txqBmRQw",
        "_version": 1,
        "_seq_no": 45,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771450243376,
          "metadata": {
            "role": "tool",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "tool",
              "toolCallId": "tooluse_zAL80DHguoqfKACcE8UPHz",
              "content": [
                {
                  "text": "row,health,status,index,uuid,pri(number of primary shards),rep(number of replica shards),docs.count(number of available documents),docs.deleted(number of deleted documents),store.size(store size of primary and replica shards),pri.store.size(store size of primary shards)\n1,green,open,.plugins-ml-model-group,NUGQ8t6WTHeLMBjaqbygQQ,1,0,2,1,19kb,19kb\n2,green,open,.plugins-ml-memory-message,xRe3nvItRd2vbrYoFCeD7w,1,0,4,1,17.1kb,17.1kb\n3,green,open,.plugins-ml-am-default-memory-sessions,i9Gna6DDQWKXOCacSm_qrQ,1,0,4,0,18.5kb,18.5kb\n4,green,open,.plugins-ml-memory-meta,6eOrDFsKQFKoNT-Ge1DClw,1,0,2,0,20.9kb,20.9kb\n5,green,open,.plugins-ml-am-memory-container,1JIHjNvHQYCJW22lOeGrVQ,1,0,4,0,24.9kb,24.9kb\n6,green,open,.plugins-ml-am-test-memrefactor-memory-sessions,uN8b_syETGeZGIj9Mj_GtA,1,0,4,0,18.5kb,18.5kb\n7,green,open,.plugins-ml-agent,MzvWpDYeQUK2bBcE_yUOYg,1,0,7,0,111.5kb,111.5kb\n8,green,open,.plugins-ml-am-test-memrefactor-memory-working,0whvmG9xTeqkWsbvKfn8wg,1,0,33,5,91.8kb,91.8kb\n9,green,open,.plugins-ml-task,n6qTpg5OT0-CTe65UTus6w,1,0,12,0,18.9kb,18.9kb\n10,green,open,.plugins-ml-connector,nFv_90_QT4KKp8JzQo_fOg,1,0,1,0,8kb,8kb\n11,green,open,.plugins-ml-config,4EWcrxWnQFu9BCn-RvcLWg,1,0,1,0,4.6kb,4.6kb\n12,green,open,.plugins-ml-am-default-memory-working,KBeqr07BS8OJa8Q0Ze9DhA,1,0,10,0,53.5kb,53.5kb\n13,green,open,.plugins-ml-model,KgBPpD3-TI2FdEuChYNVPw,1,0,7,0,366.8kb,366.8kb\n",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771450243376,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 1,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771450243376
        ]
      },
      {
        "_index": ".plugins-ml-am-test-memrefactor-memory-working",
        "_id": "Rz-pcpwBA0d9txqBmRQx",
        "_version": 1,
        "_seq_no": 46,
        "_primary_term": 1,
        "_score": null,
        "_source": {
          "payload_type": "conversational",
          "created_time": 1771450243376,
          "metadata": {
            "role": "assistant",
            "type": "structured_message"
          },
          "structured_data_blob": {
            "message": {
              "role": "assistant",
              "content": [
                {
                  "text": "Here are all the indices in your OpenSearch cluster:\n\n| # | Health | Status | Index | Documents | Store Size |\n|---|--------|--------|-------|-----------|------------|\n| 1 | green | open | .plugins-ml-model-group | 2 | 19kb |\n| 2 | green | open | .plugins-ml-memory-message | 4 | 17.1kb |\n| 3 | green | open | .plugins-ml-am-default-memory-sessions | 4 | 18.5kb |\n| 4 | green | open | .plugins-ml-memory-meta | 2 | 20.9kb |\n| 5 | green | open | .plugins-ml-am-memory-container | 4 | 24.9kb |\n| 6 | green | open | .plugins-ml-am-test-memrefactor-memory-sessions | 4 | 18.5kb |\n| 7 | green | open | .plugins-ml-agent | 7 | 111.5kb |\n| 8 | green | open | .plugins-ml-am-test-memrefactor-memory-working | 33 | 91.8kb |\n| 9 | green | open | .plugins-ml-task | 12 | 18.9kb |\n| 10 | green | open | .plugins-ml-connector | 1 | 8kb |\n| 11 | green | open | .plugins-ml-config | 1 | 4.6kb |\n| 12 | green | open | .plugins-ml-am-default-memory-working | 10 | 53.5kb |\n| 13 | green | open | .plugins-ml-model | 7 | 366.8kb |\n\nAll indices are healthy (green status) and open. These appear to be OpenSearch ML (Machine Learning) plugin-related system indices.",
                  "type": "TEXT"
                }
              ]
            }
          },
          "last_updated_time": 1771450243376,
          "infer": false,
          "namespace_size": 1,
          "namespace": {
            "session_id": "NT-ccpwBA0d9txqBwBRJ"
          },
          "message_id": 2,
          "memory_container_id": "8j-RcpwBA0d9txqBPRPX"
        },
        "sort": [
          1771450243376
        ]
      }
    ]
  }
}

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Feb 14, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR introduces structured message parsing across multiple LLM providers (Bedrock, OpenAI, Gemini), extends the Memory interface with structured message handling, refactors agent execution to use input messages directly instead of storing in memory, and updates memory implementations and agent runners to support unified message interfaces.

Changes

Cohort / File(s) Summary
Provider Message Parsing
common/src/main/java/org/opensearch/ml/common/model/ModelProvider.java, common/src/main/java/org/opensearch/ml/common/agent/BedrockConverseModelProvider.java, common/src/main/java/org/opensearch/ml/common/agent/OpenaiV1ChatCompletionsModelProvider.java, common/src/main/java/org/opensearch/ml/common/agent/GeminiV1BetaGenerateContentModelProvider.java
Added abstract parseResponseMessage(String json) method to base ModelProvider and implemented in concrete providers to parse LLM responses into unified Message objects with support for text blocks, tool calls, and tool results.
Message & Content Serialization
common/src/main/java/org/opensearch/ml/common/input/execute/agent/ContentBlock.java, common/src/main/java/org/opensearch/ml/common/input/execute/agent/Message.java
Added @JsonInclude(JsonInclude.Include.NON_NULL) annotations to exclude null fields from JSON serialization.
Memory Interface Enhancement
common/src/main/java/org/opensearch/ml/common/memory/Memory.java
Added MAX_MESSAGES_TO_RETRIEVE constant and two new default methods getStructuredMessages() and saveStructuredMessages() for structured message handling.
Core Memory Implementations
ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/AgenticConversationMemory.java, ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/ConversationIndexMemory.java, ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemory.java
Implemented structured message APIs with type signature updates; AgenticConversationMemory and ConversationIndexMemory now implement Memory<org.opensearch.ml.common.memory.Message, ...> and provide message conversion/persistence methods.
Agent Execution & Runner
ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutor.java, ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentRunner.java, ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java
Refactored agent execution to pass input messages directly instead of storing in memory; added setInputMessages() setter to runners; updated MLChatAgentRunner with unified-interface branching for structured message history and tool interaction handling.
Utility & Streaming
ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/AgentUtils.java, ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/streaming/BedrockStreamingHandler.java, common/src/main/java/org/opensearch/ml/common/agui/AGUIInputConverter.java
Added extractTextFromMessage() and extractMessagePairs() utilities; updated BedrockStreamingHandler to accumulate streamed content and generate final answer responses; added memory_id parameter mapping in AGUIInputConverter.
Provider Tests
common/src/test/java/org/opensearch/ml/common/agent/BedrockConverseModelProviderTest.java, common/src/test/java/org/opensearch/ml/common/agent/OpenaiV1ChatCompletionsModelProviderTest.java
Added comprehensive test coverage for parseResponseMessage() across text, tool calls, mixed content, tool results, and edge cases.
Agent & Memory Tests
ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/AgentUtilsTest.java, ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunnerTest.java, ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/AgenticConversationMemoryTest.java, ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/ConversationIndexMemoryTest.java, ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemoryTest.java
Added tests for message extraction, input message handling, structured message save/retrieval, null/empty input handling, and failure scenarios.
Removed Tests
ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutorTest.java
Removed 601 lines of legacy test coverage for text extraction and memory interaction paths that are no longer relevant after refactoring.

Sequence Diagram

sequenceDiagram
    participant Agent as Agent Executor
    participant Provider as Model Provider
    participant LLM as LLM Service
    participant Memory as Memory System
    participant Runner as Agent Runner

    Agent->>Agent: Extract input messages from input
    Agent->>Runner: setInputMessages(inputMessages)
    Runner->>Memory: getStructuredMessages()
    Memory-->>Runner: historical messages + context
    
    Runner->>LLM: Send prompt with history
    LLM-->>Provider: Return JSON response
    
    Provider->>Provider: parseResponseMessage(json)
    Provider->>Provider: Extract text/tools from response
    Provider-->>Runner: Message with ContentBlocks + ToolCalls
    
    Runner->>Runner: Process tool interactions
    Runner->>Memory: saveStructuredMessages(newMessages)
    Memory-->>Runner: Acknowledgment
    
    Runner-->>Agent: Final answer
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

v3.5.0

🚥 Pre-merge checks | ✅ 1 | ❌ 3

❌ Failed checks (3 warnings)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Merge Conflict Detection ⚠️ Warning ❌ Merge conflicts detected (28 files):

⚔️ build.gradle (content)
⚔️ common/src/main/java/org/opensearch/ml/common/agent/BedrockConverseModelProvider.java (content)
⚔️ common/src/main/java/org/opensearch/ml/common/agent/GeminiV1BetaGenerateContentModelProvider.java (content)
⚔️ common/src/main/java/org/opensearch/ml/common/agent/OpenaiV1ChatCompletionsModelProvider.java (content)
⚔️ common/src/main/java/org/opensearch/ml/common/agui/AGUIInputConverter.java (content)
⚔️ common/src/main/java/org/opensearch/ml/common/input/execute/agent/ContentBlock.java (content)
⚔️ common/src/main/java/org/opensearch/ml/common/input/execute/agent/Message.java (content)
⚔️ common/src/main/java/org/opensearch/ml/common/memory/Memory.java (content)
⚔️ common/src/main/java/org/opensearch/ml/common/model/ModelProvider.java (content)
⚔️ common/src/test/java/org/opensearch/ml/common/agent/BedrockConverseModelProviderTest.java (content)
⚔️ common/src/test/java/org/opensearch/ml/common/agent/OpenaiV1ChatCompletionsModelProviderTest.java (content)
⚔️ common/src/test/java/org/opensearch/ml/common/agui/AGUIInputConverterTest.java (content)
⚔️ ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/AgentUtils.java (content)
⚔️ ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutor.java (content)
⚔️ ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentRunner.java (content)
⚔️ ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java (content)
⚔️ ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/streaming/BedrockStreamingHandler.java (content)
⚔️ ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/AgenticConversationMemory.java (content)
⚔️ ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/ConversationIndexMemory.java (content)
⚔️ ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemory.java (content)
⚔️ ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/AgentUtilsTest.java (content)
⚔️ ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutorTest.java (content)
⚔️ ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunnerTest.java (content)
⚔️ ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/remote/streaming/BedrockStreamingHandlerTest.java (content)
⚔️ ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/AgenticConversationMemoryTest.java (content)
⚔️ ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/ConversationIndexMemoryTest.java (content)
⚔️ ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemoryTest.java (content)
⚔️ plugin/src/main/java/org/opensearch/ml/task/MLExecuteTaskRunner.java (content)

These conflicts must be resolved before merging into main.
Resolve conflicts locally and push changes to this branch.
Description check ⚠️ Warning The PR description is incomplete. While the author provided detailed context about the implementation goals in the initial template, the actual description sections lack concrete details about the changes made. Complete the Description section with a clear summary of what this PR achieves (extending memory interface to support structured messages). Also ensure all checklist items are properly addressed and documented.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately describes the main objectives of the changeset: adding support for structured message arrays across memory types and enabling chat history in AGUI agents.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Comment @coderabbitai help to get the list of available commands and usage tips.

@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval February 14, 2026 06:12 — with GitHub Actions Error
@jiapingzeng jiapingzeng temporarily deployed to ml-commons-cicd-env-require-approval February 14, 2026 06:12 — with GitHub Actions Inactive
@jiapingzeng jiapingzeng temporarily deployed to ml-commons-cicd-env-require-approval February 14, 2026 06:12 — with GitHub Actions Inactive
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval February 14, 2026 06:12 — with GitHub Actions Failure
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In
`@ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/AgentUtils.java`:
- Around line 1319-1417: The method extractMessagePairs currently assumes
messages is non-null and will NPE; add an early guard at the start of
extractMessagePairs to return Collections.emptyList() (or new ArrayList<>())
when messages is null or messages.isEmpty(), ensuring all callers receive an
empty list instead of throwing; update the beginning of the
extractMessagePairs(List<Message> messages, String sessionId, String appType)
method accordingly.

In
`@ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java`:
- Around line 271-303: The unified-interface branch (usesUnifiedInterface in
MLChatAgentRunner) assumes inputMessages is non-null and that
ModelProviderFactory.getProvider(mlAgent.getModel().getModelProvider()) returns
a valid provider; add quick validation before calling
memory.getStructuredMessages: check inputMessages != null and not empty, verify
mlAgent.getModel() and mlAgent.getModel().getModelProvider() are present, call
ModelProviderFactory.getProvider(...) and fail-fast with listener.onFailure(...)
if it returns null or throws; only then proceed to
memory.getStructuredMessages(...) / memory.saveStructuredMessages(...) and
runAgent(...).

In
`@ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/AgenticConversationMemory.java`:
- Around line 429-502: saveStructuredMessages issues concurrent writes which can
arrive out of order despite using messageId=i; to fix, make saves sequential or
assign a strictly monotonic sequence number instead of relying on loop index.
Modify saveStructuredMessages to either: 1) replace the parallel client.execute
loop with a recursive/iterative sequential publisher (e.g., a private helper
saveNext(int index) that builds MLAddMemoriesInput/MLAddMemoriesRequest and
calls client.execute and on success calls saveNext(index+1), propagating
failures to listener), or 2) keep parallel writes but generate and persist a
monotonic sequence number for MLAddMemoriesInput.messageId (e.g., from an
AtomicLong sequence obtained from the memory container) so ordering is
guaranteed; adjust handling around MLAddMemoriesInput, MLAddMemoriesRequest,
client.execute, remaining/hasError, and messageId accordingly.
🧹 Nitpick comments (2)
common/src/main/java/org/opensearch/ml/common/memory/Memory.java (1)

21-24: Consider making MAX_MESSAGES_TO_RETRIEVE configurable.

The hardcoded value of 10,000 messages could lead to memory pressure or performance issues when retrieving large conversation histories. Consider making this configurable via settings, or at minimum, documenting the rationale for this specific value.

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemory.java (1)

515-637: Preserve the first failure when batching structured-message saves.

If an early save fails but the last succeeds, callers only see a generic error. Capturing the first exception keeps the root cause.

♻️ Suggested adjustment
         AtomicInteger remaining = new AtomicInteger(messages.size());
         AtomicBoolean hasError = new AtomicBoolean(false);
+        java.util.concurrent.atomic.AtomicReference<Exception> firstError = new java.util.concurrent.atomic.AtomicReference<>();
@@
                 if (remaining.decrementAndGet() == 0) {
                     if (hasError.get()) {
-                        listener.onFailure(new RuntimeException("One or more structured messages failed to save"));
+                        Exception err = firstError.get();
+                        listener.onFailure(err != null ? err : new RuntimeException("One or more structured messages failed to save"));
                     } else {
                         listener.onResponse(null);
                     }
                 }
             }, e -> {
                 log.error("Failed to save structured message {} of {} to remote session {}", index + 1, messages.size(), conversationId, e);
                 hasError.set(true);
+                firstError.compareAndSet(null, e);
                 if (remaining.decrementAndGet() == 0) {
-                    listener.onFailure(e);
+                    Exception err = firstError.get();
+                    listener.onFailure(err != null ? err : e);
                 }
             }));
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f78efd5 and 4d9f496.

📒 Files selected for processing (26)
  • common/src/main/java/org/opensearch/ml/common/agent/BedrockConverseModelProvider.java
  • common/src/main/java/org/opensearch/ml/common/agent/GeminiV1BetaGenerateContentModelProvider.java
  • common/src/main/java/org/opensearch/ml/common/agent/OpenaiV1ChatCompletionsModelProvider.java
  • common/src/main/java/org/opensearch/ml/common/agui/AGUIInputConverter.java
  • common/src/main/java/org/opensearch/ml/common/input/execute/agent/ContentBlock.java
  • common/src/main/java/org/opensearch/ml/common/input/execute/agent/Message.java
  • common/src/main/java/org/opensearch/ml/common/memory/Memory.java
  • common/src/main/java/org/opensearch/ml/common/model/ModelProvider.java
  • common/src/test/java/org/opensearch/ml/common/agent/BedrockConverseModelProviderTest.java
  • common/src/test/java/org/opensearch/ml/common/agent/OpenaiV1ChatCompletionsModelProviderTest.java
  • common/src/test/java/org/opensearch/ml/common/agui/AGUIInputConverterTest.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/AgentUtils.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutor.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentRunner.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/streaming/BedrockStreamingHandler.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/AgenticConversationMemory.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/ConversationIndexMemory.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemory.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/AgentUtilsTest.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutorTest.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunnerTest.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/remote/streaming/BedrockStreamingHandlerTest.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/AgenticConversationMemoryTest.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/ConversationIndexMemoryTest.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemoryTest.java
💤 Files with no reviewable changes (1)
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutorTest.java
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-13T04:54:54.656Z
Learnt from: akolarkunnu
Repo: opensearch-project/ml-commons PR: 3919
File: ml-algorithms/src/test/java/org/opensearch/ml/engine/encryptor/EncryptorImplTest.java:394-409
Timestamp: 2026-01-13T04:54:54.656Z
Learning: In ml-algorithms/src/test/java/org/opensearch/ml/engine/encryptor/EncryptorImplTest.java, the test encrypt_ThrowExceptionWhenInitMLConfigIndex uses exceptionRule (not listener-based error assertions) because it mocks mlIndicesHandler.initMLConfigIndex() with doThrow() to throw synchronously before any listener callbacks occur, validating sync exception propagation rather than async listener error delivery.

Applied to files:

  • ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/ConversationIndexMemoryTest.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/AgentUtils.java
🧬 Code graph analysis (6)
ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentRunner.java (1)
search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/prompt/PromptUtil.java (1)
  • Message (540-593)
common/src/main/java/org/opensearch/ml/common/agent/OpenaiV1ChatCompletionsModelProvider.java (1)
common/src/main/java/org/opensearch/ml/common/model/ModelProvider.java (1)
  • ModelProvider (22-114)
ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/AgenticConversationMemoryTest.java (2)
common/src/main/java/org/opensearch/ml/common/transport/memorycontainer/memory/MLAddMemoriesAction.java (1)
  • MLAddMemoriesAction (10-17)
common/src/main/java/org/opensearch/ml/common/transport/memorycontainer/memory/MLSearchMemoriesAction.java (1)
  • MLSearchMemoriesAction (11-18)
ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunnerTest.java (2)
common/src/main/java/org/opensearch/ml/common/input/execute/agent/AgentMLInput.java (1)
  • org (32-178)
search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/prompt/PromptUtil.java (1)
  • Message (540-593)
common/src/main/java/org/opensearch/ml/common/agent/BedrockConverseModelProvider.java (1)
common/src/main/java/org/opensearch/ml/common/input/execute/agent/AgentMLInput.java (1)
  • org (32-178)
ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/AgenticConversationMemory.java (2)
common/src/main/java/org/opensearch/ml/common/transport/memorycontainer/memory/MLSearchMemoriesAction.java (1)
  • MLSearchMemoriesAction (11-18)
common/src/main/java/org/opensearch/ml/common/transport/memorycontainer/memory/MLAddMemoriesAction.java (1)
  • MLAddMemoriesAction (10-17)
🔇 Additional comments (52)
common/src/main/java/org/opensearch/ml/common/agui/AGUIInputConverter.java (1)

112-113: LGTM!

The propagation of threadId as memory_id is appropriate for enabling memory-scoped processing in AGUI flows. The inline comment clearly documents the purpose.

common/src/main/java/org/opensearch/ml/common/input/execute/agent/ContentBlock.java (1)

8-9: LGTM!

Adding @JsonInclude(JsonInclude.Include.NON_NULL) is appropriate for producing cleaner JSON output by excluding null fields. This aligns with the same annotation applied to Message.java.

Also applies to: 18-18

common/src/main/java/org/opensearch/ml/common/input/execute/agent/Message.java (1)

10-11: LGTM!

The Jackson annotation for excluding null fields is consistent with ContentBlock.java, ensuring uniform serialization behavior across the message-related DTOs.

Also applies to: 20-20

common/src/test/java/org/opensearch/ml/common/agui/AGUIInputConverterTest.java (2)

687-698: LGTM!

The test properly validates the new memory_id parameter propagation from threadId. It correctly asserts both the memory_id and AGUI_PARAM_THREAD_ID values are present and equal.


700-717: LGTM!

The helper method buildMinimalAGUIInput is well-structured, creating a valid AGUI input with all required fields. It can be reused for additional test cases if needed.

common/src/main/java/org/opensearch/ml/common/model/ModelProvider.java (1)

77-83: LGTM!

The new abstract method parseResponseMessage provides a clean extension to the ModelProvider contract, enabling unified response parsing across different LLM providers. The Javadoc clearly documents the expected behavior.

common/src/main/java/org/opensearch/ml/common/memory/Memory.java (1)

46-66: LGTM!

The default implementations using UnsupportedOperationException provide clean backward compatibility, allowing existing memory implementations to remain unchanged while new implementations can override these methods. The use of fully qualified org.opensearch.ml.common.input.execute.agent.Message avoids potential naming conflicts with the generic type parameter T extends Message.

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutor.java (3)

514-519: LGTM!

The extraction of inputMessages from AgentMLInput when the input type is MESSAGES is correctly implemented. The conditional check ensures that inputMessages is only populated when appropriate, and the @SuppressWarnings("unchecked") annotation is justified for the type cast.


434-436: LGTM!

Passing null for inputMessages in the existing memory path is correct since these code paths handle scenarios where messages are retrieved from memory rather than passed directly.

Also applies to: 458-460


732-732: No action required—null handling is already implemented.

The inputMessages parameter can be null (when input type is not MESSAGES), and the code safely handles this case. The interface documentation explicitly states the parameter "may be null." All downstream consumers, including ConversationIndexMemory.saveStructuredMessages(), RemoteAgenticConversationMemory.saveStructuredMessages(), and AgenticConversationMemory.saveStructuredMessages(), have explicit null checks that return successfully when messages are null or empty. This behavior is also covered by existing unit tests.

common/src/main/java/org/opensearch/ml/common/agent/OpenaiV1ChatCompletionsModelProvider.java (2)

8-8: LGTM!

The new imports are correctly added to support the parseResponseMessage implementation.

Also applies to: 25-25, 32-32


356-409: Implementation looks correct with a minor null-handling note.

The parseResponseMessage method correctly handles OpenAI's response format including text content, tool calls, and tool result messages. The default role of "assistant" is appropriate for response parsing.

One minor consideration: on line 381, String.valueOf(tc.get("id")) will return the literal string "null" if id is null, rather than an empty string. While the check tc.get("id") != null is present, consider using the same pattern as line 386 with getOrDefault for consistency:

💡 Optional consistency improvement
-                String id = tc.get("id") != null ? String.valueOf(tc.get("id")) : "";
+                String id = tc.get("id") != null ? String.valueOf(tc.get("id")) : "";

The current implementation is already correct - the ternary handles the null case. This is just a note for awareness.

common/src/test/java/org/opensearch/ml/common/agent/OpenaiV1ChatCompletionsModelProviderTest.java (1)

1104-1184: Good coverage for parseResponseMessage edge cases.

These tests exercise text-only, tool calls, tool results, null/empty content, and default role paths, which should catch most regressions in the parser.

ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/AgenticConversationMemoryTest.java (1)

777-1041: Solid structured-message coverage in memory tests.

The new cases cover null/empty input handling, success/failure paths, and container-ID validation, which should harden the new APIs.

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentRunner.java (2)

8-13: Import-only change.


40-44: Default setter keeps backward compatibility.

A no-op default preserves existing implementations while allowing runners to opt into structured inputs.

common/src/main/java/org/opensearch/ml/common/agent/BedrockConverseModelProvider.java (2)

8-35: Import-only change.


412-485: Bedrock response parsing looks solid.

The method cleanly handles text blocks, tool calls, tool results, and role defaults with appropriate null checks.

ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/AgentUtilsTest.java (1)

2061-2257: Great coverage for structured message extraction utilities.

These cases cover null/empty content, mixed block types, whitespace handling, and message pairing behavior.

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/AgentUtils.java (1)

1287-1307: Text-block extraction looks solid.

Concise helper and trimming behavior keep downstream pairing predictable.

ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunnerTest.java (1)

1596-1651: Good coverage for setInputMessages behavior.

Covers stored, null, and default no-op paths without over-mocking.

ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/ConversationIndexMemoryTest.java (1)

162-304: Structured-message tests hit the important cases.

Null/empty inputs, partial failures, and conversion paths are exercised.

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/streaming/BedrockStreamingHandler.java (2)

130-325: AG‑UI streaming aggregation fits the final-answer flow.

Accumulating content and emitting run-finished plus final answer keeps the AG‑UI stream consistent.


436-455: Final-answer wrapper matches Bedrock response shape.

Using stopReason = end_turn and a content array aligns with downstream parsing expectations.

ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemoryTest.java (1)

334-395: Structured-message edge cases are well covered.

Missing container-id, null, and empty inputs are handled.

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemory.java (1)

74-78: Deterministic ordering key is a good addition.

Sorting with message_id alongside created_time should stabilize retrieval.

common/src/test/java/org/opensearch/ml/common/agent/BedrockConverseModelProviderTest.java (1)

1006-1088: Solid coverage for parseResponseMessage scenarios.
These cases should keep text/tool parsing behavior stable across edge inputs.

ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/remote/streaming/BedrockStreamingHandlerTest.java (2)

9-18: Import additions align with new test usage.


242-290: Nice coverage for final-answer response creation.
Covers non-null, null, and empty text paths.

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java (11)

73-90: Import additions are consistent with the structured-message flow.


167-207: Input message injection looks straightforward.


230-232: Early exit path is unchanged in behavior.


412-425: AG-UI tool-result routing looks consistent with the unified-tools flow.


429-443: Unified-interface context wiring in runReAct looks consistent.


519-523: Call-site propagation of unified-interface context looks correct.

Also applies to: 572-575, 717-720


955-1027: Final-answer persistence now cleanly branches for unified vs legacy.


1034-1084: Structured assistant message save helper is clear and self-contained.


1232-1236: Max-iteration paths now consistently pass unified-interface context.

Also applies to: 1293-1297


1482-1491: Unified-tools wiring across backend/frontend is consistent.

Also applies to: 1540-1541


1594-1598: AG-UI tool-result reinjection reuses the unified-tools path correctly.

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/ConversationIndexMemory.java (5)

11-26: Import additions are aligned with structured-message support.


36-36: Memory type parameterization update looks consistent.


80-104: save(...) signature updates are consistent with the new memory Message type.


121-164: Structured message reconstruction from interactions looks solid.


166-233: Sequential save path with aggregated error handling is robust.

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/AgenticConversationMemory.java (7)

8-55: Import updates align with new structured-message functionality.


65-73: Class signature and metadata field additions look consistent.


95-173: save(...) updates are consistent with memory-container storage format.


227-227: Updated timestamp merge is correct.


267-375: Interaction parsing updates look consistent with the new schema.


382-426: Structured-message retrieval path looks clean.


590-592: Trace timestamp handling looks consistent.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +300 to +303
@Override
public Message parseResponseMessage(String json) {
throw new UnsupportedOperationException("parseResponseMessage is not yet supported for Gemini model provider");
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Locate all parseResponseMessage call sites and Gemini provider usage
rg -n "parseResponseMessage\(" --glob "*.java" -A 2 -B 2

Repository: opensearch-project/ml-commons

Length of output: 13816


🏁 Script executed:

#!/bin/bash
# Find all usages of GeminiV1BetaGenerateContentModelProvider
rg -n "GeminiV1BetaGenerateContentModelProvider" --glob "*.java"

Repository: opensearch-project/ml-commons

Length of output: 1096


🏁 Script executed:

#!/bin/bash
# Check the parent class/interface to understand the contract
rg -n "class GeminiV1BetaGenerateContentModelProvider" --glob "*.java" -A 5

Repository: opensearch-project/ml-commons

Length of output: 1928


🏁 Script executed:

#!/bin/bash
# Find other model providers to see how they implement parseResponseMessage
rg -n "public Message parseResponseMessage" --glob "*.java" -B 5 -A 3

Repository: opensearch-project/ml-commons

Length of output: 3424


🏁 Script executed:

#!/bin/bash
# Examine the context around parseResponseMessage call in MLChatAgentRunner
cat -n ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java | sed -n '1050,1075p'

Repository: opensearch-project/ml-commons

Length of output: 1395


🏁 Script executed:

#!/bin/bash
# Look for toolInteractions generation and any Gemini guards
rg -n "toolInteractions" --glob "*.java" -B 5 -A 5

Repository: opensearch-project/ml-commons

Length of output: 27193


🏁 Script executed:

#!/bin/bash
# Search for any Gemini-specific guards in MLChatAgentRunner
rg -n "GEMINI|Gemini" ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java -B 3 -A 3

Repository: opensearch-project/ml-commons

Length of output: 55


🏁 Script executed:

#!/bin/bash
# Check Gemini implementation for tool support and unified interface support
cat -n common/src/main/java/org/opensearch/ml/common/agent/GeminiV1BetaGenerateContentModelProvider.java | head -100

Repository: opensearch-project/ml-commons

Length of output: 5598


🏁 Script executed:

#!/bin/bash
# Look for usesUnifiedInterface configuration and Gemini usage
rg -n "usesUnifiedInterface" --glob "*.java" -B 2 -A 2

Repository: opensearch-project/ml-commons

Length of output: 8323


🏁 Script executed:

#!/bin/bash
# Check if there are any tests that use Gemini with tool interactions
rg -n "GeminiV1BetaGenerateContentModelProviderTest" --glob "*.java" -A 50 | head -100

Repository: opensearch-project/ml-commons

Length of output: 7254


🏁 Script executed:

#!/bin/bash
# Check if Gemini has tool/function calling support
rg -n "tool" common/src/main/java/org/opensearch/ml/common/agent/GeminiV1BetaGenerateContentModelProvider.java -i

Repository: opensearch-project/ml-commons

Length of output: 286


🏁 Script executed:

#!/bin/bash
# Check the full Gemini implementation for unified interface support
wc -l common/src/main/java/org/opensearch/ml/common/agent/GeminiV1BetaGenerateContentModelProvider.java

Repository: opensearch-project/ml-commons

Length of output: 174


🏁 Script executed:

#!/bin/bash
# Get full Gemini implementation to understand tool support
sed -n '200,350p' common/src/main/java/org/opensearch/ml/common/agent/GeminiV1BetaGenerateContentModelProvider.java

Repository: opensearch-project/ml-commons

Length of output: 5495


Implement parseResponseMessage for Gemini to avoid losing tool interaction data.

The method throws UnsupportedOperationException, but since the call is wrapped in a try-catch at MLChatAgentRunner:1061, it will not crash the agent. However, when tool interactions exist for Gemini, they will be silently skipped and not stored as structured messages in the unified interface flow. Gemini supports tools (tool configurations are included in its request template), so the parser is needed to properly capture tool call and result messages in conversation history.

Comment on lines +1319 to +1417
public static List<ConversationIndexMessage> extractMessagePairs(List<Message> messages, String sessionId, String appType) {
List<ConversationIndexMessage> messagePairs = new ArrayList<>();

StringBuilder userTextBuilder = new StringBuilder();
StringBuilder assistantTextBuilder = new StringBuilder();
boolean skippingTrailingUsers = true;
String currentRole = null;

for (int i = messages.size() - 1; i >= 0; i--) {
Message message = messages.get(i);

if (message == null || message.getRole() == null) {
continue;
}

String role = message.getRole().toLowerCase();

// Skip non-user/assistant roles
if (!role.equals("user") && !role.equals("assistant")) {
continue;
}

// Skip trailing user messages
if (skippingTrailingUsers && role.equals("user")) {
continue;
}

if (skippingTrailingUsers && role.equals("assistant")) {
skippingTrailingUsers = false;
}

// Detect role change from user to assistant (going backwards)
if (currentRole != null && currentRole.equals("user") && role.equals("assistant")) {
// Save the accumulated pair
String userText = userTextBuilder.toString().trim();
String assistantText = assistantTextBuilder.toString().trim();

if (!userText.isEmpty() && !assistantText.isEmpty()) {
ConversationIndexMessage msg = ConversationIndexMessage
.conversationIndexMessageBuilder()
.type(appType)
.question(userText)
.response(assistantText)
.finalAnswer(true)
.sessionId(sessionId)
.build();

messagePairs.add(msg);
}

// Clear buffers for next pair
userTextBuilder.setLength(0);
assistantTextBuilder.setLength(0);
}

// Extract text
String text = extractTextFromMessage(message);

// Accumulate text based on role (prepending since we're going backwards)
if (role.equals("user")) {
if (!text.isEmpty()) {
if (userTextBuilder.length() > 0) {
userTextBuilder.insert(0, "\n");
}
userTextBuilder.insert(0, text);
}
} else if (role.equals("assistant")) {
if (!text.isEmpty()) {
if (assistantTextBuilder.length() > 0) {
assistantTextBuilder.insert(0, "\n");
}
assistantTextBuilder.insert(0, text);
}
}

currentRole = role;
}

// Save any remaining pair
String userText = userTextBuilder.toString().trim();
String assistantText = assistantTextBuilder.toString().trim();

if (!userText.isEmpty() && !assistantText.isEmpty()) {
ConversationIndexMessage msg = ConversationIndexMessage
.conversationIndexMessageBuilder()
.type(appType)
.question(userText)
.response(assistantText)
.finalAnswer(true)
.sessionId(sessionId)
.build();

messagePairs.add(msg);
}

// Reverse to maintain chronological order
Collections.reverse(messagePairs);

return messagePairs;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Guard against null/empty message lists.

extractMessagePairs will throw if messages is null; returning an empty list keeps it safe for callers.

🛡️ Suggested guard
 public static List<ConversationIndexMessage> extractMessagePairs(List<Message> messages, String sessionId, String appType) {
+    if (messages == null || messages.isEmpty()) {
+        return Collections.emptyList();
+    }
     List<ConversationIndexMessage> messagePairs = new ArrayList<>();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public static List<ConversationIndexMessage> extractMessagePairs(List<Message> messages, String sessionId, String appType) {
List<ConversationIndexMessage> messagePairs = new ArrayList<>();
StringBuilder userTextBuilder = new StringBuilder();
StringBuilder assistantTextBuilder = new StringBuilder();
boolean skippingTrailingUsers = true;
String currentRole = null;
for (int i = messages.size() - 1; i >= 0; i--) {
Message message = messages.get(i);
if (message == null || message.getRole() == null) {
continue;
}
String role = message.getRole().toLowerCase();
// Skip non-user/assistant roles
if (!role.equals("user") && !role.equals("assistant")) {
continue;
}
// Skip trailing user messages
if (skippingTrailingUsers && role.equals("user")) {
continue;
}
if (skippingTrailingUsers && role.equals("assistant")) {
skippingTrailingUsers = false;
}
// Detect role change from user to assistant (going backwards)
if (currentRole != null && currentRole.equals("user") && role.equals("assistant")) {
// Save the accumulated pair
String userText = userTextBuilder.toString().trim();
String assistantText = assistantTextBuilder.toString().trim();
if (!userText.isEmpty() && !assistantText.isEmpty()) {
ConversationIndexMessage msg = ConversationIndexMessage
.conversationIndexMessageBuilder()
.type(appType)
.question(userText)
.response(assistantText)
.finalAnswer(true)
.sessionId(sessionId)
.build();
messagePairs.add(msg);
}
// Clear buffers for next pair
userTextBuilder.setLength(0);
assistantTextBuilder.setLength(0);
}
// Extract text
String text = extractTextFromMessage(message);
// Accumulate text based on role (prepending since we're going backwards)
if (role.equals("user")) {
if (!text.isEmpty()) {
if (userTextBuilder.length() > 0) {
userTextBuilder.insert(0, "\n");
}
userTextBuilder.insert(0, text);
}
} else if (role.equals("assistant")) {
if (!text.isEmpty()) {
if (assistantTextBuilder.length() > 0) {
assistantTextBuilder.insert(0, "\n");
}
assistantTextBuilder.insert(0, text);
}
}
currentRole = role;
}
// Save any remaining pair
String userText = userTextBuilder.toString().trim();
String assistantText = assistantTextBuilder.toString().trim();
if (!userText.isEmpty() && !assistantText.isEmpty()) {
ConversationIndexMessage msg = ConversationIndexMessage
.conversationIndexMessageBuilder()
.type(appType)
.question(userText)
.response(assistantText)
.finalAnswer(true)
.sessionId(sessionId)
.build();
messagePairs.add(msg);
}
// Reverse to maintain chronological order
Collections.reverse(messagePairs);
return messagePairs;
public static List<ConversationIndexMessage> extractMessagePairs(List<Message> messages, String sessionId, String appType) {
if (messages == null || messages.isEmpty()) {
return Collections.emptyList();
}
List<ConversationIndexMessage> messagePairs = new ArrayList<>();
StringBuilder userTextBuilder = new StringBuilder();
StringBuilder assistantTextBuilder = new StringBuilder();
boolean skippingTrailingUsers = true;
String currentRole = null;
for (int i = messages.size() - 1; i >= 0; i--) {
Message message = messages.get(i);
if (message == null || message.getRole() == null) {
continue;
}
String role = message.getRole().toLowerCase();
// Skip non-user/assistant roles
if (!role.equals("user") && !role.equals("assistant")) {
continue;
}
// Skip trailing user messages
if (skippingTrailingUsers && role.equals("user")) {
continue;
}
if (skippingTrailingUsers && role.equals("assistant")) {
skippingTrailingUsers = false;
}
// Detect role change from user to assistant (going backwards)
if (currentRole != null && currentRole.equals("user") && role.equals("assistant")) {
// Save the accumulated pair
String userText = userTextBuilder.toString().trim();
String assistantText = assistantTextBuilder.toString().trim();
if (!userText.isEmpty() && !assistantText.isEmpty()) {
ConversationIndexMessage msg = ConversationIndexMessage
.conversationIndexMessageBuilder()
.type(appType)
.question(userText)
.response(assistantText)
.finalAnswer(true)
.sessionId(sessionId)
.build();
messagePairs.add(msg);
}
// Clear buffers for next pair
userTextBuilder.setLength(0);
assistantTextBuilder.setLength(0);
}
// Extract text
String text = extractTextFromMessage(message);
// Accumulate text based on role (prepending since we're going backwards)
if (role.equals("user")) {
if (!text.isEmpty()) {
if (userTextBuilder.length() > 0) {
userTextBuilder.insert(0, "\n");
}
userTextBuilder.insert(0, text);
}
} else if (role.equals("assistant")) {
if (!text.isEmpty()) {
if (assistantTextBuilder.length() > 0) {
assistantTextBuilder.insert(0, "\n");
}
assistantTextBuilder.insert(0, text);
}
}
currentRole = role;
}
// Save any remaining pair
String userText = userTextBuilder.toString().trim();
String assistantText = assistantTextBuilder.toString().trim();
if (!userText.isEmpty() && !assistantText.isEmpty()) {
ConversationIndexMessage msg = ConversationIndexMessage
.conversationIndexMessageBuilder()
.type(appType)
.question(userText)
.response(assistantText)
.finalAnswer(true)
.sessionId(sessionId)
.build();
messagePairs.add(msg);
}
// Reverse to maintain chronological order
Collections.reverse(messagePairs);
return messagePairs;
}
🤖 Prompt for AI Agents
In
`@ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/AgentUtils.java`
around lines 1319 - 1417, The method extractMessagePairs currently assumes
messages is non-null and will NPE; add an early guard at the start of
extractMessagePairs to return Collections.emptyList() (or new ArrayList<>())
when messages is null or messages.isEmpty(), ensuring all callers receive an
empty list instead of throwing; update the beginning of the
extractMessagePairs(List<Message> messages, String sessionId, String appType)
method accordingly.

Comment on lines +271 to +303
// Check if agent uses unified interface (has model field)
// Unified interface agents store and retrieve structured messages with function calling
boolean usesUnifiedInterface = mlAgent.getModel() != null;

if (usesUnifiedInterface) {
// Get history first, then save new input messages
memory.getStructuredMessages(ActionListener.wrap(allMessages -> {
// Apply history limit
List<Message> history = messageHistoryLimit > 0 && allMessages.size() > messageHistoryLimit
? allMessages.subList(allMessages.size() - messageHistoryLimit, allMessages.size())
: allMessages;

// Save input messages
memory.saveStructuredMessages(inputMessages, ActionListener.wrap(v -> {
if (!history.isEmpty()) {
// Format history messages using the model provider for API-compatible output
ModelProvider modelProvider = ModelProviderFactory.getProvider(mlAgent.getModel().getModelProvider());
MLAgentType agentType = MLAgentType.from(mlAgent.getType());
Map<String, String> historyParams = modelProvider.mapMessages(history, agentType);
String formattedHistory = historyParams.get("body");
if (formattedHistory != null && !formattedHistory.isEmpty()) {
params.put(NEW_CHAT_HISTORY, formattedHistory + ", ");
}
}
runAgent(mlAgent, params, listener, memory, functionCalling);
}, e -> {
log.error("Failed to save input messages", e);
listener.onFailure(e);
}));
}, e -> {
log.error("Failed to get history", e);
listener.onFailure(e);
}));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate unified-interface inputs before memory operations.
This path assumes inputMessages is non-null and that a valid ModelProvider exists; otherwise it can throw or fail with unclear errors. A fast failure makes misconfiguration explicit.

✅ Suggested guard for unified-interface inputs
-            if (usesUnifiedInterface) {
+            if (usesUnifiedInterface) {
+                if (inputMessages == null || inputMessages.isEmpty()) {
+                    listener.onFailure(new IllegalArgumentException("inputMessages is required for unified-interface agents"));
+                    return;
+                }
+                ModelProvider modelProvider = ModelProviderFactory.getProvider(mlAgent.getModel().getModelProvider());
+                if (modelProvider == null) {
+                    listener.onFailure(
+                        new IllegalArgumentException("Unknown model provider: " + mlAgent.getModel().getModelProvider())
+                    );
+                    return;
+                }
                 // Get history first, then save new input messages
                 memory.getStructuredMessages(ActionListener.wrap(allMessages -> {
                     // Apply history limit
                     List<Message> history = messageHistoryLimit > 0 && allMessages.size() > messageHistoryLimit
                         ? allMessages.subList(allMessages.size() - messageHistoryLimit, allMessages.size())
                         : allMessages;
 
                     // Save input messages
                     memory.saveStructuredMessages(inputMessages, ActionListener.wrap(v -> {
                         if (!history.isEmpty()) {
                             // Format history messages using the model provider for API-compatible output
-                            ModelProvider modelProvider = ModelProviderFactory.getProvider(mlAgent.getModel().getModelProvider());
                             MLAgentType agentType = MLAgentType.from(mlAgent.getType());
                             Map<String, String> historyParams = modelProvider.mapMessages(history, agentType);
🤖 Prompt for AI Agents
In
`@ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java`
around lines 271 - 303, The unified-interface branch (usesUnifiedInterface in
MLChatAgentRunner) assumes inputMessages is non-null and that
ModelProviderFactory.getProvider(mlAgent.getModel().getModelProvider()) returns
a valid provider; add quick validation before calling
memory.getStructuredMessages: check inputMessages != null and not empty, verify
mlAgent.getModel() and mlAgent.getModel().getModelProvider() are present, call
ModelProviderFactory.getProvider(...) and fail-fast with listener.onFailure(...)
if it returns null or throws; only then proceed to
memory.getStructuredMessages(...) / memory.saveStructuredMessages(...) and
runAgent(...).

Comment on lines +429 to +502
public void saveStructuredMessages(List<Message> messages, ActionListener<Void> listener) {
log
.debug(
"saveStructuredMessages: Entry - memoryContainerId={}, conversationId={}, messages count={}",
memoryContainerId,
conversationId,
messages != null ? messages.size() : "null"
);
if (Strings.isNullOrEmpty(memoryContainerId)) {
listener.onFailure(new IllegalStateException("Memory container ID is not configured for this AgenticConversationMemory"));
return;
}

if (messages == null || messages.isEmpty()) {
listener.onResponse(null);
return;
}

AtomicInteger remaining = new AtomicInteger(messages.size());
AtomicBoolean hasError = new AtomicBoolean(false);

for (int i = 0; i < messages.size(); i++) {
Message message = messages.get(i);

// Build namespace
Map<String, String> namespace = new HashMap<>();
namespace.put(SESSION_ID_FIELD, conversationId);

// Build structured_data_blob
Map<String, Object> structuredData = new HashMap<>();
Map<String, Object> serializableMessage = gson.fromJson(StringUtils.toJson(message), new TypeToken<Map<String, Object>>() {
}.getType());
structuredData.put("message", serializableMessage);

// Build metadata
Map<String, String> metadata = new HashMap<>();
metadata.put("type", "structured_message");
if (message.getRole() != null) {
metadata.put("role", message.getRole());
}

// Use messageId as sequence number so retrieval can sort by
// (created_time ASC, message_id ASC) to preserve ordering.
MLAddMemoriesInput input = MLAddMemoriesInput
.builder()
.memoryContainerId(memoryContainerId)
.structuredDataBlob(structuredData)
.messageId(i)
.namespace(namespace)
.metadata(metadata)
.infer(false)
.build();

MLAddMemoriesRequest request = MLAddMemoriesRequest.builder().mlAddMemoryInput(input).build();

int index = i;
client.execute(MLAddMemoriesAction.INSTANCE, request, ActionListener.wrap(response -> {
log.debug("Saved structured message {} of {} to session {}", index + 1, messages.size(), conversationId);
if (remaining.decrementAndGet() == 0) {
if (hasError.get()) {
listener.onFailure(new RuntimeException("One or more structured messages failed to save"));
} else {
listener.onResponse(null);
}
}
}, e -> {
log.error("Failed to save structured message {} of {} to session {}", index + 1, messages.size(), conversationId, e);
hasError.set(true);
if (remaining.decrementAndGet() == 0) {
listener.onFailure(e);
}
}));
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Preserve message ordering when saving structured messages.
Writes are issued concurrently, but retrieval sorts by created_time then message_id. If requests arrive out of order (or different batches share the same timestamp), chat history can be reordered. Consider serializing saves (or persisting a monotonic sequence) to keep ordering stable.

✅ Suggested sequential save to preserve ordering
-        AtomicInteger remaining = new AtomicInteger(messages.size());
-        AtomicBoolean hasError = new AtomicBoolean(false);
-
-        for (int i = 0; i < messages.size(); i++) {
-            Message message = messages.get(i);
-            ...
-            client.execute(MLAddMemoriesAction.INSTANCE, request, ActionListener.wrap(response -> {
-                log.debug("Saved structured message {} of {} to session {}", index + 1, messages.size(), conversationId);
-                if (remaining.decrementAndGet() == 0) {
-                    if (hasError.get()) {
-                        listener.onFailure(new RuntimeException("One or more structured messages failed to save"));
-                    } else {
-                        listener.onResponse(null);
-                    }
-                }
-            }, e -> {
-                log.error("Failed to save structured message {} of {} to session {}", index + 1, messages.size(), conversationId, e);
-                hasError.set(true);
-                if (remaining.decrementAndGet() == 0) {
-                    listener.onFailure(e);
-                }
-            }));
-        }
+        saveStructuredMessagesSequentially(messages, 0, new AtomicBoolean(false), listener);
+    private void saveStructuredMessagesSequentially(
+        List<Message> messages,
+        int index,
+        AtomicBoolean hasError,
+        ActionListener<Void> listener
+    ) {
+        if (index >= messages.size()) {
+            if (hasError.get()) {
+                listener.onFailure(new RuntimeException("One or more structured messages failed to save"));
+            } else {
+                listener.onResponse(null);
+            }
+            return;
+        }
+
+        Message message = messages.get(index);
+        // build namespace/structuredData/metadata/request as before...
+        client.execute(MLAddMemoriesAction.INSTANCE, request, ActionListener.wrap(response -> {
+            log.debug("Saved structured message {} of {} to session {}", index + 1, messages.size(), conversationId);
+            saveStructuredMessagesSequentially(messages, index + 1, hasError, listener);
+        }, e -> {
+            log.error("Failed to save structured message {} of {} to session {}", index + 1, messages.size(), conversationId, e);
+            hasError.set(true);
+            saveStructuredMessagesSequentially(messages, index + 1, hasError, listener);
+        }));
+    }
🤖 Prompt for AI Agents
In
`@ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/AgenticConversationMemory.java`
around lines 429 - 502, saveStructuredMessages issues concurrent writes which
can arrive out of order despite using messageId=i; to fix, make saves sequential
or assign a strictly monotonic sequence number instead of relying on loop index.
Modify saveStructuredMessages to either: 1) replace the parallel client.execute
loop with a recursive/iterative sequential publisher (e.g., a private helper
saveNext(int index) that builds MLAddMemoriesInput/MLAddMemoriesRequest and
calls client.execute and on success calls saveNext(index+1), propagating
failures to listener), or 2) keep parallel writes but generate and persist a
monotonic sequence number for MLAddMemoriesInput.messageId (e.g., from an
AtomicLong sequence obtained from the memory container) so ordering is
guaranteed; adjust handling around MLAddMemoriesInput, MLAddMemoriesRequest,
client.execute, remaining/hasError, and messageId accordingly.

@ylwu-amzn
Copy link
Copy Markdown
Collaborator

Can you add sample request/response

@dhrubo-os
Copy link
Copy Markdown
Collaborator

Can you please update the PR description with more details?

@jiapingzeng
Copy link
Copy Markdown
Contributor Author

@ylwu-amzn @dhrubo-os added context and sample request/response in description, could you please help take a look again?

This PR also addresses bug #4595

@ylwu-amzn
Copy link
Copy Markdown
Collaborator

From the test example, seem message_id hard coded as 0?

@jiapingzeng
Copy link
Copy Markdown
Contributor Author

jiapingzeng commented Feb 18, 2026

From the test example, seem message_id hard coded as 0?

Using a combination of created time and message id to sort the messages. Reason is that when using just message_id, when storing a new message, we need to first retrieve the highest message_id then add to it.

// AgenticConversationMemory.java

    public void getStructuredMessages(ActionListener<List<Message>> listener) {
        ...
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQuery);
        searchSourceBuilder.size(Memory.MAX_MESSAGES_TO_RETRIEVE);
        searchSourceBuilder.sort(CREATED_TIME_FIELD, SortOrder.ASC);
        searchSourceBuilder.sort(MESSAGE_ID_FIELD, SortOrder.ASC);
        ...
    }

But with this combination, we do not need a get request to store new messages as newer messages have higher timestamps. Messages stored at the same time will have different message IDs. You can see different IDs in the tool result sample response.

@jiapingzeng jiapingzeng force-pushed the extend-memory-interface branch from 4d9f496 to 7c1e93d Compare February 18, 2026 22:38
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval February 18, 2026 22:40 — with GitHub Actions Failure
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval February 18, 2026 22:40 — with GitHub Actions Error
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval February 18, 2026 22:40 — with GitHub Actions Error
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
… not exist

Signed-off-by: Jiaping Zeng <jpz@amazon.com>
…cutor

Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
…AgentExecutor"

This reverts commit a364488.

Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
…nput"

This reverts commit 1e667ee.

Signed-off-by: Jiaping Zeng <jpz@amazon.com>
@jiapingzeng jiapingzeng force-pushed the extend-memory-interface branch from 004eadc to 12c4fc6 Compare February 26, 2026 06:30
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval February 26, 2026 06:31 — with GitHub Actions Failure
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval February 26, 2026 06:31 — with GitHub Actions Failure
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval February 26, 2026 06:31 — with GitHub Actions Failure
@jiapingzeng jiapingzeng had a problem deploying to ml-commons-cicd-env-require-approval February 26, 2026 06:31 — with GitHub Actions Failure
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
Signed-off-by: Jiaping Zeng <jpz@amazon.com>
@codecov
Copy link
Copy Markdown

codecov bot commented Feb 26, 2026

Codecov Report

❌ Patch coverage is 60.77982% with 342 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.06%. Comparing base (8ff7ef9) to head (826eeba).
⚠️ Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
...engine/memory/RemoteAgenticConversationMemory.java 6.25% 118 Missing and 2 partials ⚠️
...ch/ml/engine/algorithms/agent/MLAgentExecutor.java 36.84% 43 Missing and 5 partials ⚠️
...orithms/remote/streaming/HttpStreamingHandler.java 3.22% 30 Missing ⚠️
...ch/ml/engine/memory/AgenticConversationMemory.java 80.55% 18 Missing and 10 partials ⚠️
...arch/ml/engine/memory/ConversationIndexMemory.java 81.39% 12 Missing and 12 partials ⚠️
.../ml/engine/algorithms/agent/MLChatAgentRunner.java 74.60% 14 Missing and 2 partials ⚠️
...thms/remote/streaming/BedrockStreamingHandler.java 33.33% 16 Missing ⚠️
...nsearch/ml/engine/algorithms/agent/AgentUtils.java 78.78% 3 Missing and 11 partials ⚠️
.../ml/common/agent/BedrockConverseModelProvider.java 80.76% 1 Missing and 9 partials ⚠️
...on/agent/OpenaiV1ChatCompletionsModelProvider.java 79.41% 1 Missing and 6 partials ⚠️
... and 7 more
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #4645      +/-   ##
============================================
- Coverage     77.23%   77.06%   -0.18%     
- Complexity    11241    11368     +127     
============================================
  Files           944      944              
  Lines         50459    51144     +685     
  Branches       6073     6197     +124     
============================================
+ Hits          38973    39415     +442     
- Misses         8930     9118     +188     
- Partials       2556     2611      +55     
Flag Coverage Δ
ml-commons 77.06% <60.77%> (-0.18%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Jiaping Zeng <jpz@amazon.com>
@github-actions
Copy link
Copy Markdown

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add structured messages support to memory implementations

Relevant files:

  • ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/ConversationIndexMemory.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/AgenticConversationMemory.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemory.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/ConversationIndexMemoryTest.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/memory/AgenticConversationMemoryTest.java

Sub-PR theme: Refactor agent executor to use unified memory interface

Relevant files:

  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutor.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutorTest.java

Sub-PR theme: Update agent runners for structured message handling

Relevant files:

  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunnerTest.java
  • ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAGUIAgentRunner.java
  • ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/agent/MLAGUIAgentRunnerTest.java

⚡ Recommended focus areas for review

Possible Issue

String comparison using '!=' instead of '.equals()'. This will compare object references, not string content, potentially causing incorrect behavior when checking if scratchpad notes are empty.

if (llmToolTmpParameters.containsKey(SCRATCHPAD_NOTES_KEY) && !"[]".equals(llmToolTmpParameters.get(SCRATCHPAD_NOTES_KEY))) {
Logic Issue

The traceDisabled flag logic changed to include usesUnifiedInterface with OR operator. This means traces will be disabled for all unified interface requests. Verify this is the intended behavior, as it may prevent debugging for new interface types.

boolean traceDisabled = usesUnifiedInterface
    || (tmpParameters.containsKey(DISABLE_TRACE) && Boolean.parseBoolean(tmpParameters.get(DISABLE_TRACE)));
Race Condition

The lastIncompleteInteractionId AtomicReference is used to track pending interactions across async operations. However, concurrent calls to saveStructuredMessages could lead to race conditions where the wrong interaction ID is retrieved or cleared, potentially causing data corruption or lost updates.

String pendingInteractionId = this.lastIncompleteInteractionId.getAndSet(null);
if (pendingInteractionId != null) {
    String assistantText = extractAssistantText(filteredMessages);
    if (assistantText != null && !assistantText.isEmpty()) {
        String interactionId = pendingInteractionId;
        update(interactionId, Map.of(AI_RESPONSE_FIELD, assistantText), ActionListener.wrap(updateResponse -> {
            log.info("Updated incomplete interaction {} with assistant response", interactionId);
            listener.onResponse(null);
        }, e -> {
            log.error("Failed to update incomplete interaction {} with assistant response", interactionId, e);
            listener.onFailure(e);
        }));
        return;
    }
}
Incomplete Logic

The condition checks if agentMLInput.getAgentInput() == null before processing legacy question input, but the subsequent code doesn't handle the case where both agentInput and question parameter are missing. This could lead to null pointer exceptions or unexpected behavior.

if (agentMLInput.getAgentInput() == null && agentMLInput.getInputDataset() != null) {
    RemoteInferenceInputDataSet remoteInferenceInputDataSet = (RemoteInferenceInputDataSet) agentMLInput.getInputDataset();

@github-actions
Copy link
Copy Markdown

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix string comparison operator

Use .equals() instead of != for string comparison. The current code uses reference
equality (!=) which may not work as intended for string content comparison. This
could cause the scratchpad notes to be incorrectly updated or skipped.

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java [888-890]

-if (llmToolTmpParameters.containsKey(SCRATCHPAD_NOTES_KEY) && llmToolTmpParameters.get(SCRATCHPAD_NOTES_KEY) != "[]") {
+if (llmToolTmpParameters.containsKey(SCRATCHPAD_NOTES_KEY) && !"[]".equals(llmToolTmpParameters.get(SCRATCHPAD_NOTES_KEY))) {
     tmpParameters.put(SCRATCHPAD_NOTES_KEY, llmToolTmpParameters.getOrDefault(SCRATCHPAD_NOTES_KEY, "[]"));
 }
Suggestion importance[1-10]: 10

__

Why: Critical bug fix. Using != for string comparison in Java compares references, not content. This will almost always evaluate to true even when the strings are equal, causing incorrect behavior in scratchpad notes handling.

High
Prevent potential NullPointerException

Add null check for memory before calling memory.getId(). If memory is null when the
condition is true, this will throw a NullPointerException. The condition allows
inputMessages to be null for AG_UI agents, but doesn't verify memory is non-null.

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentExecutor.java [552-569]

 if ((inputMessages != null && supportsStructuredMessages(mlAgent)) || (agentType == MLAgentType.AG_UI && inputMessages == null)) {
     executeAgent(
         inputDataSet,
         tenantId,
         mlTask,
         isAsync,
-        memory.getId(),
+        memory != null ? memory.getId() : null,
         mlAgent,
         outputs,
         modelTensors,
         listener,
         memory,
         channel,
         hookRegistry,
         inputMessages
     );
     return;
 }
Suggestion importance[1-10]: 9

__

Why: The condition allows memory to potentially be null (especially for AG_UI agents with inputMessages == null), but then calls memory.getId() without a null check, which would cause a NullPointerException at runtime.

High
Prevent duplicate listener callback invocation

The streamActionListener.onResponse() is called twice in the same code path: once to
save the assistant message and again in sendCompletionResponse(). This will cause
the listener to be invoked multiple times, which violates the ActionListener
contract and may lead to unexpected behavior or errors.

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/streaming/HttpStreamingHandler.java [236-260]

 private void handleDoneEvent() {
     if (!agentExecutionInProgress) {
         if (isAGUIAgent && !isStreamClosed.get()) {
             ...
             // Trigger agentListener callback to save assistant structured message
             streamActionListener.onResponse(createFinalAnswerResponse(accumulatedContent.toString()));
+            return; // Exit early to avoid double callback
         }
 
         sendCompletionResponse(isStreamClosed, streamActionListener);
     }
 }
Suggestion importance[1-10]: 9

__

Why: This correctly identifies a critical bug where streamActionListener.onResponse() is called twice, violating the ActionListener contract. The suggested fix with an early return is correct and prevents potential errors or unexpected behavior.

High
Fix concurrent listener invocation

The error handling has a race condition where multiple failures could call
listener.onFailure() multiple times. Once hasError is set to true, the first thread
to reach remaining == 0 calls onFailure(e), but subsequent failures might also call
it. Ensure the listener is invoked exactly once.

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/AgenticConversationMemory.java [499-559]

 private void doSaveMessages(List<Message> messages, int startId, ActionListener<Void> listener) {
     if (messages == null || messages.isEmpty()) {
         listener.onResponse(null);
         return;
     }
 
     AtomicInteger remaining = new AtomicInteger(messages.size());
-    AtomicBoolean hasError = new AtomicBoolean(false);
+    AtomicReference<Exception> firstError = new AtomicReference<>();
 
     for (int i = 0; i < messages.size(); i++) {
         ...
         client.execute(MLAddMemoriesAction.INSTANCE, request, ActionListener.wrap(response -> {
             ...
             if (remaining.decrementAndGet() == 0) {
-                if (hasError.get()) {
-                    listener.onFailure(new RuntimeException("One or more structured messages failed to save"));
+                Exception error = firstError.get();
+                if (error != null) {
+                    listener.onFailure(error);
                 } else {
                     nextMessageId.set(startId + messages.size());
                     listener.onResponse(null);
                 }
             }
         }, e -> {
             ...
-            hasError.set(true);
+            firstError.compareAndSet(null, e);
             if (remaining.decrementAndGet() == 0) {
-                listener.onFailure(e);
+                listener.onFailure(firstError.get());
             }
         }));
     }
 }
Suggestion importance[1-10]: 8

__

Why: Race condition in error handling could result in listener.onFailure() being called multiple times. Using AtomicReference with compareAndSet ensures the listener is invoked exactly once, preventing potential issues in async callback handling.

Medium
Prevent race conditions in ID assignment

The nextMessageId atomic reference is set to null and then used to determine the
starting ID. If multiple concurrent calls occur, the second call will always query
for max ID even if the first call is still in progress, potentially causing
duplicate message IDs or race conditions in ID assignment.

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemory.java [563-571]

-Integer startId = this.nextMessageId.getAndSet(null);
-if (startId != null) {
-    doSaveMessages(messages, startId, listener);
-} else {
-    getMaxStructuredMessageId(
-        ActionListener.wrap(maxId -> { doSaveMessages(messages, maxId + 1, listener); }, listener::onFailure)
-    );
+synchronized (this.nextMessageId) {
+    Integer startId = this.nextMessageId.get();
+    if (startId != null) {
+        this.nextMessageId.set(startId + messages.size());
+        doSaveMessages(messages, startId, listener);
+    } else {
+        getMaxStructuredMessageId(
+            ActionListener.wrap(maxId -> { 
+                synchronized (this.nextMessageId) {
+                    this.nextMessageId.set(maxId + 1 + messages.size());
+                }
+                doSaveMessages(messages, maxId + 1, listener); 
+            }, listener::onFailure)
+        );
+    }
 }
Suggestion importance[1-10]: 8

__

Why: This identifies a critical race condition where concurrent calls could result in duplicate message IDs. The suggested fix using synchronization is appropriate, though the implementation could be refined to use atomic operations more effectively. This is a significant correctness issue.

Medium
Add synchronization for concurrent access

The lastIncompleteInteractionId is an instance variable that can be accessed
concurrently by multiple threads. Without proper synchronization, race conditions
may occur when multiple requests attempt to save trailing user messages
simultaneously, potentially causing data corruption or lost updates.

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/ConversationIndexMemory.java [260-283]

 private void saveTrailingUserMessage(Message userMessage, ActionListener<Void> listener) {
     String userText = AgentUtils.extractTextFromMessage(userMessage);
     if (userText == null || userText.isEmpty()) {
         listener.onResponse(null);
         return;
     }
 
     ConversationIndexMessage msg = ConversationIndexMessage
         .conversationIndexMessageBuilder()
         .question(userText)
         .response("")
         .finalAnswer(true)
         .sessionId(conversationId)
         .build();
 
     save(msg, null, null, null, ActionListener.wrap(interaction -> {
         log.info("Saved trailing user message as incomplete interaction: {}", interaction.getId());
-        this.lastIncompleteInteractionId.set(interaction.getId());
+        synchronized (this.lastIncompleteInteractionId) {
+            this.lastIncompleteInteractionId.set(interaction.getId());
+        }
         listener.onResponse(null);
     }, e -> {
         log.error("Failed to save trailing user message", e);
         listener.onFailure(e);
     }));
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential race condition with lastIncompleteInteractionId. However, synchronizing on an AtomicReference is not the correct approach. The proper fix would be to use atomic operations like compareAndSet or ensure the entire operation is atomic. The issue is valid but the solution needs refinement.

Medium
Add null check for sessionId

The sessionId variable may be null when VersionConflictEngineException occurs
without an explicit session ID. This can happen if the conflict arises from internal
ID generation. Add a null check before using sessionId in the response builder to
prevent potential NullPointerException.

plugin/src/main/java/org/opensearch/ml/action/session/TransportCreateSessionAction.java [140-143]

 if (e instanceof VersionConflictEngineException) {
     // Session already exists — return the existing session ID
-    MLCreateSessionResponse response = MLCreateSessionResponse.builder().sessionId(sessionId).status("exists").build();
-    actionListener.onResponse(response);
+    if (sessionId != null) {
+        MLCreateSessionResponse response = MLCreateSessionResponse.builder().sessionId(sessionId).status("exists").build();
+        actionListener.onResponse(response);
+    } else {
+        log.error("Version conflict occurred but sessionId is null for container {}", input.getMemoryContainerId());
+        actionListener.onFailure(new OpenSearchStatusException("Internal server error", RestStatus.INTERNAL_SERVER_ERROR));
+    }
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that sessionId could be null when a VersionConflictEngineException occurs. However, looking at the code context, sessionId is only used in the response when the exception occurs, and the exception itself is triggered by the CREATE operation on line 130, which only happens when sessionId is not null (checked on line 128). While the null check adds defensive programming, the scenario is unlikely given the current flow.

Medium
General
Preserve interaction ID on update failure

When updating an incomplete interaction fails, the lastIncompleteInteractionId has
already been cleared by getAndSet(null). This means the incomplete interaction ID is
permanently lost and cannot be retried, leaving orphaned incomplete interactions in
the database.

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/ConversationIndexMemory.java [212-226]

-String pendingInteractionId = this.lastIncompleteInteractionId.getAndSet(null);
+String pendingInteractionId = this.lastIncompleteInteractionId.get();
 if (pendingInteractionId != null) {
     String assistantText = extractAssistantText(filteredMessages);
     if (assistantText != null && !assistantText.isEmpty()) {
         String interactionId = pendingInteractionId;
         update(interactionId, Map.of(AI_RESPONSE_FIELD, assistantText), ActionListener.wrap(updateResponse -> {
             log.info("Updated incomplete interaction {} with assistant response", interactionId);
+            this.lastIncompleteInteractionId.compareAndSet(interactionId, null);
             listener.onResponse(null);
         }, e -> {
             log.error("Failed to update incomplete interaction {} with assistant response", interactionId, e);
             listener.onFailure(e);
         }));
         return;
     }
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion identifies a valid issue where the interaction ID is lost on update failure. The proposed fix using compareAndSet is better than the original getAndSet(null), though it doesn't fully address retry logic. This is a moderate correctness issue affecting data integrity.

Medium
Memory parameter is silently ignored

The default implementation ignores the memory parameter and calls the overload
without it. This defeats the purpose of passing executor-provided memory. Consider
making this method abstract or documenting that implementations must override this
method to properly handle the memory parameter.

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLAgentRunner.java [49-57]

-default void run(
+void run(
     MLAgent mlAgent,
     Map<String, String> params,
     ActionListener<Object> listener,
     TransportChannel channel,
     Memory memory
-) {
-    run(mlAgent, params, listener, channel);
-}
+);
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly points out that the default implementation ignores the memory parameter. However, making it abstract would break backward compatibility for existing implementations. The current design allows gradual adoption where implementations can override when ready. The suggestion to document this behavior would be more appropriate than making it abstract, but the concern about the parameter being ignored is valid.

Low

@jiapingzeng
Copy link
Copy Markdown
Contributor Author

With the latest changes, all memory types are supported for unified interface agent. However, trace is disabled for unified interface agent and parentInteractionId is not returned in the response. Created #4677 to track.

Copy link
Copy Markdown
Collaborator

@pyek-bot pyek-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving PR to unblock. Most changes look good. I will handle the gap when building the v2 changes for unified interface.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants