Conditionals with the pipeline processor
The pipeline processor in ingest pipelines allows conditional execution of different sub-pipelines based on document contents. This provides powerful flexibility when different types of documents require separate processing logic. You can use the if parameter in the pipeline processor to direct documents to different pipelines based on field values, data types, or content structure. Each pipeline can then apply its own set of processors independently. This approach keeps your pipelines modular and maintainable by applying logic only where it’s relevant.
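For illustration, the basic shape of such a router is shown below; example_router, target_pipeline, and some_field are placeholder names:
PUT _ingest/pipeline/example_router
{
  "processors": [
    {
      "pipeline": {
        "name": "target_pipeline",
        "if": "ctx.some_field == 'some_value'"
      }
    }
  ]
}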
Example: Route logs by service
The following example demonstrates how to route logs to different sub-pipelines depending on the service.name field in the document.
Create the first pipeline named webapp_logs:
PUT _ingest/pipeline/webapp_logs
{
"processors": [
{ "set": { "field": "log_type", "value": "webapp" } }
]
}
Create the second pipeline named api_logs:
PUT _ingest/pipeline/api_logs
{
"processors": [
{ "set": { "field": "log_type", "value": "api" } }
]
}
Create the main routing pipeline named service_router, which routes documents to the corresponding pipelines based on service.name:
PUT _ingest/pipeline/service_router
{
"processors": [
{
"pipeline": {
"name": "webapp_logs",
"if": "ctx.service?.name == 'webapp'"
}
},
{
"pipeline": {
"name": "api_logs",
"if": "ctx.service?.name == 'api'"
}
}
]
}
The conditions use the null-safe operator (?.), so a document that lacks the service object evaluates each condition to false instead of throwing an error. Use the following request to simulate the service_router pipeline:
POST _ingest/pipeline/service_router/_simulate
{
"docs": [
{ "_source": { "service": { "name": "webapp" }, "message": "Homepage loaded" } },
{ "_source": { "service": { "name": "api" }, "message": "GET /v1/users" } },
{ "_source": { "service": { "name": "worker" }, "message": "Task started" } }
]
}
The response confirms that the first document was processed by the webapp_logs pipeline and the second document was processed by the api_logs pipeline. The third document remains unchanged because it doesn’t match either condition:
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"log_type": "webapp",
"message": "Homepage loaded",
"service": {
"name": "webapp"
}
},
"_ingest": {
"timestamp": "2025-04-24T10:54:12.555447087Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"log_type": "api",
"message": "GET /v1/users",
"service": {
"name": "api"
}
},
"_ingest": {
"timestamp": "2025-04-24T10:54:12.55548442Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"message": "Task started",
"service": {
"name": "worker"
}
},
"_ingest": {
"timestamp": "2025-04-24T10:54:12.555490754Z"
}
}
}
]
}
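To apply the routing outside of a simulation, reference the router when indexing a document by using the pipeline query parameter. For illustration (my-logs is a placeholder index name):
POST my-logs/_doc?pipeline=service_router
{
  "service": { "name": "webapp" },
  "message": "Homepage loaded"
}
The indexed document is stored with the log_type field set by the webapp_logs sub-pipeline.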
Example: Type-specific processing
You can also use the pipeline processor to apply type-specific pipelines. The following pipeline directs logs to a numeric_handler pipeline if the code field is a number and to a string_handler pipeline if it is a string.
Create the first pipeline named numeric_handler:
PUT _ingest/pipeline/numeric_handler
{
"processors": [
{ "set": { "field": "code_type", "value": "numeric" } }
]
}
Create the second pipeline named string_handler:
PUT _ingest/pipeline/string_handler
{
"processors": [
{ "set": { "field": "code_type", "value": "string" } }
]
}
Create the main routing pipeline named type_router, which routes documents to the corresponding pipelines based on the type of the code field:
PUT _ingest/pipeline/type_router
{
"processors": [
{
"pipeline": {
"name": "numeric_handler",
"if": "ctx.code instanceof Integer || ctx.code instanceof Long || ctx.code instanceof Double"
}
},
{
"pipeline": {
"name": "string_handler",
"if": "ctx.code instanceof String"
}
}
]
}
Documents in which the code field is missing or has a different type don’t match either condition and pass through unchanged. Use the following request to simulate the type_router pipeline:
POST _ingest/pipeline/type_router/_simulate
{
"docs": [
{ "_source": { "code": 404 } },
{ "_source": { "code": "ERR_NOT_FOUND" } }
]
}
The returned documents have the new field code_type added by the individual sub-pipelines:
{
"docs": [
{
"doc": {
"_source": {
"code": 404,
"code_type": "numeric"
}
}
},
{
"doc": {
"_source": {
"code": "ERR_NOT_FOUND",
"code_type": "string"
}
}
}
]
}
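To run the routing automatically on every indexing request, you can set the router as the index’s default pipeline. A sketch, assuming a placeholder my-logs index that already exists:
PUT my-logs/_settings
{
  "index.default_pipeline": "type_router"
}
With this setting, documents indexed into my-logs pass through type_router unless the request explicitly specifies another pipeline.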