Bulk API (gRPC)
Introduced 3.0
The gRPC Bulk API provides an efficient, binary-encoded alternative to the HTTP Bulk API for performing multiple document operations—such as indexing, updating, and deleting—in a single call. This service uses protocol buffers and mirrors the REST API in terms of parameters and structure.
Prerequisite
To submit gRPC requests, you must have a set of protobufs on the client side. For ways to obtain the protobufs, see Using gRPC APIs.
gRPC service and method
gRPC Document APIs reside in the DocumentService.
You can submit bulk requests by invoking the Bulk
gRPC method within the DocumentService
. The method takes a BulkRequest
and returns a BulkResponse
.
Document format
In gRPC, documents must be provided and returned as bytes. Use Base64 encoding to provide documents in a gRPC request.
For example, consider the following document in a regular Bulk API request:
"doc": "{\"title\": \"Inception\", \"year\": 2010}"
For a gRPC Bulk API request, provide the same document in Base64 encoding:
"doc": "eyJ0aXRsZSI6ICJJbmNlcHRpb24iLCAieWVhciI6IDIwMTB9"
BulkRequest fields
The BulkRequest
message is the top-level container for a gRPC bulk operation. It accepts the following fields.
Field | Protobuf type | Description |
---|---|---|
request_body | repeated BulkRequestBody | The list of bulk operations, each containing one of the operation types (index /create /update /delete ). Required. |
index | string | The default index for all operations unless overridden in request_body . Specifying the index in the BulkRequest means that you don’t need to include it in the BulkRequestBody. Optional. |
x_source | SourceConfigParam | Controls whether to return the full _source , no _source , or only specific fields from _source in the response. Optional. |
x_source_excludes | repeated string | Fields to exclude from source . Optional. |
x_source_includes | repeated string | Fields to include from source . Optional. |
pipeline | string | The preprocessing ingest pipeline ID. Optional. |
refresh | Refresh | Whether to refresh shards after indexing. Optional. |
require_alias | bool | If true , actions must target an alias. Optional. |
routing | string | The routing value for shard assignment. Optional. |
timeout | string | The timeout duration (for example, 1m ). Optional. |
type (Deprecated) | string | The document type (always _doc ). Optional. |
wait_for_active_shards | WaitForActiveShards | The minimum number of active shards to wait for. Optional. |
global_params | GlobalParams | Global parameters for the request. Optional. |
BulkRequestBody fields
The BulkRequestBody
message represents a single document-level operation within a BulkRequest
. It accepts the following fields.
Field | Protobuf type | Description |
---|---|---|
operation_container | OperationContainer | The operation to perform (index , create , update , or delete ). Required. |
update_action | UpdateAction | Additional update-specific options. Optional. |
object | bytes | The full document content used with create and index operations. Optional. |
OperationContainer fields
The OperationContainer
message contains exactly one operation type. It accepts the following fields.
Field | Protobuf type | Description |
---|---|---|
index | IndexOperation | Index a document. Replaces the document if it already exists. |
create | WriteOperation | Create a new document. Fails if the document already exists. |
update | UpdateOperation | Partially update a document or use upsert/script options. |
delete | DeleteOperation | Delete a document by ID. |
UpdateAction fields
The UpdateAction
message provides additional options for update operations. It accepts the following fields.
Field | Protobuf type | Description |
---|---|---|
detect_noop | bool | If true , skips the update if the document content hasn’t changed. Optional. Default is true . |
doc | bytes | Partial or full document data for update operations. Optional. |
doc_as_upsert | bool | If true , treats the document as the full upsert document if the target document doesn’t exist. Only valid for the update operation. Optional. |
script | Script | A script to apply to the document (used with update ). Optional. |
scripted_upsert | bool | If true , executes the script whether or not the document exists. Optional. |
upsert | bytes | The full document to use if the target does not exist. Used with script . Optional. |
x_source | SourceConfig | Controls how the document source is fetched or filtered. Optional. |
Create
WriteOperation
adds a new document only if it doesn’t already exist.
The document itself must be provided in the object
field of the BulkRequestBody
message.
The following optional fields can also be provided.
Field | Protobuf type | Description |
---|---|---|
x_id | string | The document ID. If omitted, one is auto-generated. Optional. |
x_index | string | The target index. Required if not set globally in the BulkRequest . Optional. |
routing | string | A custom routing value used to control shard placement. Optional. |
pipeline | string | The preprocessing ingest pipeline ID. Optional. |
require_alias | bool | If true , requires that all actions target an index alias rather than an index. Default is false . Optional. |
Example request
The following example shows a bulk request with a create
operation. It creates a document with the ID tt1375666
in the movies
index. The document content, provided in Base64 encoding, represents {"title": "Inception", "year": 2010}
:
{
"index": "movies",
"request_body": [
{
"operation_container": {
"create": {
"x_index": "movies",
"x_id": "tt1375666"
}
},
"object": "eyJ0aXRsZSI6ICJJbmNlcHRpb24iLCAieWVhciI6IDIwMTB9"
}
]
}
Delete
The DeleteOperation
removes a document by ID. It accepts the following fields.
Field | Protobuf type | Description |
---|---|---|
x_id | string | The ID of the document to delete. Required. |
x_index | string | The target index. Required if not set globally in the BulkRequest . Optional. |
routing | string | A custom routing value used to control shard placement. Optional. |
if_primary_term | int64 | Used for concurrency control. The operation only runs if the document’s primary term matches this value. Optional. |
if_seq_no | int64 | Used for concurrency control. The operation only runs if the document’s sequence number matches this value. Optional. |
version | int64 | The explicit document version for concurrency control. Optional. |
version_type | VersionType | Controls version matching behavior. Optional. |
Example request
The following example shows a bulk request with a delete
operation. It deletes a document with the ID tt1392214
from the movies
index:
{
"index": "movies",
"request_body": [
{
"operation_container": {
"delete": {
"x_index": "movies",
"x_id": "tt1392214"
}
}
}
]
}
Index
The IndexOperation
creates or overwrites a document. If an ID is not provided, one is generated.
The document itself is provided in the object
field of the BulkRequestBody
message.
The following optional fields can also be provided.
Field | Protobuf type | Description |
---|---|---|
x_id | string | The document ID. If omitted, one is auto-generated. Optional. |
x_index | string | The target index. Required only if not set globally in the BulkRequest . |
routing | string | A custom routing value used to control shard placement. Optional. |
if_primary_term | int64 | Used for concurrency control. The operation only runs if the document’s primary term matches this value. Optional. |
if_seq_no | int64 | Used for concurrency control. The operation only runs if the document’s sequence number matches this value. Optional. |
op_type | OpType | The operation type. Controls the overwriting behavior. Valid values are index (default) and create . Optional. |
version | int64 | The explicit document version for concurrency control. Optional. |
version_type | VersionType | Controls version matching behavior. Optional. |
pipeline | string | The preprocessing ingest pipeline ID. Optional. |
require_alias | bool | If true , requires that all actions target an index alias rather than an index. Default is false . Optional. |
Example request
The following example shows a bulk request with an index
operation. It indexes a Base64-encoded document with the ID tt0468569
into the movies
index:
{
"index": "movies",
"request_body": [
{
"operation_container": {
"index": {
"x_index": "movies",
"x_id": "tt0468569"
}
},
"object": "eyJ0aXRsZSI6ICJUaGUgRGFyayBLbmlnaHQiLCAieWVhciI6IDIwMDh9"
}
]
}
Update
The UpdateOperation
performs partial document updates.
The update options are provided in the update_action
field within the BulkRequestBody
message.
All UpdateOperation
fields, listed in the following table, are optional except for x_id
.
Field | Protobuf type | Description |
---|---|---|
x_id | string | The ID of the document to update. Required. |
x_index | string | The target index. Required if not set globally in the BulkRequest . Optional. |
routing | string | A custom routing value used to control shard placement. Optional. |
if_primary_term | int64 | Used for concurrency control. The operation only runs if the document’s primary term matches this value. Optional. |
if_seq_no | int64 | Used for concurrency control. The operation only runs if the document’s sequence number matches this value. Optional. |
require_alias | bool | If true , requires that all actions target an index alias rather than an index. Default is false . Optional. |
retry_on_conflict | int32 | The number of times to retry the operation if a version conflict occurs. Optional. |
Example request
The following example shows a bulk request with an update
operation. It will update a document with the ID tt1375666
in the movies
index to {"year": 2011}
:
{
"index": "movies",
"request_body": [
{
"operation_container": {
"update": {
"x_index": "movies",
"x_id": "tt1375666"
}
},
"update_action": {
"doc": "eyJ5ZWFyIjogMjAxMX0=",
"detect_noop": true
}
}
]
}
Upsert
The upsert
operation updates the document if it already exists. Otherwise, it creates a new document using the provided document content.
To upsert a document, provide an UpdateOperation
and specify doc_as_upsert
as true
in the BulkRequestBody
. The document to be upserted should be provided in the doc
field.
Example request
The following example shows a bulk request with an upsert
operation. It updates the year
field of the document with ID tt1375666
in the movies
index to {"year": 2012}
:
{
"index": "movies",
"request_body": [
{
"update": {
"index": "movies",
"id": "tt1375666"
},
"doc": "eyJ5ZWFyIjogMjAxMn0=",
"doc_as_upsert": true
}
]
}
Script
Run a stored or inline script to modify a document.
To specify a script, provide an UpdateOperation
and a script
field in the BulkRequestBody
.
Example request
The following example shows a bulk request with a script
operation. It increments the year
field of the document with the ID tt1375666
in the movies
index by 1:
{
"index": "movies",
"request_body": [
{
"update": {
"index": "movies",
"id": "tt1375666"
},
"script": {
"inline_script": {
"source": "ctx._source.year += 1",
"lang": "SCRIPT_LANGUAGE_PAINLESS"
}
}
}
]
}
Response fields
The gRPC Bulk API provides the following response fields.
BulkResponse fields
The BulkResponse
message is returned directly from the Bulk
gRPC method and provides a summary and per-item result of a bulk operation. It contains the following fields.
Field | Protobuf type | Description |
---|---|---|
errors | bool | Indicates whether any of the operations in the bulk request failed. If any operation fails, the response’s errors field will be true . You can iterate over the individual Item actions for more detailed information. |
items | repeated Item | The result of all operations in the bulk request, in the order they were submitted. |
took | int64 | The amount of time taken to process the bulk request, in milliseconds. |
ingest_took | int64 | The amount of time taken to process documents through an ingest pipeline, in milliseconds. |
Item fields
Each Item
in the response corresponds to a single operation in the request. For each operation, only one of the following fields is provided.
Field | Protobuf type | Description |
---|---|---|
create | ResponseItem | The result of the CreateOperation . |
delete | ResponseItem | The result of the DeleteOperation . |
index | ResponseItem | The result of the IndexOperation . |
update | ResponseItem | The result of the UpdateOperation . |
ResponseItem fields
Each ResponseItem
corresponds to a single operation in the request. It contains the following fields.
Field | Protobuf type | Description |
---|---|---|
type | string | The document type. |
id | ResponseItem.Id | The document ID associated with the operation. Can be null . |
index | string | The name of the index associated with the operation. If a data stream was targeted, this is the backing index. |
status | int32 | The HTTP status code returned for the operation. (Note: This field may be replaced with a gRPC code in the future.) |
error | ErrorCause | Contains additional information about a failed operation. |
primary_term | int64 | The primary term assigned to the document. |
result | string | The operation result. Valid values are created , deleted , and updated . |
seq_no | int64 | A sequence number assigned to the document to maintain version order. |
shards | ShardInfo | Shard information for the operation (only returned for successful actions). |
version | int64 | The document version (only returned for successful actions). |
forced_refresh | bool | If true , forces the document to become visible immediately after the operation. |
get | InlineGetDictUserDefined | Contains the document source returned from an inline get, if requested. |
Example response
{
"bulk_response_body": {
"errors": false,
"items": [
{
"index": {
"id": {
"string": "2"
},
"index": "my_index",
"status": 201,
"primary_term": 1,
"result": "created",
"seq_no": 0,
"shards": {
"successful": 1,
"total": 2
},
"version": 1,
"forced_refresh": true
}
},
{
"create": {
"id": {
"string": "1"
},
"index": "my_index",
"status": 201,
"primary_term": 1,
"result": "created",
"seq_no": 0,
"shards": {
"successful": 1,
"total": 2
},
"version": 1,
"forced_refresh": true
}
},
{
"update": {
"id": {
"string": "2"
},
"index": "my_index",
"status": 200,
"primary_term": 1,
"result": "updated",
"seq_no": 1,
"shards": {
"successful": 1,
"total": 2
},
"version": 2,
"forced_refresh": true,
"get": {
"found": true,
"seq_no": 1,
"primary_term": 1,
"source": "e30="
}
}
},
{
"delete": {
"id": {
"string": "2"
},
"index": "my_index",
"status": 200,
"primary_term": 1,
"result": "deleted",
"seq_no": 2,
"shards": {
"successful": 1,
"total": 2
},
"version": 3,
"forced_refresh": true
}
}
],
"took": 87,
"ingest_took": 0
}
}
Java gRPC client example
The following example shows a Java client-side program that submits an example bulk gRPC request and then checks whether there were any errors in the bulk response:
import org.opensearch.protobufs.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import com.google.protobuf.ByteString;
public class BulkClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9400)
.usePlaintext()
.build();
DocumentServiceGrpc.DocumentServiceBlockingStub stub = DocumentServiceGrpc.newBlockingStub(channel);
// Create an index operation
IndexOperation indexOp = IndexOperation.newBuilder()
.setXIndex("my-index")
.setXId("1")
.build();
BulkRequestBody indexBody = BulkRequestBody.newBuilder()
.setOperationContainer(OperationContainer.newBuilder().setIndex(indexOp).build())
.setObject(ByteString.copyFromUtf8("{\"field\": \"value\"}"))
.build();
// Create a delete operation
DeleteOperation deleteOp = DeleteOperation.newBuilder()
.setXIndex("my-index")
.setXId("2")
.build();
BulkRequestBody deleteBody = BulkRequestBody.newBuilder()
.setOperationContainer(OperationContainer.newBuilder().setDelete(deleteOp).build())
.build();
// Build the bulk request
BulkRequest request = BulkRequest.newBuilder()
.setIndex("my-index")
.addRequestBody(indexBody)
.addRequestBody(deleteBody)
.build();
// Execute the bulk request
try {
BulkResponse response = stub.bulk(request);
// Handle the response
System.out.println("Bulk errors: " + response.getErrors());
System.out.println("Bulk took: " + response.getTook() + " ms");
if (response.hasIngestTook()) {
System.out.println("Ingest took: " + response.getIngestTook() + " ms");
}
// Process individual items
for (Item item : response.getItemsList()) {
if (item.hasIndex()) {
System.out.println("Index operation: " + item.getIndex().getStatus());
} else if (item.hasDelete()) {
System.out.println("Delete operation: " + item.getDelete().getStatus());
} else if (item.hasCreate()) {
System.out.println("Create operation: " + item.getCreate().getStatus());
} else if (item.hasUpdate()) {
System.out.println("Update operation: " + item.getUpdate().getStatus());
}
}
} catch (io.grpc.StatusRuntimeException e) {
System.err.println("gRPC request failed with status: " + e.getStatus());
System.err.println("Error message: " + e.getMessage());
}
channel.shutdown();
}
}
Python gRPC client example
The following example shows how to send the same request using a Python client application.
First, install the opensearch-protobufs
package using pip
:
pip install opensearch-protobufs==0.19.0
Use the following code to send the request:
import grpc
from opensearch.protobufs.schemas import document_pb2
from opensearch.protobufs.schemas import common_pb2
from opensearch.protobufs.services.document_service_pb2_grpc import DocumentServiceStub
channel = grpc.insecure_channel(
target="localhost:9400",
)
document_stub = DocumentServiceStub(channel)
# Add documents to a request body
requestBody = document_pb2.BulkRequestBody(
operation_container=document_pb2.OperationContainer(index=document_pb2.IndexOperation())
)
requestBody.object = "{\"field\": \"value\"}".encode('utf-8')
# Append to a bulk request
request = document_pb2.BulkRequest()
request.index = "my-index"
request.request_body.append(requestBody)
# Send request and handle response
try:
response = document_stub.Bulk(request=request)
if response.items:
print("Received {} response items".format(len(response.items)))
print(response.items)
except grpc.RpcError as e:
if e.code() == StatusCode.UNAVAILABLE:
print("Failed to reach server: {}".format(e))
elif e.code() == StatusCode.PERMISSION_DENIED:
print("Permission denied: {}".format(e))
elif e.code() == StatusCode.INVALID_ARGUMENT:
print("Invalid argument: {}".format(e))
finally:
channel.close()