
Bulk (gRPC)

Introduced 3.0

This is an experimental feature and is not recommended for use in a production environment. For updates on the progress of the feature or if you want to leave feedback, see the associated GitHub issue.

The gRPC Bulk API provides an efficient, binary-encoded alternative to the HTTP Bulk API for performing multiple document operations—such as indexing, updating, and deleting—in a single call. This service uses protocol buffers and mirrors the REST API in terms of parameters and structure.

Prerequisite

To submit gRPC requests, you must have a set of protobufs on the client side. For ways to obtain the protobufs, see Using gRPC APIs.

gRPC service and method

gRPC Document APIs reside in the DocumentService.

You can submit bulk requests by invoking the Bulk gRPC method within the DocumentService. The method takes in a BulkRequest and returns a BulkResponse.

Document format

In gRPC, documents must be provided and returned as bytes. Use Base64 encoding to provide documents in a gRPC request.

For example, consider the following document in a regular Bulk API request:

"doc":  "{\"title\": \"Inception\", \"year\": 2010}"

For a gRPC Bulk API request, provide the same document in Base64 encoding:

"doc": "eyJ0aXRsZSI6ICJJbmNlcHRpb24iLCAieWVhciI6IDIwMTB9"

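The Base64 value shown above can be reproduced with the Java standard library. The following is a minimal sketch; the class name DocEncoder and the method name encode are illustrative and not part of any OpenSearch library:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DocEncoder {
    // Base64-encodes the UTF-8 bytes of a JSON document.
    static String encode(String json) {
        return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String json = "{\"title\": \"Inception\", \"year\": 2010}";
        // Prints eyJ0aXRsZSI6ICJJbmNlcHRpb24iLCAieWVhciI6IDIwMTB9
        System.out.println(encode(json));
    }
}
```

Base64 is how protocol buffers represent bytes fields in JSON. When a request is built with generated classes, as in the Java client example on this page, the raw document bytes are typically set directly instead.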
BulkRequest fields

The BulkRequest message is the top-level container for a gRPC bulk operation. It accepts the following fields.

| Field | Protobuf type | Description |
| --- | --- | --- |
| request_body | repeated BulkRequestBody | The list of bulk operations (index/create/update/delete). Required. |
| index | string | The default index for all operations unless overridden in request_body. Specifying the index in the BulkRequest means that you don’t need to include it in the BulkRequestBody. Optional. |
| source | SourceConfigParam | Controls whether to return the full _source, no _source, or only specific fields from _source in the response. Optional. |
| source_excludes | repeated string | Fields to exclude from source. Optional. |
| source_includes | repeated string | Fields to include from source. Optional. |
| pipeline | string | The preprocessing ingest pipeline ID. Optional. |
| refresh | Refresh | Whether to refresh shards after indexing. Optional. |
| require_alias | bool | If true, actions must target an alias. Optional. |
| routing | string | The routing value for shard assignment. Optional. |
| timeout | string | The timeout duration (for example, 1m). Optional. |
| type (Deprecated) | string | The document type (always _doc). Optional. |
| wait_for_active_shards | WaitForActiveShards | The minimum number of active shards to wait for. Optional. |
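The relationship between the request-level index and a per-operation index can be sketched as a simple precedence rule. The following helper is hypothetical and only illustrates the behavior described in the table; it is not OpenSearch code. It assumes that an unset string field is seen as an empty string, which is the proto3 default, and the index name archive is made up for the example:

```java
public class IndexResolver {
    // Hypothetical helper: returns the index an operation would target.
    // The per-operation index wins; otherwise the request-level default applies.
    static String effectiveIndex(String requestIndex, String operationIndex) {
        if (operationIndex != null && !operationIndex.isEmpty()) {
            return operationIndex;
        }
        if (requestIndex != null && !requestIndex.isEmpty()) {
            return requestIndex;
        }
        throw new IllegalArgumentException("No index set on the operation or the request");
    }

    public static void main(String[] args) {
        System.out.println(effectiveIndex("movies", ""));        // prints movies
        System.out.println(effectiveIndex("movies", "archive")); // prints archive
    }
}
```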

BulkRequestBody fields

The BulkRequestBody message represents a single document-level operation within a BulkRequest. It accepts the following fields. All fields are optional, but exactly one of index, create, update, or delete must be set in the BulkRequestBody.

| Field | Protobuf type | Description |
| --- | --- | --- |
| index | IndexOperation | Index a document. Replaces the document if it already exists. Optional. |
| create | CreateOperation | Create a new document. Fails if the document already exists. Optional. |
| update | UpdateOperation | Partially update a document or use upsert/script options. Optional. |
| delete | DeleteOperation | Delete a document by ID. Optional. |
| detect_noop | bool | If true, skips the update if the document content hasn’t changed. Optional. Default is true. |
| doc | bytes | Partial or full document data for update or index operations. Optional. |
| doc_as_upsert | bool | If true, treats the document as the full upsert document if the target document doesn’t exist. Only valid for the update operation. Optional. |
| script | Script | A script to apply to the document (used with update). Optional. |
| scripted_upsert | bool | If true, executes the script whether or not the document exists. Optional. |
| source | SourceConfig | Controls how the document source is fetched or filtered. Optional. |
| upsert | bytes | The full document to use if the target does not exist. Used with script. Optional. |
| object | bytes | The full document content used with create. Optional. |
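The rule that exactly one of index, create, update, or delete must be set can be checked on the client before a request is sent. The following is a minimal sketch that works on a map representation of one JSON request_body entry; the class OperationCheck and its method are hypothetical and are not part of the OpenSearch protobufs:

```java
import java.util.List;
import java.util.Map;

public class OperationCheck {
    // The four operation fields, of which exactly one must be set per entry.
    static final List<String> OPERATIONS = List.of("index", "create", "update", "delete");

    // Returns true if the entry contains exactly one operation field.
    static boolean hasExactlyOneOperation(Map<String, ?> entry) {
        return OPERATIONS.stream().filter(entry::containsKey).count() == 1;
    }

    public static void main(String[] args) {
        Map<String, Object> valid = Map.of(
                "update", Map.of("id", "tt1375666"),
                "doc", "eyJ5ZWFyIjogMjAxMX0=");
        Map<String, Object> invalid = Map.of("doc", "eyJ5ZWFyIjogMjAxMX0=");
        System.out.println(hasExactlyOneOperation(valid));   // prints true
        System.out.println(hasExactlyOneOperation(invalid)); // prints false
    }
}
```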

Create

CreateOperation adds a new document only if it doesn’t already exist.

The document itself must be provided in the object field, outside of the CreateOperation message.

The following optional fields can also be provided.

| Field | Protobuf type | Description |
| --- | --- | --- |
| id | string | The document ID. If omitted, one is auto-generated. Optional. |
| index | string | The target index. Required only if not set globally in the BulkRequest. |
| routing | string | A custom routing value used to control shard placement. Optional. |
| if_primary_term | int64 | Used for concurrency control. The operation only runs if the document’s primary term matches this value. Optional. |
| if_seq_no | int64 | Used for concurrency control. The operation only runs if the document’s sequence number matches this value. Optional. |
| version | int64 | The explicit document version for concurrency control. Optional. |
| version_type | VersionType | Controls version matching behavior. Optional. |
| pipeline | string | The preprocessing ingest pipeline ID. Optional. |
| require_alias | bool | Enforces the use of index aliases only. Optional. |

Example request

The following example shows a bulk request with a create operation. It creates a document with the ID tt1375666 in the movies index. The document content, provided in Base64 encoding, represents {"title": "Inception", "year": 2010}:

{
  "index": "movies",
  "request_body": [
    {
      "create": {
        "index": "movies",
        "id": "tt1375666"
      },
      "object": "eyJ0aXRsZSI6ICJJbmNlcHRpb24iLCAieWVhciI6IDIwMTB9"
    }
  ]
}

Delete

The DeleteOperation removes a document by ID. It accepts the following fields.

| Field | Protobuf type | Description |
| --- | --- | --- |
| id | string | The ID of the document to delete. Required. |
| index | string | The target index. Required only if not set globally in the BulkRequest. |
| routing | string | A custom routing value used to control shard placement. Optional. |
| if_primary_term | int64 | Used for concurrency control. The operation only runs if the document’s primary term matches this value. Optional. |
| if_seq_no | int64 | Used for concurrency control. The operation only runs if the document’s sequence number matches this value. Optional. |
| version | int64 | The explicit document version for concurrency control. Optional. |
| version_type | VersionType | Controls version matching behavior. Optional. |

Example request

The following example shows a bulk request with a delete operation. It deletes a document with the ID tt1392214 from the movies index:

{
  "index": "movies",
  "request_body": [
    {
      "delete": {
        "index": "movies",
        "id": "tt1392214"
      }
    }
  ]
}

Index

The IndexOperation creates or overwrites a document. If an ID is not provided, one is generated.

The document itself is provided in the doc field, outside of the IndexOperation message.

The following optional fields can also be provided.

| Field | Protobuf type | Description |
| --- | --- | --- |
| id | string | The document ID. If omitted, one is auto-generated. Optional. |
| index | string | The target index. Required only if not set globally in the BulkRequest. |
| routing | string | A custom routing value used to control shard placement. Optional. |
| if_primary_term | int64 | Used for concurrency control. The operation only runs if the document’s primary term matches this value. Optional. |
| if_seq_no | int64 | Used for concurrency control. The operation only runs if the document’s sequence number matches this value. Optional. |
| op_type | OpType | The operation type. Controls the overwriting behavior. Valid values are index (default) and create. Optional. |
| version | int64 | The explicit document version for concurrency control. Optional. |
| version_type | VersionType | Controls version matching behavior. Optional. |
| pipeline | string | The preprocessing ingest pipeline ID. Optional. |
| require_alias | bool | Enforces the use of index aliases only. Optional. |

Example request

The following example shows a bulk request with an index operation. It indexes a Base64-encoded document with the ID tt0468569 into the movies index:

{
  "index": "movies",
  "request_body": [
    {
      "index": {
        "index": "movies",
        "id": "tt0468569"
      },
      "doc": "eyJ0aXRsZSI6ICJUaGUgRGFyayBLbmlnaHQiLCAieWVhciI6IDIwMDh9"
    }
  ]
}

Update

The UpdateOperation performs partial document updates.

The document itself is provided in the doc field, outside of the UpdateOperation message.

All UpdateOperation fields, listed in the following table, are optional except for id.

| Field | Protobuf type | Description |
| --- | --- | --- |
| id | string | The ID of the document to update. Required. |
| index | string | The target index. Required only if not set globally in the BulkRequest. |
| routing | string | A custom routing value used to control shard placement. Optional. |
| if_primary_term | int64 | Used for concurrency control. The operation only runs if the document’s primary term matches this value. Optional. |
| if_seq_no | int64 | Used for concurrency control. The operation only runs if the document’s sequence number matches this value. Optional. |
| require_alias | bool | Enforces the use of index aliases only. Optional. |
| retry_on_conflict | int32 | The number of times to retry the operation if a version conflict occurs. Optional. |

Example request

The following example shows a bulk request with an update operation. It sets the year field of the document with the ID tt1375666 in the movies index to 2011. The Base64-encoded doc value represents the partial document {"year": 2011}:

{
  "index": "movies",
  "request_body": [
    {
      "update": {
        "index": "movies",
        "id": "tt1375666"
      },
      "doc": "eyJ5ZWFyIjogMjAxMX0=",
      "detect_noop": true
    }
  ]
}

Upsert

The upsert operation updates the document if it already exists. Otherwise, it creates a new document using the provided document content.

To upsert a document, provide an UpdateOperation and set doc_as_upsert to true. Provide the document to be upserted in the doc field, outside of the UpdateOperation.

Example request

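The following example shows a bulk request with an upsert operation. It sets the year field of the document with the ID tt1375666 in the movies index to 2012 or, if that document does not exist, creates it from the provided content. The Base64-encoded doc value represents {"year": 2012}: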

{
  "index": "movies",
  "request_body": [
    {
      "update": {
        "index": "movies",
        "id": "tt1375666"
      },
      "doc": "eyJ5ZWFyIjogMjAxMn0=",
      "doc_as_upsert": true
    }
  ]
}

Script

Run a stored or inline script to modify a document.

To specify a script, provide an UpdateOperation and a script field outside of the UpdateOperation.

Example request

The following example shows a bulk request with a script operation. It increments the year field of the document with the ID tt1375666 in the movies index by 1:

{
  "index": "movies",
  "request_body": [
    {
      "update": {
        "index": "movies",
        "id": "tt1375666"
      },
      "script": {
        "source": "ctx._source.year += 1",
        "lang": "painless"
      }
    }
  ]
}

Response fields

The gRPC Bulk API provides the following response fields.

BulkResponseBody fields

The BulkResponse message wraps either a BulkResponseBody for successful requests, or a BulkErrorResponse for failed requests. The BulkResponseBody provides a summary and per-item result of a bulk operation and contains the following fields.

| Field | Protobuf type | Description |
| --- | --- | --- |
| errors | bool | Indicates whether any of the operations in the bulk request failed. If any operation fails, the response’s errors field will be true. You can iterate over the individual Item actions for more detailed information. |
| items | repeated Item | The result of all operations in the bulk request, in the order they were submitted. |
| took | int64 | The amount of time taken to process the bulk request, in milliseconds. |
| ingest_took | int64 | The amount of time taken to process documents through an ingest pipeline, in milliseconds. |

Item fields

Each Item in the response corresponds to a single operation in the request. For each operation, only one of the following fields is provided.

| Field | Protobuf type | Description |
| --- | --- | --- |
| create | ResponseItem | The result of the CreateOperation. |
| delete | ResponseItem | The result of the DeleteOperation. |
| index | ResponseItem | The result of the IndexOperation. |
| update | ResponseItem | The result of the UpdateOperation. |

ResponseItem fields

Each ResponseItem corresponds to a single operation in the request. It contains the following fields.

| Field | Protobuf type | Description |
| --- | --- | --- |
| type | string | The document type. |
| id | ResponseItem.Id | The document ID associated with the operation. Can be null. |
| index | string | The name of the index associated with the operation. If a data stream was targeted, this is the backing index. |
| status | int32 | The HTTP status code returned for the operation. (Note: This field may be replaced with a gRPC code in the future.) |
| error | ErrorCause | Contains additional information about a failed operation. |
| primary_term | int64 | The primary term assigned to the document. |
| result | string | The operation result. Valid values are created, deleted, and updated. |
| seq_no | int64 | A sequence number assigned to the document to maintain version order. |
| shards | ShardInfo | Shard information for the operation (only returned for successful actions). |
| version | int64 | The document version (only returned for successful actions). |
| forced_refresh | bool | If true, forces the document to become visible immediately after the operation. |
| get | InlineGetDictUserDefined | Contains the document source returned from an inline get, if requested. |
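The errors flag and the per-item error field can be combined to find the operations that failed. The following sketch uses a plain Java stand-in class rather than the generated protobuf classes, so ItemResult, its errorReason field (a simplification of ErrorCause), and failedItems are all hypothetical names chosen for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class BulkErrorScan {
    // Hypothetical stand-in for one per-operation result; not the generated protobuf class.
    static final class ItemResult {
        final String operation;   // "index", "create", "update", or "delete"
        final String id;          // document ID
        final int status;         // HTTP status code for the operation
        final String errorReason; // null when the operation succeeded

        ItemResult(String operation, String id, int status, String errorReason) {
            this.operation = operation;
            this.id = id;
            this.status = status;
            this.errorReason = errorReason;
        }
    }

    // Returns the items that carry error information, in submission order.
    static List<ItemResult> failedItems(boolean errors, List<ItemResult> items) {
        List<ItemResult> failed = new ArrayList<>();
        if (!errors) {
            return failed; // errors == false means that no operation failed
        }
        for (ItemResult item : items) {
            if (item.errorReason != null) {
                failed.add(item);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<ItemResult> items = List.of(
                new ItemResult("index", "2", 201, null),
                new ItemResult("create", "1", 409, "document already exists"));
        for (ItemResult item : failedItems(true, items)) {
            // Prints: create 1 -> 409 (document already exists)
            System.out.println(item.operation + " " + item.id + " -> " + item.status
                    + " (" + item.errorReason + ")");
        }
    }
}
```

Checking errors first avoids scanning the item list when every operation succeeded.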

Example response

{
  "bulkResponseBody": {
    "errors": false,
    "items": [
      {
        "index": {
          "id": {
            "string": "2"
          },
          "index": "my_index",
          "status": 201,
          "primaryTerm": "1",
          "result": "created",
          "seqNo": "0",
          "shards": {
            "successful": 1,
            "total": 2
          },
          "version": "1",
          "forcedRefresh": true
        }
      },
      {
        "create": {
          "id": {
            "string": "1"
          },
          "index": "my_index",
          "status": 201,
          "primaryTerm": "1",
          "result": "created",
          "seqNo": "0",
          "shards": {
            "successful": 1,
            "total": 2
          },
          "version": "1",
          "forcedRefresh": true
        }
      },
      {
        "update": {
          "id": {
            "string": "2"
          },
          "index": "my_index",
          "status": 200,
          "primaryTerm": "1",
          "result": "updated",
          "seqNo": "1",
          "shards": {
            "successful": 1,
            "total": 2
          },
          "version": "2",
          "forcedRefresh": true,
          "get": {
            "found": true,
            "seqNo": "1",
            "primaryTerm": "1",
            "source": "e30="
          }
        }
      },
      {
        "delete": {
          "id": {
            "string": "2"
          },
          "index": "my_index",
          "status": 200,
          "primaryTerm": "1",
          "result": "deleted",
          "seqNo": "2",
          "shards": {
            "successful": 1,
            "total": 2
          },
          "version": "3",
          "forcedRefresh": true
        }
      }
    ],
    "took": "87",
    "ingestTook": "0"
  }
}
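In the JSON form of the response shown above, int64 fields such as took and seqNo appear as strings, and the bytes field get.source appears as Base64. This follows from how protocol buffers map these types to JSON. The following sketch converts both kinds of values back using only the Java standard library; the class and method names are illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ResponseValues {
    // Decodes a Base64-encoded bytes field into a UTF-8 string.
    static String decodeBytes(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    // Parses an int64 field that the JSON form represents as a string.
    static long parseInt64(String value) {
        return Long.parseLong(value);
    }

    public static void main(String[] args) {
        System.out.println(decodeBytes("e30=")); // prints {}
        System.out.println(parseInt64("87"));    // prints 87
    }
}
```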

Java gRPC client example

The following example shows a Java client-side program that submits a sample bulk gRPC request and then checks whether there were any errors in the bulk response:

import org.opensearch.protobufs.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import com.google.protobuf.ByteString;

public class BulkClient {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9400)
                .usePlaintext()
                .build();

        DocumentServiceGrpc.DocumentServiceBlockingStub stub = DocumentServiceGrpc.newBlockingStub(channel);

        IndexOperation indexOp = IndexOperation.newBuilder()
                .setIndex("my-index")
                .setId("1")
                .build();

        BulkRequestBody indexBody = BulkRequestBody.newBuilder()
                .setIndex(indexOp)
                .setDoc(ByteString.copyFromUtf8("{\"field\": \"value\"}"))
                .build();

        DeleteOperation deleteOp = DeleteOperation.newBuilder()
                .setIndex("my-index")
                .setId("2")
                .build();

        BulkRequestBody deleteBody = BulkRequestBody.newBuilder()
                .setDelete(deleteOp)
                .build();

        BulkRequest request = BulkRequest.newBuilder()
                .setIndex("my-index")
                .addRequestBody(indexBody)
                .addRequestBody(deleteBody)
                .build();

        BulkResponse response = stub.bulk(request);
        // BulkResponse wraps a BulkResponseBody, which carries the errors flag.
        System.out.println("Bulk errors: " + response.getBulkResponseBody().getErrors());

        channel.shutdown();
    }
}