s3 source
s3 is a source plugin that reads events from Amazon Simple Storage Service (Amazon S3) objects. You can configure the source to either use an Amazon Simple Queue Service (Amazon SQS) queue or scan an S3 bucket:
- To use Amazon SQS notifications, configure S3 event notifications on your S3 bucket. After Amazon SQS is configured, the s3 source receives messages from Amazon SQS. When an SQS message indicates that an S3 object has been created, the s3 source loads the S3 object and then parses it using the configured codec.
- To use an S3 bucket, configure the s3 source to use Amazon S3 Select instead of Data Prepper to parse S3 objects.
IAM permissions
To use the s3 source, configure your AWS Identity and Access Management (IAM) permissions to grant Data Prepper access to Amazon S3, similar to the following JSON configuration:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "s3-access",
            "Effect": "Allow",
            "Action": [
              "s3:GetObject",
              "s3:ListBucket",
              "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
        },
        {
            "Sid": "sqs-access",
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage"
            ],
            "Resource": "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
        },
        {
            "Sid": "kms-access",
            "Effect": "Allow",
            "Action": "kms:Decrypt",
            "Resource": "arn:aws:kms:<YOUR-REGION>:<123456789012>:key/<YOUR-KMS-KEY>"
        }
    ]
}
If your S3 objects or Amazon SQS queues do not use AWS Key Management Service (AWS KMS), remove the kms:Decrypt permission.
If you do not enable visibility_duplication_protection, you can remove the sqs:ChangeMessageVisibility permission from the SQS statement.
Cross-account S3 access
When Data Prepper fetches data from an S3 bucket, it verifies the ownership of the bucket using the bucket owner condition. By default, Data Prepper expects an S3 bucket to be owned by the same account that owns the corresponding SQS queue. When no SQS queue is provided, Data Prepper uses the account of the role Amazon Resource Name (ARN) in the aws configuration.
If you plan to ingest data from multiple S3 buckets and each bucket is associated with a different AWS account, configure Data Prepper for cross-account S3 access according to the following conditions:
- If all of the S3 buckets from which you want to ingest data belong to an account other than the account of the SQS queue, set default_bucket_owner to the account ID of the bucket owner.
- If your S3 buckets are in multiple accounts, use a bucket_owners map.
In the following example, the SQS queue is owned by account 000000000000. The SQS queue contains data from two S3 buckets: my-bucket-01 and my-bucket-02. Because my-bucket-01 is owned by 123456789012 and my-bucket-02 is owned by 999999999999, the bucket_owners map pairs each bucket name with its owner's account ID, as shown in the following configuration:
s3:
  sqs:
      queue_url: "https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue"
  bucket_owners:
    my-bucket-01: 123456789012
    my-bucket-02: 999999999999
You can use both bucket_owners and default_bucket_owner together.
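For example, the two settings can be combined so that any bucket not listed in bucket_owners falls back to default_bucket_owner. The following sketch uses placeholder bucket names and account IDs:

```yaml
s3:
  sqs:
      queue_url: "https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue"
  # Buckets not listed under bucket_owners are expected to be owned by this account.
  default_bucket_owner: 123456789012
  bucket_owners:
    my-bucket-02: 999999999999
```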
Configuration
You can use the following options to configure the s3 source.
| Option | Required | Type | Description | 
|---|---|---|---|
| notification_type | Yes | String | Must be sqs. | 
| notification_source | No | String | Determines how notifications are received by SQS. Must be s3 or eventbridge. s3 represents notifications that are directly sent from Amazon S3 to Amazon SQS or fanout notifications from Amazon S3 to Amazon Simple Notification Service (Amazon SNS) to Amazon SQS. eventbridge represents notifications from Amazon EventBridge and Amazon Security Lake. Default is s3. | 
| compression | No | String | The compression algorithm to apply: none, gzip, snappy, or automatic. Default is none. | 
| codec | Yes | Codec | The codec to apply. | 
| sqs | Yes | SQS | The SQS configuration. See sqs for more information. | 
| aws | Yes | AWS | The AWS configuration. See aws for more information. | 
| on_error | No | String | Determines how to handle errors in Amazon SQS. Can be either retain_messages or delete_messages. retain_messages leaves the message in the Amazon SQS queue and tries to send the message again. This is recommended for dead-letter queues. delete_messages deletes failed messages. Default is retain_messages. | 
| buffer_timeout | No | Duration | The amount of time allowed for writing events to the Data Prepper buffer before timeout occurs. Any events that the Amazon S3 source cannot write to the buffer during the specified amount of time are discarded. Default is 10s. | 
| records_to_accumulate | No | Integer | The number of messages that accumulate before being written to the buffer. Default is 100. | 
| metadata_root_key | No | String | The base key for adding S3 metadata to each event. The metadata includes the key and bucket for each S3 object. Default is s3/. | 
| default_bucket_owner | No | String | The AWS account ID for the owner of an S3 bucket. For more information, see Cross-account S3 access. | 
| bucket_owners | No | Map | A map of bucket names that includes the IDs of the accounts that own the buckets. For more information, see Cross-account S3 access. | 
| disable_bucket_ownership_validation | No | Boolean | When true, the S3 source does not attempt to validate that the bucket is owned by the expected account. The expected account is the same account that owns the Amazon SQS queue. Default is false. | 
| acknowledgments | No | Boolean | When true, enables the s3 source to receive end-to-end acknowledgments when events are received by OpenSearch sinks. | 
| s3_select | No | s3_select | The Amazon S3 Select configuration. | 
| scan | No | scan | The S3 scan configuration. | 
| delete_s3_objects_on_read | No | Boolean | When true, the S3 scan attempts to delete S3 objects after all events from the S3 object are successfully acknowledged by all sinks. acknowledgments should be enabled when deleting S3 objects. Default is false. | 
| workers | No | Integer | Configures the number of worker threads that the source uses to read data from S3. Leave this value as the default unless your S3 objects are less than 1 MB in size. Performance may decrease for larger S3 objects. This setting affects SQS-based sources and S3-Scan sources. Default is 1. | 
sqs
The following parameters allow you to configure usage for Amazon SQS in the s3 source plugin.
| Option | Required | Type | Description | 
|---|---|---|---|
| queue_url | Yes | String | The URL of the Amazon SQS queue from which messages are received. | 
| maximum_messages | No | Integer | The maximum number of messages to receive from the Amazon SQS queue in any single request. Default is 10. | 
| visibility_timeout | No | Duration | The visibility timeout to apply to messages read from the Amazon SQS queue. This should be set to the amount of time that Data Prepper may take to read all the S3 objects in a batch. Default is 30s. | 
| wait_time | No | Duration | The amount of time to wait for long polling on the Amazon SQS API. Default is 20s. | 
| poll_delay | No | Duration | A delay placed between the reading and processing of a batch of Amazon SQS messages and making a subsequent request. Default is 0s. | 
| visibility_duplication_protection | No | Boolean | If set to true, Data Prepper attempts to avoid duplicate processing by extending the visibility timeout of SQS messages. Until the data reaches the sink, Data Prepper regularly calls ChangeMessageVisibility to avoid rereading the S3 object. To use this feature, grant the sqs:ChangeMessageVisibility permission to the IAM role. Default is false. | 
| visibility_duplicate_protection_timeout | No | Duration | Sets the maximum total length of time that a message will not be processed when using visibility_duplication_protection. Defaults to two hours. | 
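The SQS options above might be combined as in the following sketch. The queue URL is a placeholder, and the durations are illustrative rather than recommended values:

```yaml
s3:
  sqs:
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    maximum_messages: 10
    visibility_timeout: "60s"   # allow enough time to read a full batch of S3 objects
    wait_time: "20s"            # long polling on the SQS API
    visibility_duplication_protection: true  # requires sqs:ChangeMessageVisibility
```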
aws
| Option | Required | Type | Description | 
|---|---|---|---|
| region | No | String | The AWS Region to use for credentials. Defaults to standard SDK behavior to determine the Region. | 
| sts_role_arn | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon SQS and Amazon S3. Defaults to null, which will use the standard SDK behavior for credentials. | 
| aws_sts_header_overrides | No | Map | A map of header overrides that the IAM role assumes for the sink plugin. | 
| sts_external_id | No | String | An STS external ID used when Data Prepper assumes the STS role. For more information, see the ExternalIDdocumentation in the STS AssumeRole API reference. | 
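For illustration, a cross-account aws block that assumes a role with an external ID might look like the following sketch. The role ARN and external ID are placeholders:

```yaml
s3:
  aws:
    region: "us-east-1"
    sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"
    sts_external_id: "my-external-id"  # must match the ExternalId condition in the role's trust policy
```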
codec
The codec determines how the s3 source parses each Amazon S3 object. For improved performance, you can combine codecs with certain processors.
newline codec
The newline codec parses each line as a single log event. This is ideal for most application logs because each line corresponds to one event. It is also suitable for S3 objects that contain one JSON object per line, which works well with the parse_json processor.
Use the following options to configure the newline codec.
| Option | Required | Type | Description | 
|---|---|---|---|
| skip_lines | No | Integer | The number of lines to skip before creating events. You can use this configuration to skip common header rows. Default is 0. | 
| header_destination | No | String | A key value to assign to the header line of the S3 object. If this option is specified, then each event will contain a header_destination field. | 
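For example, a newline codec that keeps the header line on each event might look like the following sketch; the field name is an arbitrary choice:

```yaml
s3:
  codec:
    newline:
      skip_lines: 1
      header_destination: "header"  # each event gets a "header" field containing the first line
```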
json codec
The json codec parses each S3 object as a single JSON object from a JSON array and then creates a Data Prepper log event for each object in the array.
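For example, given an S3 object containing the JSON array [{"status": 200}, {"status": 404}], the json codec would produce one log event per array element. The configuration itself is minimal:

```yaml
s3:
  codec:
    json:
```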
csv codec
The csv codec parses objects in comma-separated value (CSV) format, with each row producing a Data Prepper log event. Use the following options to configure the csv codec.
| Option | Required | Type | Description | 
|---|---|---|---|
| delimiter | Yes | String | The delimiter separating columns. Default is ,. | 
| quote_character | Yes | String | The character used as a text qualifier for CSV data. Default is ". | 
| header | No | String list | The header containing the column names used to parse CSV data. | 
| detect_header | No | Boolean | Whether the first line of the Amazon S3 object should be interpreted as a header. Default is true. | 
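A csv codec that reads files without a header row might be configured as in the following sketch; the column names are placeholders:

```yaml
s3:
  codec:
    csv:
      delimiter: ","
      quote_character: "\""
      detect_header: false
      header:
        - "timestamp"
        - "level"
        - "message"
```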
Using s3_select with the s3 source
When configuring s3_select to parse Amazon S3 objects, use the following options:
| Option | Required | Type | Description | 
|---|---|---|---|
| expression | Yes, when using s3_select | String | The expression used to query the object. Maps directly to the expression property. | 
| expression_type | No | String | The type of the provided expression. Default value is SQL. Maps directly to the ExpressionType. | 
| input_serialization | Yes, when using s3_select | String | Provides the S3 Select file format. Amazon S3 uses this format to parse object data into records and returns only records that match the specified SQL expression. May be csv, json, or parquet. | 
| compression_type | No | String | Specifies an object’s compression format. Maps directly to the CompressionType. | 
| csv | No | csv | Provides the CSV configuration for processing CSV data. | 
| json | No | json | Provides the JSON configuration for processing JSON data. | 
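Putting these options together, an s3_select configuration that queries compressed CSV objects might look like the following sketch; the SQL expression is illustrative:

```yaml
s3:
  s3_select:
    expression: "SELECT * FROM s3object s"
    input_serialization: csv
    compression_type: gzip
    csv:
      file_header_info: "use"  # treat the first line as column names
```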
csv
Use the following options in conjunction with the csv configuration for s3_select to determine how your parsed CSV file should be formatted.
These options map directly to options available in the S3 Select CSVInput data type.
| Option | Required | Type | Description | 
|---|---|---|---|
| file_header_info | No | String | Describes the first line of input. Maps directly to the FileHeaderInfo property. | 
| quote_escape | No | String | A single character used for escaping the quotation mark character inside an already escaped value. Maps directly to the QuoteEscapeCharacter property. | 
| comments | No | String | A single character used to indicate that a row should be ignored when the character is present at the start of that row. Maps directly to the Comments property. | 
json
Use the following option in conjunction with json for s3_select to determine how S3 Select processes the JSON file.
| Option | Required | Type | Description | 
|---|---|---|---|
| type | No | String | The type of JSON array. May be either DOCUMENT or LINES. Maps directly to the Type property. | 
Using scan with the s3 source
The following parameters allow you to scan S3 objects. All options can be configured at the bucket level.
| Option | Required | Type | Description | 
|---|---|---|---|
| start_time | No | String | The time from which to start scanning objects modified after the given start_time. This should follow ISO LocalDateTime format, for example, 2023-01-23T10:00:00. If end_time is configured along with start_time, all objects after start_time and before end_time will be processed. start_time and range cannot be used together. | 
| end_time | No | String | The time after which objects will no longer be scanned. This should follow ISO LocalDateTime format, for example, 2023-01-23T10:00:00. If start_time is configured along with end_time, all objects after start_time and before end_time will be processed. end_time and range cannot be used together. | 
| range | No | String | The time range over which objects are scanned from all buckets. Supports ISO 8601 notation strings, such as PT20.345S or PT15M, and notation strings for seconds (60s) and milliseconds (1600ms). start_time and end_time cannot be used with range. The range P12H scans all objects modified in the 12 hours before the pipeline started. | 
| buckets | Yes | List | A list of buckets to scan. | 
| scheduling | No | List | The configuration for scheduling periodic scans of all buckets. start_time, end_time, and range cannot be used if scheduling is configured. | 
scan bucket
| Option | Required | Type | Description | 
|---|---|---|---|
| bucket | Yes | Map | Provides options for each bucket. | 
You can configure the following options in the bucket setting map.
| Option | Required | Type | Description | 
|---|---|---|---|
| name | Yes | String | The string representing the S3 bucket name to scan. | 
| filter | No | Filter | Provides the filter configuration. | 
| start_time | No | String | The time from which to start scanning objects modified after the given start_time. This should follow ISO LocalDateTime format, for example, 2023-01-23T10:00:00. If end_time is configured along with start_time, all objects after start_time and before end_time will be processed. start_time and range cannot be used together. This overrides the start_time set at the scan level. | 
| end_time | No | String | The time after which objects will no longer be scanned. This should follow ISO LocalDateTime format, for example, 2023-01-23T10:00:00. If start_time is configured along with end_time, all objects after start_time and before end_time will be processed. This overrides the end_time set at the scan level. | 
| range | No | String | The time range over which objects are scanned from all buckets. Supports ISO 8601 notation strings, such as PT20.345S or PT15M, and notation strings for seconds (60s) and milliseconds (1600ms). start_time and end_time cannot be used with range. The range P12H scans all objects modified in the 12 hours before the pipeline started. This overrides the range set at the scan level. | 
filter
Use the following options inside the filter configuration.
| Option | Required | Type | Description | 
|---|---|---|---|
| include_prefix | No | List | A list of S3 key prefix strings included in the scan. By default, all the objects in a bucket are included. | 
| exclude_suffix | No | List | A list of S3 key suffix strings excluded from the scan. By default, no objects in a bucket are excluded. | 
scheduling
| Option | Required | Type | Description | 
|---|---|---|---|
| interval | Yes | String | Indicates the minimum interval between each scan. The next scan starts after the interval duration has elapsed since the last scan ended and all objects from the previous scan have been processed. Supports ISO 8601 notation strings, such as PT20.345S or PT15M, and notation strings for seconds (60s) and milliseconds (1600ms). | 
| count | No | Integer | Specifies how many times a bucket will be scanned. Defaults to Integer.MAX_VALUE. | 
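For example, a periodic scan that runs at most once every 24 hours and repeats 7 times might be scheduled as in the following sketch; the bucket name is a placeholder:

```yaml
s3:
  scan:
    scheduling:
      interval: "PT24H"  # wait at least 24 hours between scans
      count: 7           # scan each bucket up to 7 times
    buckets:
      - bucket:
          name: "my-scan-bucket"
```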
Metrics
The s3 source includes the following metrics:
Counters
- s3ObjectsFailed: The number of S3 objects that the s3 source failed to read.
- s3ObjectsNotFound: The number of S3 objects that the s3 source failed to read due to an S3 “Not Found” error. These are also counted toward s3ObjectsFailed.
- s3ObjectsAccessDenied: The number of S3 objects that the s3 source failed to read due to an “Access Denied” or “Forbidden” error. These are also counted toward s3ObjectsFailed.
- s3ObjectsSucceeded: The number of S3 objects that the s3 source successfully read.
- s3ObjectNoRecordsFound: The number of S3 objects that resulted in 0 records being added to the buffer by the s3 source.
- s3ObjectsDeleted: The number of S3 objects deleted by the s3 source.
- s3ObjectsDeleteFailed: The number of S3 objects that the s3 source failed to delete.
- s3ObjectsEmpty: The number of S3 objects that are considered empty because they have a size of 0. These objects are skipped by the s3 source.
- sqsMessagesReceived: The number of Amazon SQS messages received from the queue by the s3 source.
- sqsMessagesDeleted: The number of Amazon SQS messages deleted from the queue by the s3 source.
- sqsMessagesFailed: The number of Amazon SQS messages that the s3 source failed to parse.
- sqsMessagesDeleteFailed: The number of SQS messages that the s3 source failed to delete from the SQS queue.
- sqsVisibilityTimeoutChangedCount: The number of times that the s3 source changed the visibility timeout for an SQS message. This includes multiple visibility timeout changes on the same message.
- sqsVisibilityTimeoutChangeFailedCount: The number of times that the s3 source failed to change the visibility timeout for an SQS message. This includes multiple visibility timeout change failures on the same message.
- acknowledgementSetCallbackCounter: The number of times that the s3 source received an acknowledgment from Data Prepper.
Timers
- s3ObjectReadTimeElapsed: Measures the amount of time the s3 source takes to perform a GET request for an S3 object, parse it, and write events to the buffer.
- sqsMessageDelay: Measures the time elapsed from when S3 creates an object to when the object is fully parsed.
Distribution summaries
- s3ObjectSizeBytes: Measures the size of S3 objects as reported by the S3 Content-Length header. For compressed objects, this is the compressed size.
- s3ObjectProcessedBytes: Measures the number of bytes processed by the s3 source for a given object. For compressed objects, this is the uncompressed size.
- s3ObjectsEvents: Measures the number of events (sometimes called records) produced by an S3 object.
Example: Uncompressed logs with sqs
The following pipeline.yaml file shows the minimum configuration for reading uncompressed newline-delimited logs:
source:
  s3:
    notification_type: sqs
    codec:
      newline:
    compression: none
    sqs:
      queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    aws:
      region: "us-east-1"
      sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"
Example: Uncompressed logs with scan
The following pipeline.yaml file shows the minimum configuration for scanning objects with uncompressed newline-delimited logs:
source:
  s3:
    codec:
      newline:
    compression: none
    aws:
      region: "us-east-1"
      sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"
    scan:
      start_time: 2023-01-01T00:00:00
      range: "P365D"
      buckets:
        - bucket:
            name: "s3-scan-test"
            filter:
              exclude_suffix:
                - "*.log"