Link Search Menu Expand Document Documentation Menu

Predict Stream API

Introduced 3.3

This is an experimental feature and is not recommended for use in a production environment. For updates on the progress of the feature or if you want to leave feedback, join the discussion on the OpenSearch forum.

The Predict Stream API provides the same functionality as the Predict API but returns responses in a streaming format, delivering data in chunks as it becomes available. This streaming approach is particularly beneficial for large language model interactions with lengthy responses, allowing you to see partial results immediately rather than waiting for the complete response.

This API currently supports the following remote model types:

Endpoint

POST /_plugins/_ml/models/<model_id>/_predict/stream

Prerequisites

Before using this API, ensure that you have fulfilled the following prerequisites.

Set up your cluster

Follow these steps to set up your cluster.

Step 1: Install the required plugins

The Execute Stream Agent API depends on the following plugins, which are included in the OpenSearch distribution but must be explicitly installed as follows:

bin/opensearch-plugin install transport-reactor-netty4
bin/opensearch-plugin install arrow-flight-rpc

For more information, see Installing plugins.

Step 2: Configure OpenSearch settings

Add these settings to your opensearch.yml file or Docker Compose configuration:

opensearch.experimental.feature.transport.stream.enabled: true

# Choose one based on your security settings
http.type: reactor-netty4        # security disabled
http.type: reactor-netty4-secure # security enabled

# Multi-node cluster settings (if applicable)
# Use network.host IP for opensearch.yml or node name for Docker
arrow.flight.publish_host: <ip>
arrow.flight.bind_host: <ip>

# Security-enabled cluster settings (if applicable)
transport.stream.type.default: FLIGHT-SECURE
flight.ssl.enable: true
transport.ssl.enforce_hostname_verification: false

If you’re using the security demo certificates, change plugins.security.ssl.transport.enforce_hostname_verification: false to transport.ssl.enforce_hostname_verification: false in your opensearch.yml file.

For more information about enabling experimental features, see Experimental feature flags.

Step 3: Configure JVM options

Add these settings to your jvm.options file:

-Dio.netty.allocator.numDirectArenas=1
-Dio.netty.noUnsafe=false
-Dio.netty.tryUnsafe=true
-Dio.netty.tryReflectionSetAccessible=true
--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED

Configure the necessary APIs

Configure the API using the following steps.

Step 1: Enable the streaming feature flag

To enable the streaming feature flag, update the cluster settings as follows:

PUT _cluster/settings
{
  "persistent" : {
    "plugins.ml_commons.stream_enabled": true
  }
}

Step 2: Register a compatible externally hosted model

To register an OpenAI Chat Completion model, send the following request:

POST /_plugins/_ml/models/_register
{
    "name": "OpenAI gpt 3.5 turbo",
    "function_name": "remote",
    "description": "OpenAI model",
    "connector": {
        "name": "OpenAI Chat Connector",
        "description": "The connector to OpenAI model service for GPT 3.5",
        "version": 1,
        "protocol": "http",
        "parameters": {
            "endpoint": "api.openai.com",
            "model": "gpt-3.5-turbo"
        },
        "credential": {
            "openAI_key": "<your_api_key>"
        },
        "actions": [
            {
                "action_type": "predict",
                "method": "POST",
                "url": "https://${parameters.endpoint}/v1/chat/completions",
                "headers": {
                    "Authorization": "Bearer ${credential.openAI_key}"
                },
                "request_body": "{ \"model\": \"${parameters.model}\", \"messages\": ${parameters.messages} }",
                "response_filter": "$.choices[0].delta.content"
            }
        ]
    }
}

To register an Amazon Bedrock Converse Stream model, send the following request:

POST /_plugins/_ml/models/_register
{
    "name": "Amazon Bedrock Converse Stream model",
    "function_name": "remote",
    "description": "Amazon Bedrock Claude model",
    "connector": {
        "name": "Amazon Bedrock Converse",
        "description": "The connector to Amazon Bedrock Converse",
        "version": 1,
        "protocol": "aws_sigv4",
        "credential": {
            "access_key": "<your_aws_access_key>",
            "secret_key": "<your_aws_secret_key>",
            "session_token": "<your_aws_session_token>"
        },
        "parameters": {
            "region": "<your_aws_region>",
            "service_name": "bedrock",
            "response_filter": "$.output.message.content[0].text",
            "model": "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
        },
        "actions": [
            {
                "action_type": "predict",
                "method": "POST",
                "headers": {
                    "content-type": "application/json"
                },
                "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/${parameters.model}/converse",
                "request_body": "{\"messages\":[{\"role\":\"user\",\"content\":[{\"type\":\"text\",\"text\":\"${parameters.inputs}\"}]}]}"
            }
        ]
    }
}

Example request

To use the Predict Stream API, you must include the _llm_interface parameter that corresponds to your model type:

  • OpenAI Chat Completion: openai/v1/chat/completions
  • Amazon Bedrock Converse Stream: bedrock/converse/claude

For OpenAI Chat Completion, send the following request:

POST /_plugins/_ml/models/<model_id>/_predict/stream
{
  "parameters": {
    "messages": [
      {
        "role": "system",
        "content": "You are a helpful assistant."
      },
      {
        "role": "user",
        "content": "Can you summarize Prince Hamlet of William Shakespeare in around 1000 words?"
      }
    ],
    "_llm_interface": "openai/v1/chat/completions"
  }
}

For Amazon Bedrock Converse Stream, send the following request:

POST /_plugins/_ml/models/<model_id>/_predict/stream
{
  "parameters": {
    "inputs": "Can you summarize Prince Hamlet of William Shakespeare in around 1000 words?",
    "_llm_interface": "bedrock/converse/claude"
  }
}

Example response

The streaming format uses Server-Sent Events (SSE), with each chunk containing a portion of the model’s response, and an is_last flag to indicate completion.

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":"Sure","is_last":false}}]}]}

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":"!","is_last":false}}]}]}

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":"Ham","is_last":false}}]}]}

...

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":" psyche","is_last":false}}]}]}

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":".","is_last":false}}]}]}

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":"","is_last":true}}]}]}

Response body fields

The following table lists all response body fields.

Field Data type Description
inference_results Array Contains the streaming response data returned by the model.
inference_results.output Array Contains output objects for each inference result.
inference_results.output.name String The name of the output field (typically, response).
inference_results.output.dataAsMap Object Contains the response content and metadata.
inference_results.output.dataAsMap.content String The text content chunk from the model’s response.
inference_results.output.dataAsMap.is_last Boolean Indicates whether this is the final chunk in the stream: true for the last chunk, false if there are more chunks.
350 characters left

Have a question? .

Want to contribute? or .