Kafka source
You can use the Apache Kafka source (kafka) in OpenSearch Data Prepper to read records from one or more Kafka topics. These records hold events that your Data Prepper pipeline can ingest. The kafka source uses Kafka’s Consumer API to consume messages from the Kafka broker and then creates Data Prepper events from those messages for further processing by the pipeline.
Usage
The following example shows the kafka source in a Data Prepper pipeline:
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - 127.0.0.1:9092
      topics:
        - name: Topic1
          group_id: groupID1
        - name: Topic2
          group_id: groupID1
  sink:
    - stdout: {}
Configuration
Use the following configuration options with the kafka source.
| Option | Required | Type | Description |
|---|---|---|---|
| bootstrap_servers | Yes, when not using Amazon Managed Streaming for Apache Kafka (Amazon MSK) as a cluster. | IP address | The host and port for the initial connection to the Kafka cluster, in host:port format. You can configure multiple Kafka brokers by listing the host and port for each broker. When using Amazon MSK as your Kafka cluster, the bootstrap server information is obtained from Amazon MSK using the MSK Amazon Resource Name (ARN) provided in the configuration. |
| topics | Yes | JSON array | The Kafka topics that the Data Prepper kafka source uses to read messages. You can configure up to 10 topics. For more information about topics configuration options, see Topics. |
| schema | No | JSON object | The schema registry configuration. For more information, see Schema. |
| authentication | No | JSON object | Sets the authentication options for both the pipeline and Kafka. For more information, see Authentication. |
| encryption | No | JSON object | The encryption configuration. For more information, see Encryption. |
| aws | No | JSON object | The AWS configuration. For more information, see AWS. |
| acknowledgments | No | Boolean | If true, enables the kafka source to receive end-to-end acknowledgments when events are received by OpenSearch sinks. Default is false. |
| client_dns_lookup | Yes, when a DNS alias is used. | String | Sets Kafka’s client.dns.lookup option. Default is default. |
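As a minimal sketch, the following pipeline combines several of the top-level options in the preceding table: two bootstrap servers, end-to-end acknowledgments, and a DNS lookup setting. The broker host names are placeholders, and use_all_dns_ips is one of Kafka’s own client.dns.lookup values that is assumed here to be accepted unchanged by client_dns_lookup.

```yaml
kafka-pipeline:
  source:
    kafka:
      # Placeholder broker addresses; list one host:port entry per broker.
      bootstrap_servers:
        - broker-1.example.com:9092
        - broker-2.example.com:9092
      topics:
        - name: Topic1
          group_id: groupID1
      # Receive end-to-end acknowledgments from the OpenSearch sink.
      acknowledgments: true
      # Assumption: this Kafka client.dns.lookup value is passed through as is.
      client_dns_lookup: use_all_dns_ips
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin_password
        index: kafka-data
```

An OpenSearch sink is used instead of stdout because the acknowledgments option is described in terms of events received by OpenSearch sinks.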
Topics
Use the following options in the topics array for each topic.
| Option | Required | Type | Description |
|---|---|---|---|
| name | Yes | String | The name of each Kafka topic. |
| group_id | Yes | String | Sets Kafka’s group.id option. |
| workers | No | Integer | The number of multithreaded consumers associated with each topic. Default is 2. The maximum value is 200. |
| serde_format | No | String | Indicates the serialization and deserialization format of the messages in the topic. Default is plaintext. |
| auto_commit | No | Boolean | When false, the consumer’s offset will not be periodically committed to Kafka in the background. Default is false. |
| commit_interval | No | Integer | When auto_commit is set to true, sets how frequently, in seconds, the consumer offsets are auto-committed to Kafka through Kafka’s auto.commit.interval.ms option. Default is 5s. |
| session_timeout | No | Integer | The amount of time during which the source detects client failures when using Kafka’s group management features, which can be used to balance the data stream. Default is 45s. |
| auto_offset_reset | No | String | Automatically resets the offset to the earliest or the latest offset through Kafka’s auto.offset.reset option. Default is earliest. |
| thread_waiting_time | No | Integer | The amount of time that threads wait for the preceding thread to complete its task and to signal the next thread. The Kafka consumer API poll timeout value is set to half of this setting. Default is 5s. |
| max_partition_fetch_bytes | No | Integer | The maximum amount of data returned from each partition, set through Kafka’s max.partition.fetch.bytes setting. Default is 1mb. |
| heart_beat_interval | No | Integer | The expected amount of time between heartbeats to the consumer coordinator when using Kafka’s group management facilities through Kafka’s heartbeat.interval.ms setting. Default is 5s. |
| fetch_max_wait | No | Integer | The maximum amount of time during which the server blocks a fetch request when there isn’t sufficient data to satisfy the fetch_min_bytes requirement through Kafka’s fetch.max.wait.ms setting. Default is 500ms. |
| fetch_max_bytes | No | Integer | The maximum record size accepted by the broker through Kafka’s fetch.max.bytes setting. Default is 50mb. |
| fetch_min_bytes | No | Integer | The minimum amount of data the server returns during a fetch request through Kafka’s fetch.min.bytes setting. Default is 1b. |
| retry_backoff | No | Integer | The amount of time to wait before attempting to retry a failed request to a given topic partition through Kafka’s retry.backoff.ms setting. Default is 10s. |
| max_poll_interval | No | Integer | The maximum delay between invocations of a poll() when using group management through Kafka’s max.poll.interval.ms option. Default is 300s. |
| consumer_max_poll_records | No | Integer | The maximum number of records returned in a single poll() call through Kafka’s max.poll.records setting. Default is 500. |
| key_mode | No | String | Indicates how the key field of the Kafka message should be handled. The default setting is include_as_field, which includes the key in the event as the kafka_key field. The include_as_metadata setting includes the key in the event’s metadata. The discard setting discards the key. |
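To illustrate how the per-topic options fit together, the following sketch tunes a single topic entry. The topic and group names are placeholders. The json value for serde_format and the 10s duration notation (modeled on the defaults in the preceding table) are assumptions, not values confirmed by this page.

```yaml
topics:
  - name: my-topic
    group_id: data-prepper-group
    # Four consumer threads for this topic instead of the default 2.
    workers: 4
    # Assumption: json is an accepted serde_format value.
    serde_format: json
    # Commit offsets in the background at the interval set below.
    auto_commit: true
    # Assumption: durations use the same notation as the documented defaults.
    commit_interval: 10s
    # Start from the latest offset instead of the default earliest.
    auto_offset_reset: latest
    # Keep the message key in event metadata rather than in the kafka_key field.
    key_mode: include_as_metadata
```

Options that are omitted keep the defaults listed in the table.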
Schema
The schema configuration has the following options.
| Option | Type | Required | Description |
|---|---|---|---|
| type | String | Yes | Sets the type of schema based on your registry. Valid values are aws_glue (AWS Glue schema registry) and confluent (Confluent schema registry). When using the aws_glue registry, set any AWS configuration options. |
| basic_auth_credentials_source | String | No | Where schema registry credentials come from. Use USER_INFO when providing api_key/api_secret. Other valid values are URL and SASL_INHERIT. Default typically aligns with the underlying client. |
The following configuration options are only required when using a confluent registry.
| Option | Type | Description |
|---|---|---|
| registry_url | String | The base URL of the schema registry (for example, http://schema-registry:8081 or https://sr.example.com). |
| version | String | The schema version to use per subject. Use an integer or latest. |
| api_key | String | The schema registry API key. |
| api_secret | String | The schema registry API secret. |
The following example configures a schema registry:
schema:
  type: confluent
  registry_url: "http://schema-registry:8081"
  api_key: "<optional if using basic/key auth>"
  api_secret: "<optional if using basic/key auth>"
  version: "latest"
Schema registry over TLS
The Kafka source uses the JVM truststore when connecting to the schema registry over HTTPS. If the schema registry’s certificate is signed by a custom CA, add that CA to the Data Prepper JVM truststore or provide a custom truststore using environment variables.
You can use the following command to build a truststore with your CA certificate:
keytool -importcert -noprompt -alias sr-ca -file sr-ca.pem -keystore /usr/share/data-prepper/certs/sr.truststore.jks -storepass changeit
The following command configures Data Prepper using JAVA_TOOL_OPTIONS:
JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
You can configure Data Prepper in docker-compose.yaml using the following method:
environment:
  - JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
volumes:
  - ./certs:/usr/share/data-prepper/certs:ro
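For context, the environment and volumes keys belong under a service definition. The following is a sketch of a complete docker-compose.yaml, assuming a service named data-prepper, the opensearchproject/data-prepper image, and a pipeline file mounted at /usr/share/data-prepper/pipelines/pipelines.yaml. The service name, image tag, and pipeline mount path are assumptions and may differ in your deployment.

```yaml
services:
  # Hypothetical service name.
  data-prepper:
    # Assumption: image name and tag; pin a specific version in practice.
    image: opensearchproject/data-prepper:latest
    environment:
      # Point the JVM at the truststore that contains the schema registry CA.
      - JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
    volumes:
      # Read-only mount of the directory holding sr.truststore.jks.
      - ./certs:/usr/share/data-prepper/certs:ro
      # Assumption: location from which Data Prepper loads pipeline definitions.
      - ./pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml:ro
```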
Authentication
The authentication section configures SASL:
authentication:
  sasl:
    plaintext:
      username: alice
      password: secret
| Option | Type | Description |
|---|---|---|
| sasl | Object | SASL configuration. |
SASL
Use one of the following options when configuring SASL authentication.
| Option | Type | Description |
|---|---|---|
| plaintext | JSON object | The plaintext authentication configuration. The alias plain is also supported for backward compatibility. For more information, see SASL plaintext. |
| aws_msk_iam | String | The Amazon MSK AWS Identity and Access Management (IAM) configuration. If set to role, the sts_role_arn set in the aws configuration is used. Default is default. |
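As a sketch of the aws_msk_iam option, the following fragment of a kafka source selects IAM authentication with an assumed role. The role ARN and Region are placeholders, and the fragment omits the topics and msk settings that a full source also needs.

```yaml
authentication:
  sasl:
    # Use the role from aws.sts_role_arn; "default" is the other documented value.
    aws_msk_iam: role
aws:
  region: us-east-1
  # Placeholder role ARN that Data Prepper assumes for IAM authentication.
  sts_role_arn: arn:aws:iam::123456789012:role/data-prepper-role
```

Both blocks sit at the same level inside the kafka source, alongside topics.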
SASL plaintext
The following options are required when using the SASL/PLAIN protocol.
| Option | Type | Description |
|---|---|---|
| username | String | The SASL/PLAIN username. |
| password | String | The SASL/PLAIN password. |
Encryption
Use the following options when setting SSL encryption.
| Option | Required | Type | Description |
|---|---|---|---|
| type | No | String | The encryption type. Use none to disable encryption. Default is ssl. |
| certificate | No | String | The SSL certificate content. Use either this option or trust_store_file_path, not both. |
| trust_store_file_path | No | String | The path to the truststore file containing the SSL certificate. Use either this option or certificate, not both. |
| trust_store_password | No | String | The password for the truststore file. |
| insecure | No | Boolean | A Boolean flag used to turn off SSL certificate verification. If set to true, certificate authority (CA) certificate verification is turned off and insecure HTTP requests are sent. Default is false. |
Use the following configuration to enable SSL encryption:
encryption:
  type: ssl
  # With public CA: no extra config needed.
  # With private CA: trust using JVM truststore.
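When the broker certificate is signed by a private CA, an alternative to the JVM truststore is to point the source at a truststore file using the options in the preceding table. The following is a minimal sketch; the file path and password are hypothetical.

```yaml
encryption:
  type: ssl
  # Hypothetical truststore path; do not set certificate at the same time.
  trust_store_file_path: /usr/share/data-prepper/certs/kafka.truststore.jks
  # Hypothetical password for the truststore file above.
  trust_store_password: changeit
```

Leaving insecure at its default of false keeps certificate verification turned on.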
AWS
Use the following options when setting up authentication for AWS services.
| Option | Required | Type | Description |
|---|---|---|---|
| region | No | String | The AWS Region to use for credentials. Defaults to standard SDK behavior to determine the Region. |
| sts_role_arn | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon Simple Queue Service (Amazon SQS) and Amazon Simple Storage Service (Amazon S3). Default is null, which will use the standard SDK behavior for credentials. |
| msk | No | JSON object | The MSK configuration settings. |
MSK
Use the following options inside the msk object.
| Option | Required | Type | Description |
|---|---|---|---|
| arn | Yes | String | The MSK ARN to use. |
| broker_connection_type | No | String | The type of connector to use with the MSK broker. Valid values are public, single_vpc, and multi_vpc. Default is single_vpc. |
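The following sketch shows the msk object inside the aws configuration, with a connection type other than the default. The cluster ARN is a placeholder. Because the bootstrap server information is obtained from Amazon MSK using the ARN, bootstrap_servers is omitted.

```yaml
aws:
  region: us-east-1
  msk:
    # Placeholder cluster ARN; used to look up the bootstrap servers.
    arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster-name/uuid
    # Connect through the public endpoints instead of the default single_vpc.
    broker_connection_type: public
```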
Configuration examples
This section demonstrates different pipeline configuration options.
Basic Kafka source
The following example pipeline reads JSON messages from a single plaintext Kafka topic with multiple consumer workers, parses them, and indexes them into OpenSearch:
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - localhost:9092
      topics:
        - name: my-topic
          group_id: data-prepper-group
          workers: 4
  processor:
    - parse_json:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin_password
        index: kafka-data
Kafka source with SSL encryption
The following example pipeline connects to a Kafka broker over TLS, consumes messages from a secure topic, and writes the results to OpenSearch:
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - kafka-broker.example.com:9093
      topics:
        - name: secure-topic
          group_id: secure-group
      encryption:
        type: ssl
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin_password
        index: secure-kafka-data
Kafka source with SASL PLAIN authentication
The following example pipeline authenticates to Kafka using the SASL/PLAIN protocol over TLS, consumes messages from the topic, and indexes them into OpenSearch:
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - kafka-broker.example.com:9094
      topics:
        - name: authenticated-topic
          group_id: auth-group
      encryption:
        type: ssl
      authentication:
        sasl:
          plaintext:
            username: kafka-user
            password: kafka-password
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin_password
        index: authenticated-kafka-data
Amazon MSK with AWS Glue schema registry
The following example configures Amazon MSK with the AWS Glue schema registry, consumes messages from an MSK cluster using AWS settings, deserializes the payload using the AWS Glue schema registry, normalizes timestamps, and writes to an Amazon OpenSearch domain:
msk-pipeline:
  source:
    kafka:
      acknowledgments: true
      topics:
        - name: my-msk-topic
          group_id: msk-consumer-group
          auto_offset_reset: earliest
      aws:
        region: us-east-1
        sts_role_arn: arn:aws:iam::123456789012:role/data-prepper-role
        msk:
          arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster-name/uuid
      schema:
        type: aws_glue
        registry_name: my-glue-registry
  processor:
    - date:
        match:
          - key: timestamp
            patterns: ["epoch_milli"]
        destination: "@timestamp"
  sink:
    - opensearch:
        hosts: ["https://search-my-domain.us-east-1.opensearch.amazonaws.com"]
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/opensearch-role
        index: msk-data
        index_type: custom
Confluent Kafka with schema registry
The following example configures Confluent Kafka with the schema registry, connects to Confluent Cloud over TLS using SASL and Confluent schema registry credentials, decodes payloads, and indexes them into OpenSearch:
confluent-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
      topics:
        - name: confluent-topic
          group_id: confluent-group
          auto_offset_reset: earliest
      encryption:
        type: ssl
      authentication:
        sasl:
          plaintext:
            username: confluent-api-key
            password: confluent-api-secret
      schema:
        type: confluent
        registry_url: https://psrc-xxxxx.us-east-1.aws.confluent.cloud
        api_key: "$"
        api_secret: "$"
        basic_auth_credentials_source: USER_INFO
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin_password
        index_type: custom
        index: confluent-data