Introduce gRPC streaming support for ML prediction and agent execution #4790

Draft
nathaliellenaa wants to merge 4 commits into opensearch-project:main from nathaliellenaa:grpc_streaming

nathaliellenaa (Contributor) commented Apr 13, 2026

Description

Introduce gRPC streaming support for ML prediction and agent execution.
Supported models:

  • OpenAI Chat Completion
  • Amazon Bedrock Converse Stream

Module Structure

New grpc/ module:

 grpc/                                                                                                                                                                                                            
  ├── src/main/java/org/opensearch/ml/grpc/                                                                                                                                                                        
  │   ├── MLStreamingService.java          # Main gRPC service implementation                                                                                                                                      
  │   ├── MLGrpcServiceFactory.java        # SPI factory for service registration                                                                                                                                  
  │   ├── GrpcStatusMapper.java            # Maps exceptions to gRPC status codes                                                                                                                                  
  │   ├── TenantIdInterceptor.java         # Multi-tenancy support via metadata                                                                                                                              
  │   ├── adapters/                                                                                                                                                                                                
  │   │   ├── StreamObserverAdapter.java   # Bridges gRPC StreamObserver with ML transport                                                                                                                         
  │   │   └── GrpcTransportChannel.java    # gRPC streaming channel implementation                                                                                                                             
  │   ├── converters/                                                                                                                                                                                              
  │   │   ├── ProtoRequestConverter.java   # Converts proto requests to ML requests                                                                                                                                
  │   │   └── ProtoResponseConverter.java  # Converts ML responses to proto format                                                                                                                                 
  │   └── interfaces/                      # Abstraction interfaces to avoid circular deps                                                                                                                         
  │       ├── MLClient.java                                                                                                                                                                                        
  │       ├── MLModelManager.java                                                                                                                                                                                  
  │       ├── MLModelAccessControlHelper.java                                                                                                                                                                      
  │       ├── MLTaskRunner.java                                                                                                                                                                                    
  │       └── MLUserContextProvider.java                                                                                                                                                                           
  └── src/main/resources/META-INF/services/                                                                                                                                                                        
      └── org.opensearch.transport.grpc.spi.GrpcServiceFactory      
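
The tree above describes GrpcStatusMapper only as mapping exceptions to gRPC status codes. As a rough illustration of that idea (not the PR's Java implementation), the sketch below maps Python built-in exceptions to a subset of the canonical gRPC status codes. The rule table and the name to_grpc_code are hypothetical; only the numeric code values come from the gRPC specification.

```python
from enum import IntEnum


class GrpcCode(IntEnum):
    """Subset of the canonical gRPC status codes (numeric values per the gRPC spec)."""
    OK = 0
    INVALID_ARGUMENT = 3
    DEADLINE_EXCEEDED = 4
    NOT_FOUND = 5
    PERMISSION_DENIED = 7
    INTERNAL = 13


# Hypothetical rules, checked in order; the real GrpcStatusMapper may map differently.
_RULES = [
    (ValueError, GrpcCode.INVALID_ARGUMENT),
    (KeyError, GrpcCode.NOT_FOUND),
    (PermissionError, GrpcCode.PERMISSION_DENIED),
    (TimeoutError, GrpcCode.DEADLINE_EXCEEDED),
]


def to_grpc_code(exc: BaseException) -> GrpcCode:
    """Return the first matching status code, falling back to INTERNAL."""
    for exc_type, code in _RULES:
        if isinstance(exc, exc_type):
            return code
    return GrpcCode.INTERNAL
```

Falling back to INTERNAL for unrecognized exceptions is one common choice; it keeps unexpected failures from being reported as client errors.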

Adapter classes in plugin/ (implement grpc interfaces):

plugin/src/main/java/org/opensearch/ml/plugin/grpc/                                                                                                                                                              
  ├── ClientAdapter.java                    # Wraps ML Client                                                                                                                                                      
  ├── ModelManagerAdapter.java              # Wraps MLModelManager                                                                                                                                                 
  ├── TaskRunnerAdapter.java                # Wraps MLPredictTaskRunner/MLExecuteTaskRunner                                                                                                                        
  ├── ModelAccessControlHelperAdapter.java  # Wraps access control                                                                                                                                                 
  └── UserContextProviderAdapter.java       # Wraps security context

Sample Request/Response

StreamingPredict Request:

Example gRPC call (using grpcurl):

# OpenAI chat completion model
grpcurl -plaintext -d '{
  "model_id": "dHiMdJ0B9Pl_xFeF_p3m",
  "ml_predict_model_stream_request_body": {
    "parameters": {
      "messages": [
        {
          "role": "system",
          "content": "You are a helpful assistant."
        },
        {
          "role": "user",
          "content": "Hello!"
        }
      ],
      "_llm_interface": "openai/v1/chat/completions"
    }
  }
}' localhost:9400 org.opensearch.protobufs.services.MLService/PredictModelStream 

# Bedrock Converse stream
grpcurl -plaintext -d '{
  "model_id": "ZK0_eZ0BQ8o9xp0YiJjb",
  "ml_predict_model_stream_request_body": {
    "parameters": {
      "inputs": "Hello!",
      "_llm_interface": "bedrock/converse/claude"
    }
  }
}' localhost:9400 org.opensearch.protobufs.services.MLService/PredictModelStream 
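
Both requests above share the same envelope: a model_id plus a parameters map nested under ml_predict_model_stream_request_body. A minimal sketch of building that grpcurl invocation programmatically follows. The helper name predict_stream_command is hypothetical; the field names, port, and service path are copied from the examples above.

```python
import json
import shlex

# Service path as it appears in the grpcurl examples.
SERVICE = "org.opensearch.protobufs.services.MLService"


def predict_stream_command(model_id, parameters, host="localhost:9400"):
    """Build the grpcurl argv for a PredictModelStream call (hypothetical helper)."""
    body = {
        "model_id": model_id,
        "ml_predict_model_stream_request_body": {"parameters": parameters},
    }
    return [
        "grpcurl", "-plaintext",
        "-d", json.dumps(body),
        host, f"{SERVICE}/PredictModelStream",
    ]


argv = predict_stream_command(
    "ZK0_eZ0BQ8o9xp0YiJjb",
    {"inputs": "Hello!", "_llm_interface": "bedrock/converse/claude"},
)
# shlex.join quotes the JSON body so the line can be pasted into a shell.
print(shlex.join(argv))
```

Returning an argv list rather than a single string avoids quoting problems if the command is passed to subprocess.run directly.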

Streaming Response (multiple chunks):

{
  "inferenceResults": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "content": "Hello",
            "isLast": false
          }
        }
      ]
    }
  ]
}
{
  "inferenceResults": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "content": "!",
            "isLast": false
          }
        }
      ]
    }
  ]
}
{
  "inferenceResults": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "content": " How",
            "isLast": false
          }
        }
      ]
    }
  ]
}
...
{
  "inferenceResults": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "content": "",
            "isLast": true
          }
        }
      ]
    }
  ]

StreamingExecuteAgent Request:

Example gRPC call (using grpcurl):

# Conversational agent with an OpenAI chat completion or Bedrock Converse stream model
grpcurl -plaintext -d '{
  "agent_id": "Xq0-eZ0BQ8o9xp0YnpjK",
  "ml_execute_agent_stream_request_body": {
    "parameters": {
      "question": "how many indices in my cluster?"
    }
  }
}' localhost:9400 org.opensearch.protobufs.services.MLService/ExecuteAgentStream 

Streaming Response (multiple chunks):

{
  "inferenceResults": [
    {
      "output": [
        {
          "name": "memory_id",
          "result": "IJaPiJ0BAlAOubg3ZHCF"
        },
        {
          "name": "parent_interaction_id",
          "result": "IZaPiJ0BAlAOubg3ZHDB"
        },
        {
          "name": "response",
          "dataAsMap": {
            "content": "[{\"index\":0.0,\"id\":\"call_uUWGdLd9Osi3wjWMAqF8a3cn\",\"type\":\"function\",\"function\":{\"name\":\"RetrieveIndexMetaTool\",\"arguments\":\"\"}}]",
            "isLast": false
          }
        }
      ]
    }
  ]
}
{
  "inferenceResults": [
    {
      "output": [
        {
          "name": "memory_id",
          "result": "IJaPiJ0BAlAOubg3ZHCF"
        },
        {
          "name": "parent_interaction_id",
          "result": "IZaPiJ0BAlAOubg3ZHDB"
        },
        {
          "name": "response",
          "dataAsMap": {
            "content": "[{\"index\":0.0,\"function\":{\"arguments\":\"{}\"}}]",
            "isLast": false
          }
        }
      ]
    }
  ]
}
...
{
  "inferenceResults": [
    {
      "output": [
        {
          "name": "memory_id",
          "result": "IJaPiJ0BAlAOubg3ZHCF"
        },
        {
          "name": "parent_interaction_id",
          "result": "IZaPiJ0BAlAOubg3ZHDB"
        },
        {
          "name": "response",
          "dataAsMap": {
            "content": "",
            "isLast": true
          }
        }
      ]
    }
  ]
}
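
In the agent stream above, the content field of the first chunks holds a JSON-encoded list of tool-call fragments: the first fragment carries the call id and function name, and later fragments append to the arguments string. A minimal client-side sketch of merging those fragments by index follows. It assumes the fragments follow the shape shown in the sample; the function names are hypothetical.

```python
import json


def named_results(chunk):
    """Collect name -> result pairs (e.g. memory_id) from one chunk."""
    found = {}
    for result in chunk.get("inferenceResults", []):
        for output in result.get("output", []):
            if "result" in output:
                found[output["name"]] = output["result"]
    return found


def merge_tool_calls(contents):
    """Merge tool-call fragments that share an index into complete calls."""
    calls = {}
    for content in contents:
        if not content:
            continue  # the final chunk has empty content
        try:
            deltas = json.loads(content)
        except json.JSONDecodeError:
            continue  # plain-text answer fragment, not a tool-call list
        if not isinstance(deltas, list):
            continue
        for delta in deltas:
            if not isinstance(delta, dict):
                continue
            # The sample encodes the index as a float (0.0).
            idx = int(delta.get("index", 0))
            call = calls.setdefault(idx, {"id": None, "name": None, "arguments": ""})
            if delta.get("id"):
                call["id"] = delta["id"]
            fn = delta.get("function", {})
            if fn.get("name"):
                call["name"] = fn["name"]
            call["arguments"] += fn.get("arguments", "")
    return [calls[i] for i in sorted(calls)]


contents = [
    '[{"index":0.0,"id":"call_uUWGdLd9Osi3wjWMAqF8a3cn","type":"function",'
    '"function":{"name":"RetrieveIndexMetaTool","arguments":""}}]',
    '[{"index":0.0,"function":{"arguments":"{}"}}]',
    "",
]
merged = merge_tool_calls(contents)
```

Since memory_id and parent_interaction_id repeat unchanged in every chunk of the sample, a client would only need to read them from the first chunk it receives.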

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

github-actions bot commented Apr 13, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 3604222.

| Path | Line | Severity | Description |
| --- | --- | --- | --- |
| grpc/build.gradle | 9 | high | New build plugin added: 'io.freefair.lombok'. Build plugins execute arbitrary code during compilation and must be verified for authenticity before use. |
| grpc/build.gradle | 20 | high | New external dependencies added: 'io.grpc:grpc-api' and 'io.grpc:grpc-stub'. Versions are resolved via a variable (${versions.grpc}); artifact authenticity and version pinning must be verified. |
| grpc/build.gradle | 24 | high | New external dependencies added: 'org.opensearch.plugin:transport-grpc-spi' and 'org.opensearch:protobufs'. These are new packages that must be verified against official OpenSearch artifact registries. |
| grpc/build.gradle | 27 | high | New test dependency added: 'org.mockito:mockito-core:5.15.2' with a hardcoded version. Dependency authenticity and integrity should be verified. |
| plugin/build.gradle | 79 | high | Extended plugins list modified to include 'transport-grpc'. This grants the ML plugin access to gRPC transport internals and must be verified as an intentional, authorized extension point. |
| plugin/build.gradle | 117 | high | New compileOnly dependency added: 'org.opensearch.plugin:transport-grpc-spi'. New external package dependency requiring verification of artifact authenticity and source. |
| ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/agent/MLChatAgentRunner.java | 1386 | medium | New 'skipListenerCallback' parameter (passed as 'isStreaming') suppresses the call to listener.onResponse() in returnFinalResponse() when streaming is active. This changes response delivery semantics and could suppress final response data reaching certain callers; warrants review to confirm all code paths correctly receive responses. |
| ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/streaming/StreamPredictActionListener.java | 77 | medium | onResponse() behavior changed: previously always called onStreamResponse(), now only calls it when agentListener is null. When agentListener is set, streaming chunks are no longer forwarded via onStreamResponse(). This is a subtle behavioral inversion that could silently drop streaming data in agent-listener code paths if the agent listener does not independently handle streaming. |

The table above displays the top 10 most important findings.

Total: 8 | Critical: 0 | High: 6 | Medium: 2 | Low: 0
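
The last finding is easier to reason about with a toy model of the branch it describes. The sketch below is not the real StreamPredictActionListener: it is a simplified Python stand-in, and it assumes that chunks are handed to the agent listener whenever one is set, which the report does not confirm.

```python
class StreamListenerModel:
    """Toy stand-in for the branching described in the finding (not the real class)."""

    def __init__(self, agent_listener=None):
        self.agent_listener = agent_listener
        self.streamed = []  # chunks forwarded via on_stream_response

    def on_stream_response(self, chunk):
        self.streamed.append(chunk)

    def on_response(self, chunk):
        if self.agent_listener is None:
            # Behavior per the report: forward only when no agent listener is set.
            self.on_stream_response(chunk)
        else:
            # Assumption: the agent listener becomes responsible for the chunk.
            self.agent_listener(chunk)


plain = StreamListenerModel()
plain.on_response("Hello")

received = []
with_agent = StreamListenerModel(agent_listener=received.append)
with_agent.on_response("Hello")
```

In this model, a chunk sent through with_agent reaches the client only if the agent listener forwards it itself, which is the code path the finding asks reviewers to confirm.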


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

nathaliellenaa had a problem deploying to ml-commons-cicd-env-require-approval April 13, 2026 20:42 with GitHub Actions (Failure)
nathaliellenaa had a problem deploying to ml-commons-cicd-env-require-approval April 13, 2026 20:42 with GitHub Actions (Error)
Signed-off-by: Nathalie Jonathan <nathhjo@amazon.com>
nathaliellenaa requested a deployment to ml-commons-cicd-env-require-approval April 15, 2026 00:07 with GitHub Actions (Waiting)