Hadoop connector
The OpenSearch Hadoop connector moves data between OpenSearch and Apache Spark, Apache Hive, or Hadoop MapReduce. Spark jobs can index data directly into OpenSearch and query it, and reads and writes run in parallel across Spark partitions and OpenSearch shards for efficient distributed processing.
For the source code, see the OpenSearch Hadoop repository.
Setup
Add the connector to your Spark application using --packages:
- For Spark 3.4.x, run the following command:
pyspark --packages org.opensearch.client:opensearch-spark-30_2.12:2.0.0
- For Spark 3.5.x, run the following command:
pyspark --packages org.opensearch.client:opensearch-spark-35_2.12:2.0.0
- For Spark 4.x, run the following command:
pyspark --packages org.opensearch.client:opensearch-spark-40_2.13:2.0.0
Alternatively, add the connector as a dependency in your Maven build file (pom.xml):
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-spark-30_2.12</artifactId>
<version>2.0.0</version>
</dependency>
Choose the artifact that matches your Spark and Scala versions, as listed in the following table.
| Spark version | Scala version | Artifact |
|---|---|---|
| 3.4.x | 2.12 | org.opensearch.client:opensearch-spark-30_2.12:2.0.0 |
| 3.4.x | 2.13 | org.opensearch.client:opensearch-spark-30_2.13:2.0.0 |
| 3.5.x | 2.12 | org.opensearch.client:opensearch-spark-35_2.12:2.0.0 |
| 3.5.x | 2.13 | org.opensearch.client:opensearch-spark-35_2.13:2.0.0 |
| 4.x | 2.13 | org.opensearch.client:opensearch-spark-40_2.13:2.0.0 |
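If you generate launch commands or build files from a script, the table above can be encoded as a lookup. The following is a minimal sketch: connector_coordinate and ARTIFACTS are hypothetical names, not part of the connector, and the mapping covers only the rows listed in the table.

```python
# Hypothetical lookup that mirrors the artifact table above; not part of the connector.
ARTIFACTS = {
    ("3.4", "2.12"): "opensearch-spark-30_2.12",
    ("3.4", "2.13"): "opensearch-spark-30_2.13",
    ("3.5", "2.12"): "opensearch-spark-35_2.12",
    ("3.5", "2.13"): "opensearch-spark-35_2.13",
    ("4", "2.13"): "opensearch-spark-40_2.13",
}


def connector_coordinate(spark_version: str, scala_version: str,
                         connector_version: str = "2.0.0") -> str:
    """Return the Maven coordinate for a full Spark version such as "3.5.1"."""
    parts = spark_version.split(".")
    # All Spark 4.x releases share one artifact; 3.x artifacts are keyed by major.minor.
    key = "4" if parts[0] == "4" else ".".join(parts[:2])
    artifact = ARTIFACTS.get((key, scala_version))
    if artifact is None:
        raise ValueError(
            f"no listed artifact for Spark {spark_version} with Scala {scala_version}")
    return f"org.opensearch.client:{artifact}:{connector_version}"


print(connector_coordinate("3.5.1", "2.12"))
# org.opensearch.client:opensearch-spark-35_2.12:2.0.0
```

The printed coordinate is the value to pass to --packages. Combinations that are not in the table, such as Spark 4.x with Scala 2.12, raise ValueError instead of guessing.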
Basic usage
The following examples demonstrate basic read and write operations using the connector with different Spark APIs.
PySpark
PySpark needs no additional Python package. The connector JAR is loaded through --packages or the spark.jars configuration property:
# Write (index documents into OpenSearch)
df = spark.createDataFrame([("John", 30), ("Jane", 25)], ["name", "age"])
df.write.format("opensearch").save("people")
# Read (query documents from OpenSearch)
df = spark.read.format("opensearch").load("people")
df.show()
# Read with a query (only matching documents are transferred to Spark)
filtered = spark.read \
.format("opensearch") \
.option("opensearch.query", '{"query":{"match":{"name":"John"}}}') \
.load("people")
Scala
In Scala, importing org.opensearch.spark.sql._ adds helper methods such as saveToOpenSearch to DataFrames for a more concise syntax:
import org.opensearch.spark.sql._
// Write (index documents into OpenSearch)
val df = spark.createDataFrame(Seq(("John", 30), ("Jane", 25))).toDF("name", "age")
df.saveToOpenSearch("people")
// Read (query documents from OpenSearch)
val result = spark.read.format("opensearch").load("people")
result.show()
// Read with a query
val filtered = spark.read
.format("opensearch")
.option("opensearch.query", """{"query":{"match":{"name":"John"}}}""")
.load("people")
Java
Use the JavaOpenSearchSparkSQL wrapper for Java applications:
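import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import org.opensearch.spark.sql.api.java.JavaOpenSearchSparkSQL;
// Write (spark is an existing SparkSession)
List<Row> data = Arrays.asList(RowFactory.create("John", 30), RowFactory.create("Jane", 25));
StructType schema = new StructType().add("name", DataTypes.StringType).add("age", DataTypes.IntegerType);
Dataset<Row> df = spark.createDataFrame(data, schema);
JavaOpenSearchSparkSQL.saveToOpenSearch(df, "people");
// Read
Dataset<Row> result = spark.read().format("opensearch").load("people");
result.show();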
Spark SQL
You can register an OpenSearch index as a temporary view and query it using SQL:
spark.sql("""
CREATE TEMPORARY VIEW people
USING opensearch
OPTIONS (resource 'people')
""")
spark.sql("SELECT * FROM people WHERE age > 25").show()
Write operations
Configure how documents are written to OpenSearch indexes, including document IDs, write modes, and routing strategies.
Specifying a document ID
Set opensearch.mapping.id to the name of the field whose value becomes each document’s _id:
df.write.format("opensearch") \
.option("opensearch.mapping.id", "id") \
.save("my-index")
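Because the _id comes from the data, writing the same rows again replaces those documents instead of adding duplicates. The following minimal sketch shows the kind of request this implies, assuming the connector sends index actions through the OpenSearch _bulk API and keeps the ID field in the document body. bulk_index_body is a hypothetical helper for illustration; the connector builds these requests internally.

```python
import json


def bulk_index_body(rows: list, id_field: str) -> str:
    """Build an NDJSON _bulk body that uses id_field as each document's _id (illustrative only)."""
    lines = []
    for row in rows:
        # Action line: the index operation targets the _id taken from the row.
        lines.append(json.dumps({"index": {"_id": str(row[id_field])}}))
        # Source line: the document itself, including the ID field.
        lines.append(json.dumps(row))
    # The _bulk API expects a newline after the last line.
    return "\n".join(lines) + "\n"


rows = [{"id": 1, "name": "John"}, {"id": 2, "name": "Jane"}]
print(bulk_index_body(rows, "id"), end="")
# {"index": {"_id": "1"}}
# {"id": 1, "name": "John"}
# {"index": {"_id": "2"}}
# {"id": 2, "name": "Jane"}
```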
Write modes
Control how data is written to OpenSearch using Spark’s write modes:
# Append (default): add documents to the index
df.write.format("opensearch").mode("append").save("my-index")
# Overwrite: delete the index and recreate it with the new data
df.write.format("opensearch").mode("overwrite").save("my-index")
Upsert
Update documents if they exist or insert them as new documents if they don’t. This operation requires specifying a document ID field:
df.write.format("opensearch") \
.option("opensearch.mapping.id", "id") \
.option("opensearch.write.operation", "upsert") \
.save("my-index")
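The difference between the default index operation and upsert is easiest to see on a single document. The following in-memory model is a sketch, not connector code: a Python dict keyed by _id stands in for the index, apply_write is a hypothetical name, and dict.update performs only a shallow merge, whereas OpenSearch partial updates also merge nested objects.

```python
def apply_write(index: dict, doc_id: str, doc: dict, operation: str) -> None:
    """Model the index and upsert write operations on a dict keyed by _id."""
    if operation == "index":
        # index replaces the whole stored document.
        index[doc_id] = dict(doc)
    elif operation == "upsert":
        # upsert merges the new fields into an existing document, or inserts it.
        index.setdefault(doc_id, {}).update(doc)
    else:
        raise ValueError(f"unsupported operation: {operation}")


store = {"1": {"name": "John", "age": 30}}
apply_write(store, "1", {"age": 31}, "upsert")
print(store["1"])  # {'name': 'John', 'age': 31}
apply_write(store, "1", {"age": 32}, "index")
print(store["1"])  # {'age': 32}
apply_write(store, "2", {"name": "Jane"}, "upsert")
print(store["2"])  # {'name': 'Jane'}
```

With index, a row that carries only some fields drops the others from the stored document; with upsert, fields missing from the row are kept.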
Dynamic index routing
Use placeholders in the index name to route documents to different indexes based on field values. This feature requires the Scala saveToOpenSearch method:
import org.opensearch.spark.sql._
// Route by field value: {"category": "electronics", "name": "TV"} -> index "electronics"
df.saveToOpenSearch("{category}")
// Prefix + field value: {"env": "prod", "msg": "ok"} -> index "logs-prod"
df.saveToOpenSearch("logs-{env}")
// Date formatting: {"timestamp": "2026-02-16T10:30:00.000Z", "msg": "ok"} -> index "logs-2026.02.16"
df.saveToOpenSearch("logs-{timestamp|yyyy.MM.dd}")
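To check which index a given document will land in, you can model the placeholder syntax above in plain Python. This is a hypothetical sketch, not the connector's formatter: resolve_index handles only {field} and {field|format}, translates only the yyyy, MM, dd, and HH date tokens, and assumes the date field holds an ISO 8601 timestamp.

```python
import re
from datetime import datetime

# Java-style date tokens used in index patterns, mapped to strftime codes (subset only).
DATE_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}


def resolve_index(pattern: str, doc: dict) -> str:
    """Replace {field} and {field|format} placeholders with values from doc."""
    def substitute(match):
        field, _, fmt = match.group(1).partition("|")
        value = doc[field]
        if not fmt:
            return str(value)
        # fromisoformat before Python 3.11 does not accept a trailing "Z".
        ts = datetime.fromisoformat(value.replace("Z", "+00:00"))
        for token, code in DATE_TOKENS.items():
            fmt = fmt.replace(token, code)
        return ts.strftime(fmt)

    return re.sub(r"\{([^}]+)\}", substitute, pattern)


print(resolve_index("logs-{timestamp|yyyy.MM.dd}",
                    {"timestamp": "2026-02-16T10:30:00.000Z", "msg": "ok"}))
# logs-2026.02.16
```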
Read operations
Reduce the amount of data transferred from OpenSearch by filtering documents with a query and reading only the fields you need.
Reading with a query
Filter data at the OpenSearch level so only matching documents are loaded into Spark:
# Query DSL
df = spark.read.format("opensearch") \
.option("opensearch.query", '{"query":{"range":{"age":{"gte":25}}}}') \
.load("my-index")
# URI query
df = spark.read.format("opensearch") \
.option("opensearch.query", "?q=name:John") \
.load("my-index")
Selecting fields
Load only specific fields to reduce data transfer:
df = spark.read.format("opensearch") \
.option("opensearch.read.field.include", "name,age") \
.load("my-index")
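The query and field options can be combined and built programmatically. In this minimal sketch, read_options is a hypothetical helper: json.dumps produces the JSON string that opensearch.query expects without hand-escaping quotes, and the resulting dictionary is passed to PySpark's DataFrameReader.options.

```python
import json


def read_options(query: dict, fields: list) -> dict:
    """Build reader options that push a query and a field list down to OpenSearch."""
    return {
        # Query DSL serialized to the string form shown in the examples above.
        "opensearch.query": json.dumps({"query": query}),
        # Comma-separated field names, as opensearch.read.field.include expects.
        "opensearch.read.field.include": ",".join(fields),
    }


opts = read_options({"range": {"age": {"gte": 25}}}, ["name", "age"])
print(opts["opensearch.query"])  # {"query": {"range": {"age": {"gte": 25}}}}

# With an active SparkSession and the connector on the classpath:
# df = spark.read.format("opensearch").options(**opts).load("my-index")
```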
Security
Secure connections to OpenSearch clusters using authentication and encryption.
Basic authentication
Provide credentials for OpenSearch clusters with authentication enabled:
df.write.format("opensearch") \
.option("opensearch.net.http.auth.user", "<username>") \
.option("opensearch.net.http.auth.pass", "<password>") \
.save("my-index")
HTTPS
Enable SSL/TLS encryption for secure connections:
df.write.format("opensearch") \
.option("opensearch.net.ssl", "true") \
.save("my-index")
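Rather than hardcoding credentials, you can read them from the environment and apply the authentication and HTTPS options together. This is a sketch: the environment variable names OPENSEARCH_USER and OPENSEARCH_PASS and the helper secure_options are hypothetical, and a missing variable raises KeyError so the job fails before it tries to connect.

```python
import os


def secure_options(user_var: str = "OPENSEARCH_USER",
                   pass_var: str = "OPENSEARCH_PASS") -> dict:
    """Return basic-auth and HTTPS options with credentials taken from the environment."""
    return {
        "opensearch.net.ssl": "true",
        # os.environ[...] raises KeyError when a variable is unset.
        "opensearch.net.http.auth.user": os.environ[user_var],
        "opensearch.net.http.auth.pass": os.environ[pass_var],
    }


# With an active SparkSession:
# df.write.format("opensearch").options(**secure_options()).save("my-index")
```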
Advanced Spark features
Access low-level Spark APIs and streaming capabilities for specialized use cases.
Spark resilient distributed dataset
For low-level access, the connector provides resilient distributed dataset (RDD)-based read and write methods:
import org.opensearch.spark._
// Write
val data = sc.makeRDD(Seq(
Map("name" -> "John", "age" -> 30),
Map("name" -> "Jane", "age" -> 25)
))
data.saveToOpenSearch("people")
// Read
val rdd = sc.opensearchRDD("people")
rdd.collect().foreach(println)
// Read with query
val filtered = sc.opensearchRDD("people", "?q=name:John")
Structured Streaming
The connector supports Spark Structured Streaming as a sink:
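// streamingDF is an existing streaming DataFrame, for example one created with spark.readStream
val query = streamingDF.writeStream
.format("opensearch")
.option("checkpointLocation", "/tmp/checkpoint")
.start("streaming-index")
// Block until the streaming query stops or fails
query.awaitTermination()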
Alternative interfaces
Use the connector with Hadoop MapReduce and Apache Hive for alternative workflows.
Hadoop MapReduce
For Hadoop MapReduce jobs, the connector provides OpenSearchInputFormat and OpenSearchOutputFormat. Add opensearch-hadoop-mr-2.0.0.jar to your job classpath.
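import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.opensearch.hadoop.mr.OpenSearchInputFormat;
import org.opensearch.hadoop.mr.OpenSearchOutputFormat;
// Writing: records emitted by the job are indexed into my-index
Configuration writeConf = new Configuration();
writeConf.set("opensearch.resource", "my-index");
Job writeJob = Job.getInstance(writeConf);
writeJob.setOutputFormatClass(OpenSearchOutputFormat.class);
writeJob.waitForCompletion(true);
// Reading: documents from my-index become the job's input records
Configuration readConf = new Configuration();
readConf.set("opensearch.resource", "my-index");
Job readJob = Job.getInstance(readConf);
readJob.setInputFormatClass(OpenSearchInputFormat.class);
readJob.waitForCompletion(true);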
Apache Hive
The connector provides an Apache Hive storage handler. Add opensearch-hadoop-hive-2.0.0.jar to your Hive classpath:
ADD JAR /path/opensearch-hadoop-hive-2.0.0.jar;
CREATE EXTERNAL TABLE people (
name STRING,
age INT)
STORED BY 'org.opensearch.hadoop.hive.OpenSearchStorageHandler'
TBLPROPERTIES('opensearch.resource' = 'people');
SELECT * FROM people;
Connecting to Amazon OpenSearch Service
To connect to Amazon OpenSearch Service with IAM authentication, enable AWS Signature Version 4 signing and HTTPS:
df.write.format("opensearch") \
.option("opensearch.nodes", "https://search-xxx.us-east-1.es.amazonaws.com") \
.option("opensearch.port", "443") \
.option("opensearch.net.ssl", "true") \
.option("opensearch.nodes.wan.only", "true") \
.option("opensearch.aws.sigv4.enabled", "true") \
.option("opensearch.aws.sigv4.region", "us-east-1") \
.save("my-index")
The following AWS SDK v2 dependencies are required on the classpath:
- software.amazon.awssdk:auth:2.31.59 (or later)
- software.amazon.awssdk:regions:2.31.59 (or later)
- software.amazon.awssdk:http-client-spi:2.31.59 (or later)
- software.amazon.awssdk:identity-spi:2.31.59 (or later)
- software.amazon.awssdk:sdk-core:2.31.59 (or later)
- software.amazon.awssdk:utils:2.31.59 (or later)
Connecting to Amazon OpenSearch Serverless
To connect to Amazon OpenSearch Serverless, set opensearch.serverless to true and set the Signature Version 4 service name to aoss:
df.write.format("opensearch") \
.option("opensearch.nodes", "https://xxx.us-east-1.aoss.amazonaws.com") \
.option("opensearch.port", "443") \
.option("opensearch.net.ssl", "true") \
.option("opensearch.nodes.wan.only", "true") \
.option("opensearch.aws.sigv4.enabled", "true") \
.option("opensearch.aws.sigv4.region", "us-east-1") \
.option("opensearch.aws.sigv4.service.name", "aoss") \
.option("opensearch.serverless", "true") \
.save("my-index")
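The Amazon OpenSearch Service and Serverless examples differ only in two options, so a job that targets both can build them from one function. The following sketch reproduces the option values shown above; aws_sigv4_options is a hypothetical helper, and the endpoint in the usage comment is the placeholder from the example.

```python
def aws_sigv4_options(endpoint: str, region: str, serverless: bool = False) -> dict:
    """Return the SigV4 connection options shown above for Service or Serverless."""
    opts = {
        "opensearch.nodes": endpoint,
        "opensearch.port": "443",
        "opensearch.net.ssl": "true",
        "opensearch.nodes.wan.only": "true",
        "opensearch.aws.sigv4.enabled": "true",
        "opensearch.aws.sigv4.region": region,
    }
    if serverless:
        # Serverless collections sign requests for the "aoss" service name.
        opts["opensearch.aws.sigv4.service.name"] = "aoss"
        opts["opensearch.serverless"] = "true"
    return opts


# With an active SparkSession and the AWS SDK v2 dependencies on the classpath:
# df.write.format("opensearch") \
#     .options(**aws_sigv4_options("https://xxx.us-east-1.aoss.amazonaws.com", "us-east-1", serverless=True)) \
#     .save("my-index")
```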
Configuration properties
All configuration property names start with the opensearch. prefix. You can set properties in the Spark configuration (--conf), as options on the DataFrame reader or writer, or in the Hadoop configuration.
| Property | Default | Description |
|---|---|---|
| opensearch.resource | (none) | The OpenSearch index name. Can also be specified as the argument to save(), load(), or saveToOpenSearch(). |
| opensearch.nodes | localhost | The OpenSearch host address. |
| opensearch.port | 9200 | The OpenSearch REST port. |
| opensearch.nodes.wan.only | false | Set to true when connecting through a load balancer or proxy. |
| opensearch.query | match all | A query DSL or Uniform Resource Identifier (URI) query used when reading. |
| opensearch.net.ssl | false | Enables HTTPS. |
| opensearch.mapping.id | (none) | The document field to use as the _id. |
| opensearch.write.operation | index | The write operation: index, create, update, or upsert. |
| opensearch.scroll.size | 1000 | The number of documents fetched per batch when reading. |
| opensearch.read.field.include | (none) | A comma-separated list of fields to read. |
Compatibility
The following table lists the connector versions and their compatible runtime versions.
| Client version | Minimum Java runtime version | OpenSearch version | Spark version |
|---|---|---|---|
| 1.0.0–1.3.0 | Java 8 | 1.x, 2.x | 3.4.x |
| 2.0.0 | Java 11 | 1.x, 2.x, 3.x | 3.4.x, 3.5.x, 4.x |
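A deployment script can check these constraints before submitting a job. The following sketch encodes the table rows; compatible_clients and COMPATIBILITY are hypothetical names, OpenSearch compatibility is checked by major version only, and the Java version is compared against the listed minimum.

```python
# Hypothetical encoding of the compatibility table above.
COMPATIBILITY = [
    # (client versions, minimum Java, OpenSearch major versions, Spark lines)
    ("1.0.0-1.3.0", 8, {"1", "2"}, {"3.4"}),
    ("2.0.0", 11, {"1", "2", "3"}, {"3.4", "3.5", "4"}),
]


def compatible_clients(java: int, opensearch_version: str, spark_version: str) -> list:
    """List client versions whose table row accepts the given runtime versions."""
    os_major = opensearch_version.split(".")[0]
    parts = spark_version.split(".")
    # Spark 4.x is listed as a single line; 3.x lines are keyed by major.minor.
    spark_line = "4" if parts[0] == "4" else ".".join(parts[:2])
    return [client for client, min_java, os_majors, spark_lines in COMPATIBILITY
            if java >= min_java and os_major in os_majors and spark_line in spark_lines]


print(compatible_clients(17, "3.0.0", "3.5.1"))  # ['2.0.0']
```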