diff --git a/_data-prepper/pipelines/configuration/sources/s3.md b/_data-prepper/pipelines/configuration/sources/s3.md index 126b6d30..23e224ce 100644 --- a/_data-prepper/pipelines/configuration/sources/s3.md +++ b/_data-prepper/pipelines/configuration/sources/s3.md @@ -21,7 +21,11 @@ In order to use the `s3` source, configure your AWS Identity and Access Manageme { "Sid": "s3-access", "Effect": "Allow", - "Action": "s3:GetObject", + "Action": [ + "s3:GetObject", + "s3:ListBucket", + "s3:DeleteObject" + ], "Resource": "arn:aws:s3:::/*" }, { @@ -75,21 +79,23 @@ You can use both `bucket_owners` and `default_bucket_owner` together. You can use the following options to configure the `s3` source. -Option | Required | Type | Description -:--- | :--- |:---------| :--- -notification_type | Yes | String | Must be `sqs`. -compression | No | String | The compression algorithm to apply: `none`, `gzip`, or `automatic`. Default value is `none`. -codec | Yes | Codec | The [codec](#codec) to apply. -sqs | Yes | sqs | The SQS configuration. See [sqs](#sqs) for details. -aws | Yes | aws | The AWS configuration. See [aws](#aws) for details. -on_error | No | String | Determines how to handle errors in Amazon SQS, either `retain_messages` or `delete_messages`. `retain_messages` leaves the message in the Amazon SQS queue and tries to send the message again later. This is recommended for dead-letter queues. `delete_messages` deletes any failed messages. Default is `retain_messages`. -buffer_timeout | No | Duration | The amount of time allowed for for writing events to the Data Prepper buffer before timeout occurs. Any events that the Amazon S3 source cannot write to the buffer in this time will be discarded. Default value is 10 seconds. -records_to_accumulate | No | Integer | The number of messages that accumulate before writing to the buffer. Default value is `100`. -metadata_root_key | No | String | The base key for adding S3 metadata to each event. The metadata includes the key and bucket for each S3 object. Defaults to `s3/`. -default_bucket_owner | No | String | An AWS account ID to use as the default account when checking bucket ownership. -bucket_owners | No | Map | A map of S3 bucket names and their AWS account IDs. When provided, the `s3` source validates that the bucket is owned by the account. This allows for the use of buckets from multiple accounts. -disable_bucket_ownership_validation | No | Boolean | When `true`, the S3 source does not attempt to validate that the bucket is owned by the expected account. By default, this is the same account that owns the Amazon SQS queue. For more information, see [bucket ownership](#s3_bucket_ownership). Defaults to `false`. -acknowledgments | No | Boolean | When `true`, enables `s3` sources to receive [end-to-end acknowledgments]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/pipelines/#end-to-end-acknowledgments) when events are received by OpenSearch sinks. +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`notification_type` | Yes | String | Must be `sqs`. +`notification_source` | No | String | Determines how notifications are received by SQS. Must be `s3` or `eventbridge`. `s3` represents notifications that are directly sent from Amazon S3 to Amazon SQS or fanout notifications from Amazon S3 to Amazon Simple Notification Service (Amazon SNS) to Amazon SQS. `eventbridge` represents notifications from [Amazon EventBridge](https://aws.amazon.com/eventbridge/) and [Amazon Security Lake](https://aws.amazon.com/security-lake/). Default is `s3`. +`compression` | No | String | The compression algorithm to apply: `none`, `gzip`, or `automatic`. Default is `none`. +`codec` | Yes | Codec | The [codec](#codec) to apply. +`sqs` | Yes | SQS | The SQS configuration. See [sqs](#sqs) for more information. +`aws` | Yes | AWS | The AWS configuration. See [aws](#aws) for more information. +`on_error` | No | String | Determines how to handle errors in Amazon SQS. Can be either `retain_messages` or `delete_messages`. `retain_messages` leaves the message in the Amazon SQS queue and tries to send the message again. This is recommended for dead-letter queues. `delete_messages` deletes failed messages. Default is `retain_messages`. +buffer_timeout | No | Duration | The amount of time allowed for writing events to the Data Prepper buffer before timeout occurs. Any events that the Amazon S3 source cannot write to the buffer during the set amount of time are discarded. Default is `10s`. +`records_to_accumulate` | No | Integer | The number of messages that accumulate before being written to the buffer. Default is `100`. +`metadata_root_key` | No | String | The base key for adding S3 metadata to each event. The metadata includes the key and bucket for each S3 object. Default is `s3/`. +`disable_bucket_ownership_validation` | No | Boolean | When `true`, the S3 source does not attempt to validate that the bucket is owned by the expected account. The expected account is the same account that owns the Amazon SQS queue. Default is `false`. +`acknowledgments` | No | Boolean | When `true`, enables `s3` sources to receive [end-to-end acknowledgments]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/pipelines#end-to-end-acknowledgments) when events are received by OpenSearch sinks. +`s3_select` | No | [s3_select](#s3_select) | The Amazon S3 Select configuration. +`scan` | No | [scan](#scan) | The S3 scan configuration. +`delete_s3_objects_on_read` | No | Boolean | When `true`, the S3 scan attempts to delete S3 objects after all events from the S3 object are successfully acknowledged by all sinks. `acknowledgments` should be enabled when deleting S3 objects. Default is `false`. ## sqs @@ -98,20 +104,20 @@ The following parameters allow you to configure usage for Amazon SQS in the `s3` Option | Required | Type | Description :--- | :--- | :--- | :--- -queue_url | Yes | String | The URL of the Amazon SQS queue from which messages are received. -maximum_messages | No | Integer | The maximum number of messages to receive from the Amazon SQS queue in any single request. Default value is `10`. -visibility_timeout | No | Duration | The visibility timeout to apply to messages read from the Amazon SQS queue. This should be set to the amount of time that Data Prepper may take to read all the S3 objects in a batch. Default value is `30s`. -wait_time | No | Duration | The amount of time to wait for long polling on the Amazon SQS API. Default value is `20s`. -poll_delay | No | Duration | A delay to place between reading/processing a batch of Amazon SQS messages and making a subsequent request. Default value is `0s`. +`queue_url` | Yes | String | The URL of the Amazon SQS queue from which messages are received. +`maximum_messages` | No | Integer | The maximum number of messages to receive from the Amazon SQS queue in any single request. Default is `10`. +`visibility_timeout` | No | Duration | The visibility timeout to apply to messages read from the Amazon SQS queue. This should be set to the amount of time that Data Prepper may take to read all the S3 objects in a batch. Default is `30s`. +`wait_time` | No | Duration | The amount of time to wait for long polling on the Amazon SQS API. Default is `20s`. +`poll_delay` | No | Duration | A delay placed between the reading and processing of a batch of Amazon SQS messages and making a subsequent request. Default is `0s`. ## aws Option | Required | Type | Description :--- | :--- | :--- | :--- -region | No | String | The AWS Region to use for credentials. Defaults to [standard SDK behavior to determine the Region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). -sts_role_arn | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon SQS and Amazon S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). -aws_sts_header_overrides | No | Map | A map of header overrides that the IAM role assumes for the sink plugin. +`region` | No | String | The AWS Region to use for credentials. Defaults to [standard SDK behavior to determine the Region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). +`sts_role_arn` | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon SQS and Amazon S3. Defaults to `null`, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). +`aws_sts_header_overrides` | No | Map | A map of header overrides that the IAM role assumes for the sink plugin. ## codec @@ -125,8 +131,8 @@ Use the following options to configure the `newline` codec. Option | Required | Type | Description :--- | :--- |:--------| :--- -skip_lines | No | Integer | The number of lines to skip before creating events. You can use this configuration to skip common header rows. Default is `0`. -header_destination | No | String | A key value to assign to the header line of the S3 object. If this option is specified, then each event will contain a header_destination field. +`skip_lines` | No | Integer | The number of lines to skip before creating events. You can use this configuration to skip common header rows. Default is `0`. +`header_destination` | No | String | A key value to assign to the header line of the S3 object. If this option is specified, then each event will contain a `header_destination` field. ### json codec @@ -138,23 +144,23 @@ The `csv` codec parses objects in comma-separated value (CSV) format, with each Option | Required | Type | Description :--- |:---------|:------------| :--- -delimiter | Yes | Integer | The delimiter separating columns. Default is `,`. -quote_character | Yes | String | The character used as a text qualifier for CSV data. Default is `"`. -header | No | String list | The header containing the column names used to parse CSV data. -detect_header | No | Boolean | Whether the first line of the S3 object should be interpreted as a header. Default is `true`. +`delimiter` | Yes | Integer | The delimiter separating columns. Default is `,`. +`quote_character` | Yes | String | The character used as a text qualifier for CSV data. Default is `"`. +`header` | No | String list | The header containing the column names used to parse CSV data. +`detect_header` | No | Boolean | Whether the first line of the S3 object should be interpreted as a header. Default is `true`. -## Using `s3_select` with the `s3` source +## Using `s3_select` with the `s3` source When configuring `s3_select` to parse S3 objects, use the following options. Option | Required | Type | Description :--- |:-----------------------|:------------| :--- -expression | Yes, when using `s3_select` | String | The expression used to query the object. Maps directly to the [expression](https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html#AmazonS3-SelectObjectContent-request-Expression) property. -expression_type | No | String | The type of the provided expression. Default value is `SQL`. Maps directly to the [ExpressionType](https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html#AmazonS3-SelectObjectContent-request-ExpressionType). -input_serialization | Yes, when using `s3_select` | String | Provides the S3 Select file format. Amazon S3 uses this format to parse object data into records and returns only records that match the specified SQL expression. May be `csv`, `json`, or `parquet`. -compression_type | No | String | Specifies an object's compression format. Maps directly to the [CompressionType](https://docs.aws.amazon.com/AmazonS3/latest/API/API_InputSerialization.html#AmazonS3-Type-InputSerialization-CompressionType). -csv | No | [csv](#s3_select_csv) | Provides the CSV configuration for processing CSV data. -json | No | [json](#s3_select_json) | Provides the JSON configuration for processing JSON data. +`expression` | Yes, when using `s3_select` | String | The expression used to query the object. Maps directly to the [expression](https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html#AmazonS3-SelectObjectContent-request-Expression) property. +`expression_type` | No | String | The type of the provided expression. Default value is `SQL`. Maps directly to the [ExpressionType](https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html#AmazonS3-SelectObjectContent-request-ExpressionType). +`input_serialization` | Yes, when using `s3_select` | String | Provides the S3 Select file format. Amazon S3 uses this format to parse object data into records and returns only records that match the specified SQL expression. May be `csv`, `json`, or `parquet`. +`compression_type` | No | String | Specifies an object's compression format. Maps directly to the [CompressionType](https://docs.aws.amazon.com/AmazonS3/latest/API/API_InputSerialization.html#AmazonS3-Type-InputSerialization-CompressionType). +`csv` | No | [csv](#s3_select_csv) | Provides the CSV configuration for processing CSV data. +`json` | No | [json](#s3_select_json) | Provides the JSON configuration for processing JSON data. ### csv @@ -164,18 +170,59 @@ These options map directly to options available in the S3 Select [CSVInput](http Option | Required | Type | Description :--- |:---------|:------------| :--- -file_header_info | No | String | Describes the first line of input. Maps directly to the [FileHeaderInfo](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVInput.html#AmazonS3-Type-CSVInput-FileHeaderInfo) property. -quote_escape | No | String | A single character used for escaping the quotation mark character inside an already escaped value. Maps directly to the [QuoteEscapeCharacter](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVInput.html#AmazonS3-Type-CSVInput-QuoteEscapeCharacter) property. -comments | No | String | A single character used to indicate that a row should be ignored when the character is present at the start of that row. Maps directly to the [Comments](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVInput.html#AmazonS3-Type-CSVInput-Comments) property. +`file_header_info` | No | String | Describes the first line of input. Maps directly to the [FileHeaderInfo](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVInput.html#AmazonS3-Type-CSVInput-FileHeaderInfo) property. +`quote_escape` | No | String | A single character used for escaping the quotation mark character inside an already escaped value. Maps directly to the [QuoteEscapeCharacter](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVInput.html#AmazonS3-Type-CSVInput-QuoteEscapeCharacter) property. +`comments` | No | String | A single character used to indicate that a row should be ignored when the character is present at the start of that row. Maps directly to the [Comments](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVInput.html#AmazonS3-Type-CSVInput-Comments) property. #### json Use the following option in conjunction with `json` for `s3_select` to determine how S3 Select processes the JSON file. -Option | Required | Type | Description -:--- |:---------|:------------| :--- -type | No | String | The type of JSON array. May be either `DOCUMENT` or `LINES`. Maps directly to the [Type](https://docs.aws.amazon.com/AmazonS3/latest/API/API_JSONInput.html#AmazonS3-Type-JSONInput-Type) property. +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`type` | No | String | The type of JSON array. May be either `DOCUMENT` or `LINES`. Maps directly to the [Type](https://docs.aws.amazon.com/AmazonS3/latest/API/API_JSONInput.html#AmazonS3-Type-JSONInput-Type) property. +## Using `scan` with the `s3` source +The following parameters allow you to scan S3 objects. All options can be configured at the bucket level. + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`start_time` | No | String | The start of the time range during which to scan objects from all the buckets. This should follow [ISO LocalDateTime](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_DATE_TIME) format, for example, `023-01-23T10:00:00`, or it can be configured to the keyword that represents the current `LocalDateTime`, such as `now`. To define a time range, use `start_time` with either `end_time` or `range`. +`end_time` | No | String | The end of the time range during which to scan objects from all the buckets. This should follow [ISO LocalDateTime](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_DATE_TIME) format, for example, `023-01-23T10:00:00`, or it can be configured to the keyword that represents the current `LocalDateTime`, such as `now`. To define a time range, use `end_time` with either `start_time` or `range`. +`range` | No | String | The time range from which objects are scanned from all buckets. If configured with `start_time`, defines a last modified time range with `start_time` + `range`. If configured with `end_time`, defines a last modified time range with `end_time` - `range`. Supports ISO_8601 notation strings, such as `PT20.345S` or `PT15M`, and notation strings for seconds (`60s`) and milliseconds (`1600ms`). +`buckets` | Yes | List | A list of [buckets](#bucket) to scan. +`scheduling` | No | List | The configuration for scheduling periodic scans on all buckets. `start_time`, `end_time` and `range` can not be used if scheduling is configured. + +### bucket + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`bucket` | Yes | String list | Provides options for each bucket. + +You can configure the following options inside the bucket setting. + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`name` | Yes | String | The string representing the S3 bucket name to scan. +`filter` | No | [Filter](#filter) | Provides the filter configuration. +`start_time` | No | String | The start of the time range during which to scan objects from all the buckets. This should follow [ISO LocalDateTime](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_DATE_TIME) format, for example, `023-01-23T10:00:00`, or it can be configured to the keyword that represents the current `LocalDateTime`, such as `now`. To define a time range, use `start_time` with either `end_time` or `range`. +`end_time` | No | String | The end of the time range during which to scan objects from all the buckets. This should follow [ISO LocalDateTime](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_DATE_TIME) format, for example, `023-01-23T10:00:00`, or it can be configured to the keyword that represents the current `LocalDateTime`, such as `now`. To define a time range, use `end_time` with either `start_time` or `range`. +`range` | No | String | The time range from which objects are scanned from all buckets. If configured with `start_time`, defines a last modified time range with `start_time` + `range`. If configured with `end_time`, defines a last modified time range with `end_time` - `range`. Supports ISO_8601 notation strings, such as `PT20.345S` or `PT15M`, and notation strings for seconds (`60s`) and milliseconds (`1600ms`). + +### filter + +Use the following options inside the `filter` configuration. + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`include_prefix` | No | List | A list of S3 key prefix strings included in the scan. By default, all the objects in a bucket are included. +`exclude_suffix` | No | List | A list of S3 key suffix strings excluded from the scan. By default, no objects in a bucket are excluded. + +### scheduling +Option | Required | Type | Description +:--- | :--- | :--- | :--- +`interval` | Yes | String | Indicates the minimum interval between each scan. The next scan in the interval will start after the interval duration from the last scan ends and when all the objects from the previous scan are processed. Supports ISO_8601 notation strings, such as `PT20.345S` or `PT15M`, and notation strings for seconds (`60s`) and milliseconds (`1600ms`). +`count` | No | Integer | Specifies how many times a bucket will be scanned. Defaults to `Integer.MAX_VALUE`. ## Metrics @@ -190,6 +237,10 @@ The `s3` source includes the following metrics. * `sqsMessagesReceived`: The number of Amazon SQS messages received from the queue by the `s3` source. * `sqsMessagesDeleted`: The number of Amazon SQS messages deleted from the queue by the `s3` source. * `sqsMessagesFailed`: The number of Amazon SQS messages that the `s3` source failed to parse. +* `s3ObjectNoRecordsFound` -- The number of S3 objects that resulted in 0 records added to the buffer by the `s3` source. +* `sqsMessagesDeleteFailed` -- The number of SQS messages that the `s3` source failed to delete from the SQS queue. +* `s3ObjectsDeleted` -- The number of S3 objects deleted by the `s3` source. +* `s3ObjectsDeleteFailed` -- The number of S3 objects that the `s3` source failed to delete. ### Timers @@ -202,7 +253,7 @@ The `s3` source includes the following metrics. * `s3ObjectProcessedBytes`: Measures the bytes processed by the `s3` source for a given object. For compressed objects, this is the uncompressed size. * `s3ObjectsEvents`: Measures the number of events (sometimes called records) produced by an S3 object. -## Example: Uncompressed logs +## Example: Uncompressed logs with sqs The following pipeline.yaml file shows the minimum configuration for reading uncompressed newline-delimited logs: @@ -219,3 +270,27 @@ source: region: "us-east-1" sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper" ``` + +## Example: Uncompressed logs with scan + +The following pipeline.yaml file shows the minimum configuration for scanning objects with uncompressed newline-delimited logs: + +``` +source: + s3: + codec: + newline: + compression: none + aws: + region: "us-east-1" + sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper" + scan: + start_time: 2023-01-01T00:00:00 + range: "P365D" + buckets: + - bucket: + name: "s3-scan-test" + filter: + exclude_suffix: + - "*.log" +```