Processor chains
Introduced 3.3
Processor chains enable flexible data transformation pipelines that can process both input and output data. Chain multiple processors together to create sequential transformations where each processor’s output becomes the next processor’s input.
Processors provide a way to:
- Transform data formats: Convert between different data structures (strings, JSON, arrays).
- Extract specific information: Use JSONPath or regex patterns to extract relevant data.
- Clean and filter content: Remove unwanted fields or apply formatting rules.
- Standardize data: Ensure consistent data formats across different components.
Processors execute in the order in which they appear in the array. Each processor receives the output from the previous processor.
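The ordering rule can be pictured as a simple loop over the array. The following Python sketch is illustrative only and is not the ML Commons implementation: the run_chain helper and the HANDLERS table are hypothetical names, only two processor types are modeled, and Python's re module stands in for the Java regex engine.

```python
import json
import re
from typing import Any, Callable, Dict, List

# Hypothetical handlers keyed by processor type; only two toy types are modeled.
HANDLERS: Dict[str, Callable[[Any, dict], Any]] = {
    "to_string": lambda data, cfg: data if isinstance(data, str)
        else json.dumps(data, separators=(",", ":")),
    "regex_replace": lambda data, cfg: re.sub(
        cfg["pattern"], cfg.get("replacement", ""), data),
}

def run_chain(data: Any, processors: List[dict]) -> Any:
    """Apply processors in array order; each output feeds the next processor."""
    for cfg in processors:
        data = HANDLERS[cfg["type"]](data, cfg)
    return data

chain = [
    {"type": "to_string"},
    {"type": "regex_replace", "pattern": "\\d+", "replacement": "N"},
]
result = run_chain({"docs": 100}, chain)
print(result)  # {"docs":N}
```

Reversing the two entries in chain would fail in this sketch, because regex_replace would receive an object instead of a string, which is why the array order matters.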
Processor chains are specifically designed for ML workflows and differ from processors in ingest and search pipelines:
- Ingest pipelines: Transform documents during indexing into OpenSearch.
- Search pipelines: Transform queries and search results during search operations.
- Processor chains: Transform data within ML Commons workflows (agent tools, model inputs/outputs).
Processor chains provide specialized data transformation capabilities tailored for AI/ML use cases, such as cleaning model responses, extracting structured data from LLM outputs, and preparing inputs for model inference.
Configuration
Processors can be configured in different contexts:
- Tool outputs: Add an output_processors array in the tool’s parameters section.
- Model outputs: Add an output_processors array in the model’s parameters section during a _predict call.
- Model inputs: Add an input_processors array in the model’s parameters section of a _predict call.
For complete examples, see Example usage with agents and Example usage with models.
Supported processor types
The following table lists all supported processors.
Processor | Description |
---|---|
conditional | Applies different processor chains based on conditions. |
extract_json | Extracts JSON objects or arrays from text strings. |
for_each | Iterates through array elements and applies a chain of processors to each element. |
jsonpath_filter | Extracts data using JSONPath expressions. |
process_and_set | Applies a chain of processors to the input and sets the result at a specified JSONPath location. |
regex_capture | Captures specific groups from regex matches. |
regex_replace | Replaces text using regular expression patterns. |
remove_jsonpath | Removes fields from JSON objects using JSONPath. |
set_field | Sets a field to a specified static value or copies a value from another field. |
to_string | Converts the input to a JSON string representation. |
conditional
Applies different processor chains based on conditions.
Parameters:
- path (string, optional): The JSONPath expression used to extract the value for condition evaluation.
- routes (array, required): An array of condition-processor mappings.
- default (array, optional): The default processors if no conditions match.
Supported conditions:
- Exact value match: "value"
- Numeric comparisons: ">10", "<5", ">=", "<=", "==5"
- Existence checks: "exists", "null", "not_exists"
- Regex matching: "regex:pattern"
- Contains text: "contains:substring"
Example configuration:
{
"type": "conditional",
"path": "$.status",
"routes": [
{
"green": [
{"type": "regex_replace", "pattern": "status", "replacement": "healthy"}
]
},
{
"red": [
{"type": "regex_replace", "pattern": "status", "replacement": "unhealthy"}
]
}
],
"default": [
{"type": "regex_replace", "pattern": "status", "replacement": "unknown"}
]
}
Example input:
{"index": "test-index", "status": "green", "docs": 100}
Example output:
{"index": "test-index", "healthy": "green", "docs": 100}
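To make the route selection concrete, the following Python sketch shows one way the condition strings listed above could be interpreted. It is not the ML Commons implementation. The matches and pick_route helpers are hypothetical, the order in which condition kinds are checked is an assumption, and so is the exact meaning given to "exists", "null", and "not_exists".

```python
import re
from typing import Any, List

_MISSING = object()  # sentinel meaning "the path was not found"

def matches(condition: str, value: Any = _MISSING) -> bool:
    """Evaluate one condition string against an extracted value (assumed semantics)."""
    if condition == "exists":
        return value is not _MISSING and value is not None
    if condition == "not_exists":
        return value is _MISSING
    if condition == "null":
        return value is None
    if value is _MISSING or value is None:
        return False
    if condition.startswith("regex:"):
        return re.search(condition[len("regex:"):], str(value)) is not None
    if condition.startswith("contains:"):
        return condition[len("contains:"):] in str(value)
    numeric = re.fullmatch(r"(>=|<=|==|>|<)\s*(-?\d+(?:\.\d+)?)", condition)
    if numeric and isinstance(value, (int, float)) and not isinstance(value, bool):
        op, rhs = numeric.group(1), float(numeric.group(2))
        return {">": value > rhs, "<": value < rhs, ">=": value >= rhs,
                "<=": value <= rhs, "==": value == rhs}[op]
    return str(value) == condition  # exact value match

def pick_route(value: Any, routes: List[dict], default: List[dict]) -> List[dict]:
    """Return the processors of the first matching route, else the default."""
    for route in routes:
        for condition, processors in route.items():
            if matches(condition, value):
                return processors
    return default

routes = [
    {"green": [{"type": "regex_replace", "pattern": "status", "replacement": "healthy"}]},
    {"red": [{"type": "regex_replace", "pattern": "status", "replacement": "unhealthy"}]},
]
default = [{"type": "regex_replace", "pattern": "status", "replacement": "unknown"}]
chosen = pick_route("green", routes, default)
print(chosen[0]["replacement"])  # healthy
```

In this sketch, routes are tried in array order and the first match wins, so more specific conditions would need to come before broader ones.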
extract_json
Extracts JSON objects or arrays from text strings.
Parameters:
- extract_type (string, optional): The type of JSON to extract: "object", "array", or "auto". Default is "auto".
- default (any, optional): The default value if JSON extraction fails.
Example configuration:
{
"type": "extract_json",
"extract_type": "object",
"default": {}
}
Example input:
"The result is: {\"status\": \"success\", \"count\": 5} - processing complete"
Example output:
{"status": "success", "count": 5}
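The extraction above can be approximated in plain Python by scanning for the first position at which a valid JSON value starts. This is a minimal sketch under stated assumptions, not the ML Commons implementation: the extract_json function below is a hypothetical stand-in, and the scanning strategy (first parsable object or array wins) is an assumption.

```python
import json
from typing import Any

def extract_json(text: str, extract_type: str = "auto", default: Any = None) -> Any:
    """Return the first JSON object/array embedded in text, or default if none parses."""
    openers = {"object": "{", "array": "[", "auto": "{["}[extract_type]
    decoder = json.JSONDecoder()
    for i, ch in enumerate(text):
        if ch in openers:
            try:
                # raw_decode parses one JSON value starting at index i and ignores the rest
                value, _end = decoder.raw_decode(text, i)
                return value
            except json.JSONDecodeError:
                continue  # not valid JSON from here; keep scanning
    return default

text = 'The result is: {"status": "success", "count": 5} - processing complete'
extracted = extract_json(text, extract_type="object", default={})
print(extracted)  # {'status': 'success', 'count': 5}
```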
for_each
Iterates through array elements and applies a chain of processors to each element. Useful for transforming array elements uniformly, such as when adding missing fields, filtering content, or normalizing data structures.
Parameters:
- path (string, required): The JSONPath expression pointing to the array to iterate over. Must use [*] notation for array elements.
- processors (array, required): A list of processor configurations to apply to each array element.
Behavior:
- Each element is processed independently using the configured processor chain.
- The output of the processor chain replaces the original element.
- If the path doesn’t exist or doesn’t point to an array, the input is returned unchanged.
- If the processing of an element fails, the original element is kept.
Example configuration:
{
"type": "for_each",
"path": "$.items[*]",
"processors": [
{
"type": "set_field",
"path": "$.processed",
"value": true
}
]
}
Example input:
{
"items": [
{"name": "item1", "value": 10},
{"name": "item2", "value": 20}
]
}
Example output:
{
"items": [
{"name": "item1", "value": 10, "processed": true},
{"name": "item2", "value": 20, "processed": true}
]
}
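The behavior rules listed earlier (independent elements, replacement of each element, unchanged input for a missing or non-array path, and keeping the original element on failure) can be sketched in Python as follows. This is an illustration only: the for_each and mark_processed functions are hypothetical, and the sketch handles just a top-level path of the form $.<field>[*] rather than general JSONPath.

```python
import copy
from typing import Any, Callable

def for_each(data: Any, field: str, process: Callable[[Any], Any]) -> Any:
    """Sketch of for_each for a top-level path of the form $.<field>[*]."""
    items = data.get(field) if isinstance(data, dict) else None
    if not isinstance(items, list):
        return data  # path missing or not an array: input is returned unchanged
    result = copy.deepcopy(data)
    processed = []
    for element in items:
        try:
            # each element is processed independently, on its own copy
            processed.append(process(copy.deepcopy(element)))
        except Exception:
            processed.append(element)  # processing failed: keep the original element
    result[field] = processed
    return result

def mark_processed(element: dict) -> dict:
    """Stand-in for a set_field processor that sets $.processed to true."""
    element["processed"] = True
    return element

doc = {"items": [{"name": "item1", "value": 10}, {"name": "item2", "value": 20}]}
updated = for_each(doc, "items", mark_processed)
print(updated)
```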
jsonpath_filter
Extracts data using JSONPath expressions.
Parameters:
- path (string, required): The JSONPath expression used to extract data.
- default (any, optional): The default value if the path is not found.
Example configuration:
{
"type": "jsonpath_filter",
"path": "$.data.items[*].name",
"default": []
}
Example input:
{"data": {"items": [{"name": "item1"}, {"name": "item2"}]}}
Example output:
["item1", "item2"]
process_and_set
Applies a chain of processors to the input and sets the result at a specified JSONPath location.
Parameters:
- path (string, required): The JSONPath expression specifying where to set the processed result.
- processors (array, required): A list of processor configurations to apply sequentially.
Path behavior:
- If the path exists, it will be updated with the processed value.
- If the path doesn’t exist, the processor chain attempts to create it (works for simple nested fields).
- A parent path must exist for new field creation to succeed.
Example configuration:
{
"type": "process_and_set",
"path": "$.summary.clean_name",
"processors": [
{
"type": "to_string"
},
{
"type": "regex_replace",
"pattern": "[^a-zA-Z0-9]",
"replacement": "_"
}
]
}
Example input:
{"name": "Test Index!", "status": "active"}
Example output:
{"name": "Test Index!", "status": "active", "summary": {"clean_name": "Test_Index_"}}
regex_capture
Captures specific groups from regex matches. For regex syntax details, see Java regex syntax.
Parameters:
- pattern (string, required): A regular expression pattern with capture groups.
- groups (string or array, optional): Group numbers to capture. Can be a single number like "1" or an array like "[1, 2, 4]". Default is "1".
Example configuration:
{
"type": "regex_capture",
"pattern": "(\\d+),(\\w+),(\\w+),([^,]+)",
"groups": "[1, 4]"
}
Example input:
"1,green,open,.plugins-ml-model-group,DCJHJc7pQ6Gid02PaSeXBQ,1,0"
Example output:
["1", ".plugins-ml-model-group"]
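The capture above can be reproduced with Python's re module, as in the sketch below. It is an approximation rather than the ML Commons implementation: the processor uses Java regex syntax, which differs from Python's in some details, and the parse_groups and regex_capture helpers are hypothetical. Returning a plain string for a single group and None when nothing matches are assumptions.

```python
import json
import re
from typing import List, Union

def parse_groups(groups: Union[str, int, List[int]] = "1") -> List[int]:
    """Accept "1", "[1, 4]", 1, or [1, 4] and return a list of group numbers."""
    parsed = json.loads(groups) if isinstance(groups, str) else groups
    return parsed if isinstance(parsed, list) else [parsed]

def regex_capture(text: str, pattern: str,
                  groups: Union[str, int, List[int]] = "1") -> Union[str, List[str], None]:
    match = re.search(pattern, text)
    if match is None:
        return None  # assumption: the no-match behavior is not described in this section
    captured = [match.group(g) for g in parse_groups(groups)]
    # assumption: a single group yields a string, several groups yield a list
    return captured[0] if len(captured) == 1 else captured

line = "1,green,open,.plugins-ml-model-group,DCJHJc7pQ6Gid02PaSeXBQ,1,0"
pattern = "(\\d+),(\\w+),(\\w+),([^,]+)"
captured = regex_capture(line, pattern, "[1, 4]")
print(captured)  # ['1', '.plugins-ml-model-group']
```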
regex_replace
Replaces text using regular expression patterns. For regex syntax details, see Java regex syntax.
Parameters:
- pattern (string, required): A regular expression pattern to match.
- replacement (string, optional): Replacement text. Default is "".
- replace_all (Boolean, optional): Whether to replace all matches or only the first. Default is true.
Example configuration:
{
"type": "regex_replace",
"pattern": "^.*?\n",
"replacement": ""
}
Example input:
"row,health,status,index\n1,green,open,.plugins-ml-model\n2,red,closed,test-index"
Example output:
"1,green,open,.plugins-ml-model\n2,red,closed,test-index"
remove_jsonpath
Removes fields from JSON objects using JSONPath.
Parameters:
- paths (array, required): An array of JSONPath expressions identifying fields to remove.
Example configuration:
{
"type": "remove_jsonpath",
"paths": "[$.sensitive_data]"
}
Example input:
{"name": "user1", "sensitive_data": "secret", "public_info": "visible"}
Example output:
{"name": "user1", "public_info": "visible"}
set_field
Sets a field to a specified static value or copies a value from another field.
Parameters:
- path (string, required): The JSONPath expression specifying where to set the value.
- value (any, conditionally required): The static value to set. Either value or source_path must be provided.
- source_path (string, conditionally required): The JSONPath expression to copy the value from. Either value or source_path must be provided.
- default (any, optional): The default value when source_path doesn’t exist. Only used with source_path.
Path behavior:
- If the path exists, it will be updated with the new value.
- If the path doesn’t exist, the processor chain attempts to create it (works for simple nested fields).
- A parent path must exist for new field creation to succeed.
Example configuration (static value):
{
"type": "set_field",
"path": "$.metadata.processed_at",
"value": "2024-03-15T10:30:00Z"
}
Example configuration (copy field):
{
"type": "set_field",
"path": "$.userId",
"source_path": "$.user.id",
"default": "unknown"
}
Example input:
{"user": {"id": 123}, "name": "John"}
Example output:
{"user": {"id": 123}, "name": "John", "userId": 123, "metadata": {"processed_at": "2024-03-15T10:30:00Z"}}
to_string
Converts the input to a JSON string representation.
Parameters:
- escape_json (Boolean, optional): Whether to escape JSON characters. Default is false.
Example configuration:
{
"type": "to_string",
"escape_json": true
}
Example input:
{"name": "test", "value": 123}
Example output:
"{\"name\":\"test\",\"value\":123}"
Example usage with agents
The following example demonstrates using processor chains with agents.
Step 1: Register a flow agent with output processors
POST /_plugins/_ml/agents/_register
{
"name": "Index Summary Agent",
"type": "flow",
"description": "Agent that provides clean index summaries",
"tools": [
{
"type": "ListIndexTool",
"parameters": {
"output_processors": [
{
"type": "regex_replace",
"pattern": "^.*?\n",
"replacement": ""
},
{
"type": "regex_capture",
"pattern": "(\\d+,\\w+,\\w+,([^,]+))"
}
]
}
}
]
}
Step 2: Execute the agent
Use the agent_id returned in the previous step to execute the agent:
POST /_plugins/_ml/agents/{agent_id}/_execute
{
"parameters": {
"question": "List the indices"
}
}
Without output processors, the ListIndexTool returns verbose raw CSV output with headers and extra columns:
row,health,status,index,uuid,pri,rep,docs.count,docs.deleted,store.size,pri.store.size
1,green,open,.plugins-ml-model-group,DCJHJc7pQ6Gid02PaSeXBQ,1,0,1,0,12.7kb,12.7kb
2,green,open,.plugins-ml-memory-message,6qVpepfRSCi9bQF_As_t2A,1,0,7,0,53kb,53kb
3,green,open,.plugins-ml-memory-meta,LqP3QMaURNKYDZ9p8dTq3Q,1,0,2,0,44.8kb,44.8kb
The output processors transform the verbose CSV output into a clean, readable format by:
- regex_replace: Removing the CSV header row.
- regex_capture: Extracting only essential information (row number, health, status, and index name).
With output processors, the agent returns clean, formatted data with only essential index information:
1,green,open,.plugins-ml-model-group
2,green,open,.plugins-ml-memory-message
3,green,open,.plugins-ml-memory-meta
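The two-step transformation can be reproduced outside OpenSearch with Python's re module. The sketch below is an approximation under stated assumptions, not the ML Commons implementation: the regex_replace and capture_per_line helpers are hypothetical, Python regex syntax stands in for Java's, and applying the capture pattern to each line separately is an assumption chosen because it yields the output shown above.

```python
import re

raw = (
    "row,health,status,index,uuid,pri,rep,docs.count,docs.deleted,store.size,pri.store.size\n"
    "1,green,open,.plugins-ml-model-group,DCJHJc7pQ6Gid02PaSeXBQ,1,0,1,0,12.7kb,12.7kb\n"
    "2,green,open,.plugins-ml-memory-message,6qVpepfRSCi9bQF_As_t2A,1,0,7,0,53kb,53kb\n"
    "3,green,open,.plugins-ml-memory-meta,LqP3QMaURNKYDZ9p8dTq3Q,1,0,2,0,44.8kb,44.8kb"
)

def regex_replace(text: str, pattern: str, replacement: str = "",
                  replace_all: bool = True) -> str:
    # count=0 replaces every match; count=1 replaces only the first one
    return re.sub(pattern, replacement, text, count=0 if replace_all else 1)

def capture_per_line(text: str, pattern: str, group: int = 1) -> str:
    # Assumption: the pattern is applied to each line independently.
    rows = []
    for line in text.splitlines():
        match = re.search(pattern, line)
        if match:
            rows.append(match.group(group))
    return "\n".join(rows)

# Step 1: "^" only matches at the start of the text, so just the header row is removed.
without_header = regex_replace(raw, "^.*?\n", "")
# Step 2: group 1 spans the row number, health, status, and index name.
summary = capture_per_line(without_header, r"(\d+,\w+,\w+,([^,]+))")
print(summary)
```

Running the capture pattern across the whole text without anchoring it to line starts would also match fragments such as the numeric columns later in each row, which is why this sketch works line by line.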
Example usage with models
The following examples demonstrate how to use processor chains with models during Predict API calls.
Example: Input processors
This example shows you how to modify model input using input_processors to replace text before processing:
POST /_plugins/_ml/models/{model_id}/_predict
{
"parameters": {
"system_prompt": "You are a helpful assistant.",
"prompt": "Can you summarize Prince Hamlet of William Shakespeare in around 100 words?",
"input_processors": [
{
"type": "regex_replace",
"pattern": "100",
"replacement": "20"
}
]
}
}
In this example, the regex_replace processor modifies the prompt before it’s sent to the model, changing “100 words” to “20 words”.
Example: Output processors
This example shows you how to process model output using output_processors to extract and format JSON data. The output processors first extract the message content from the model response using JSONPath and then parse the JSON object embedded in that text:
POST /_plugins/_ml/models/{model_id}/_predict
{
"parameters": {
"messages": [
{
"role": "system",
"content": [
{
"type": "text",
"text": "${parameters.system_prompt}"
}
]
},
{
"role": "user",
"content": [
{
"type": "text",
"text": "Can you convert this into a json object: user name is Bob, he likes swimming"
}
]
}
],
"system_prompt": "You are a helpful assistant",
"output_processors": [
{
"type": "jsonpath_filter",
"path": "$.choices[0].message.content"
},
{
"type": "extract_json",
"extract_type": "auto"
}
]
}
}
Without output processors, the raw response contains the full model output with extensive metadata and a nested structure:
{
"inference_results": [
{
"output": [
{
"name": "response",
"dataAsMap": {
"id": "test-id",
"object": "chat.completion",
"created": 1.759580469E9,
"model": "gpt-4o-mini-2024-07-18",
"choices": [
{
"index": 0.0,
"message": {
"role": "assistant",
"content": "Sure! Here is the information you provided converted into a JSON object:\n\n```json\n{\n \"user\": {\n \"name\": \"Bob\",\n \"likes\": \"swimming\"\n }\n}\n```",
"refusal": null,
"annotations": []
},
"logprobs": null,
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 33.0,
"completion_tokens": 42.0,
"total_tokens": 75.0,
"prompt_tokens_details": {
"cached_tokens": 0.0,
"audio_tokens": 0.0
},
"completion_tokens_details": {
"reasoning_tokens": 0.0,
"audio_tokens": 0.0,
"accepted_prediction_tokens": 0.0,
"rejected_prediction_tokens": 0.0
}
},
"service_tier": "default",
"system_fingerprint": "test-fingerprint"
}
}
],
"status_code": 200
}
]
}
With output processors, the response is simplified to contain only the extracted and parsed JSON data:
{
"inference_results": [
{
"output": [
{
"name": "response",
"dataAsMap": {
"user": {
"name": "Bob",
"likes": "swimming"
}
}
}
],
"status_code": 200
}
]
}
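The effect of the two output processors on the raw response can be traced in plain Python. This sketch is an illustration, not the ML Commons implementation: the dictionary indexing stands in for the JSONPath expression $.choices[0].message.content, the scanning loop stands in for extract_json with "auto", and the response dictionary is trimmed to the fields the chain actually reads.

```python
import json

fence = "`" * 3  # Markdown code fence marker found in the model's reply
content = (
    "Sure! Here is the information you provided converted into a JSON object:\n\n"
    + fence + "json\n"
    + '{\n "user": {\n "name": "Bob",\n "likes": "swimming"\n }\n}\n'
    + fence
)
response = {
    "choices": [
        {"index": 0, "message": {"role": "assistant", "content": content}}
    ]
}

# Step 1 (jsonpath_filter): select $.choices[0].message.content
text = response["choices"][0]["message"]["content"]

# Step 2 (extract_json, "auto"): parse the first JSON object or array found in the text
decoder = json.JSONDecoder()
parsed = None
for i, ch in enumerate(text):
    if ch in "{[":
        try:
            parsed, _end = decoder.raw_decode(text, i)
            break
        except json.JSONDecodeError:
            continue

print(parsed)  # {'user': {'name': 'Bob', 'likes': 'swimming'}}
```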