Add DLQ and aggregate processor edits. Baseline processor names (#3880)

* Add DLQ and aggregate processor edits. Baseline processor names

Signed-off-by: Naarcha-AWS <naarcha@amazon.com>

* Remove processor from titles

Signed-off-by: Naarcha-AWS <naarcha@amazon.com>

* Update dlq.md

Signed-off-by: Naarcha-AWS <97990722+Naarcha-AWS@users.noreply.github.com>

* Add additional feedback

Signed-off-by: Naarcha-AWS <naarcha@amazon.com>

* add cross links

Signed-off-by: Naarcha-AWS <naarcha@amazon.com>

* Fix typos

Signed-off-by: Naarcha-AWS <naarcha@amazon.com>

* Add doc review

Signed-off-by: Naarcha-AWS <naarcha@amazon.com>

* Remove link

Signed-off-by: Naarcha-AWS <naarcha@amazon.com>

* Apply suggestions from code review

Co-authored-by: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com>
Signed-off-by: Naarcha-AWS <97990722+Naarcha-AWS@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com>
Signed-off-by: Naarcha-AWS <97990722+Naarcha-AWS@users.noreply.github.com>

* Update dlq.md

Signed-off-by: Naarcha-AWS <97990722+Naarcha-AWS@users.noreply.github.com>

* Fix processor title

Signed-off-by: Naarcha-AWS <naarcha@amazon.com>

* Apply suggestions from code review

Co-authored-by: Nathan Bower <nbower@amazon.com>
Signed-off-by: Naarcha-AWS <97990722+Naarcha-AWS@users.noreply.github.com>

---------

Signed-off-by: Naarcha-AWS <naarcha@amazon.com>
Signed-off-by: Naarcha-AWS <97990722+Naarcha-AWS@users.noreply.github.com>
Co-authored-by: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com>
Co-authored-by: Nathan Bower <nbower@amazon.com>
Naarcha-AWS 2023-04-27 14:01:01 -05:00 committed by GitHub
parent b5d9cc598a
commit 4251fb04bd
27 changed files with 276 additions and 78 deletions

View File

@ -1,6 +1,6 @@
---
layout: default
title: Add entries processor
title: add_entries
parent: Processors
grand_parent: Pipelines
nav_order: 40
@ -28,10 +28,8 @@ To get started, create the following `pipeline.yaml` file:
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
...
....
processor:
- add_entries:
entries:
@ -39,20 +37,17 @@ pipeline:
value: 3
overwrite_if_key_exists: true
sink:
- stdout:
```
{% include copy.html %}
Next, create a log file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` file with that filepath. For more information, see [Configuring Data Prepper]({{site.url}}{{site.baseurl}}/data-prepper/getting-started/#2-configuring-data-prepper).
For example, before you run the `add_entries` processor, if the `logs_json.log` file contains the following event record:
For example, when your source contains the following event record:
```json
{"message": "hello"}
```
Then when you run the `add_entries` processor using the previous configuration, it adds a new entry `{"newMessage": 3}` to the existing event `{"message": "hello"}` so that the new event contains two entries in the final output:
When you then run the `add_entries` processor using the example pipeline, it adds a new entry, `{"newMessage": 3}`, to the existing event, `{"message": "hello"}`, so that the new event contains two entries in the final output:
```json
{"message": "hello", "newMessage": 3}

View File

@ -1,6 +1,6 @@
---
layout: default
title: Aggregate processor
title: aggregate
parent: Processors
grand_parent: Pipelines
nav_order: 41
@ -8,7 +8,7 @@ nav_order: 41
# aggregate
The `aggregate` processor groups events based on the keys provided and performs an action on each group.
The `aggregate` processor groups events based on the values of `identification_keys`. Then, the processor performs an action on each group, helping reduce unnecessary log volume and creating aggregated logs over time. You can use existing actions or create your own custom aggregations using Java code.
## Configuration
@ -18,12 +18,139 @@ The following table describes the options you can use to configure the `aggregat
Option | Required | Type | Description
:--- | :--- | :--- | :---
identification_keys | Yes | List | An unordered list by which to group events. Events with the same values as these keys are put into the same group. If an event does not contain one of the `identification_keys`, then the value of that key is considered to be equal to `null`. At least one identification_key is required (for example, `["sourceIp", "destinationIp", "port"]`).
action | Yes | AggregateAction | The action to be performed for each group. One of the available aggregate actions must be provided or you can create custom aggregate actions. `remove_duplicates` and `put_all` are the available actions. For more information, see [Creating New Aggregate Actions](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/aggregate-processor#creating-new-aggregate-actions).
action | Yes | AggregateAction | The action to be performed on each group. One of the [available aggregate actions](#available-aggregate-actions) must be provided, or you can create custom aggregate actions. For more information, see [Creating New Aggregate Actions](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/aggregate-processor#creating-new-aggregate-actions).
group_duration | No | String | The amount of time that a group should exist before it is concluded automatically. Supports ISO_8601 notation strings ("PT20.345S", "PT15M", etc.) as well as simple notation for seconds (`"60s"`) and milliseconds (`"1500ms"`). Default value is `180s`.
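For reference, a minimal `pipeline.yaml` sketch that wires these options together might look like the following. The `http` source and `stdout` sink are placeholders, the identification keys match the examples in the next section, and the action shown is one of the available aggregate actions described below.
```yaml
aggregate-pipeline:
  source:
    http:    # placeholder source; any source that produces events works
  processor:
    - aggregate:
        identification_keys: ["sourceIp", "destinationIp"]  # events sharing these values form one group
        action:
          remove_duplicates:                                # any available aggregate action
        group_duration: "180s"                              # conclude each group after 180 seconds (the default)
  sink:
    - stdout:  # placeholder sink
```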
<!---## Configuration
## Available aggregate actions
Content will be added to this section.--->
Use the following aggregate actions to determine how the `aggregate` processor processes events in each group.
### remove_duplicates
The `remove_duplicates` action processes the first event for a group immediately and drops any events that duplicate the first event from the source. For example, when using `identification_keys: ["sourceIp", "destinationIp"]`:
1. The `remove_duplicates` action processes `{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }`, the first event in the source.
2. Data Prepper drops the `{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }` event because the `sourceIp` and `destinationIp` match the first event in the source.
3. The `remove_duplicates` action processes the next event, `{ "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }`. Because the `sourceIp` is different from the first event of the group, Data Prepper creates a new group based on the event.
### put_all
The `put_all` action combines events belonging to the same group by overwriting existing keys and adding new keys, similarly to the Java `Map.putAll`. The action drops all events that make up the combined event. For example, when using `identification_keys: ["sourceIp", "destinationIp"]`, the `put_all` action processes the following three events:
```
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "http_verb": "GET" }
```
Then the action combines the events into one. The pipeline then uses the following combined event:
```
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200, "bytes": 1000, "http_verb": "GET" }
```
### count
The `count` action counts events that belong to the same group and generates a new event containing the values of the `identification_keys` and the count, which indicates the number of events in the group. You can customize the processor with the following configuration options:
* `count_key`: Key used for storing the count. Default name is `aggr._count`.
* `start_time_key`: Key used for storing the start time. Default name is `aggr._start_time`.
* `output_format`: Format of the aggregated event.
  * `otel_metrics`: The default output format. Outputs the aggregated event as an OTel metrics `SUM` type, with the count as its value.
  * `raw`: Generates a JSON object in which the `count_key` field contains the count and the `start_time_key` field contains the aggregation start time.
For example, when using `identification_keys: ["sourceIp", "destinationIp"]`, the `count` action counts and processes the following events:
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 503 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 400 }
```
The processor creates the following event:
```json
{"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1"}
```
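For reference, a hypothetical `count` configuration using the options described above might look like the following sketch. The identification keys are taken from the example, and the surrounding source and sink are omitted.
```yaml
processor:
  - aggregate:
      identification_keys: ["sourceIp", "destinationIp"]
      action:
        count:
          count_key: "aggr._count"            # default shown; stores the count
          start_time_key: "aggr._start_time"  # default shown; stores the aggregation start time
          output_format: "otel_metrics"       # or "raw" for a plain JSON object
```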
### histogram
The `histogram` action aggregates events belonging to the same group and generates a new event containing the values of the `identification_keys` and a histogram of the aggregated events based on a configured `key`. The histogram contains the number of events, the sum, the buckets, the bucket counts, and, optionally, the min and max of the values corresponding to the `key`. The action drops all events that make up the combined event.
You can customize the processor with the following configuration options:
* `key`: The name of the field in the events for which the histogram is generated.
* `generated_key_prefix`: The `key_prefix` applied to all of the fields created in the aggregated event. Using a prefix ensures that the fields generated for the histogram do not conflict with the existing field names in the event.
* `units`: The units for the values in the `key`.
* `record_minmax`: A Boolean value indicating whether the histogram should include the min and max of the values in the aggregation.
* `buckets`: A list of buckets (values of type `double`) indicating the buckets in the histogram.
* `output_format`: Format of the aggregated event.
  * `otel_metrics`: The default output format. Outputs the aggregated event as an OTel metrics `HISTOGRAM` type.
  * `raw`: Generates a JSON object with the `count_key` field containing the count and the `start_time_key` field containing the aggregation start time.
For example, when using `identification_keys: ["sourceIp", "destinationIp", "request"]`, `key: latency`, and `buckets: [0.0, 0.25, 0.5]`, the `histogram` action processes the following events:
```
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.2 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.55}
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.25 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.15 }
```
Then the processor creates the following event:
```json
{"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency"}
```
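A sketch of a `histogram` configuration using the documented options might look like the following. The prefix value is hypothetical, the identification keys come from the example, and the surrounding source and sink are omitted.
```yaml
processor:
  - aggregate:
      identification_keys: ["sourceIp", "destinationIp", "request"]
      action:
        histogram:
          key: "latency"                   # field for which the histogram is generated
          generated_key_prefix: "latency_" # hypothetical prefix for the generated fields
          units: "seconds"
          record_minmax: true
          buckets: [0.0, 0.25, 0.5]
          output_format: "otel_metrics"
```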
### rate_limiter
The `rate_limiter` action controls the number of events aggregated per second. By default, `rate_limiter` blocks the `aggregate` processor from running if it receives more events than the configured number allowed. You can change how the `rate_limiter` handles excess events by using the `when_exceeds` configuration option.
You can customize the processor with the following configuration options:
* `events_per_second`: The number of events allowed per second.
* `when_exceeds`: Indicates what action the `rate_limiter` takes when the number of events received is greater than the number of events allowed per second. Default value is `block`, which blocks the processor from running after the maximum number of events allowed per second is reached until the next second. Alternatively, the `drop` option drops the excess events received in that second.
For example, if `events_per_second` is set to `1` and `when_exceeds` is set to `drop`, the action tries to process the following events received during a one-second interval:
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "http_verb": "GET" }
```
Only the following event is processed; the `rate_limiter` drops the remaining events received during that second:
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
```
If `when_exceeds` is set to `block` instead, all three events are processed.
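As a sketch, a `rate_limiter` configuration using these options might look like the following. The identification keys are taken from the earlier examples, and the surrounding source and sink are omitted.
```yaml
processor:
  - aggregate:
      identification_keys: ["sourceIp", "destinationIp"]
      action:
        rate_limiter:
          events_per_second: 1
          when_exceeds: "drop"   # or "block" to pause processing instead of dropping events
```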
### percent_sampler
The `percent_sampler` action controls the number of events aggregated based on a percentage of events. The action drops any events not included in the percentage.
You can set the percentage of events using the `percent` configuration option, which indicates the percentage of events processed during a one-second interval (0% to 100%).
For example, if `percent` is set to `50`, the action tries to process the following events in the one-second interval:
```
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 }
```
The pipeline processes 50% of the events, drops the other events, and does not generate a new event:
```
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 }
```
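A sketch of a `percent_sampler` configuration might look like the following; everything outside the `aggregate` processor entry is omitted, and the identification keys come from the earlier examples.
```yaml
processor:
  - aggregate:
      identification_keys: ["sourceIp", "destinationIp"]
      action:
        percent_sampler:
          percent: 50   # process roughly half of the events in each one-second interval
```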
## Metrics

View File

@ -1,6 +1,6 @@
---
layout: default
title: Anomaly detector
title: anomaly_detector
parent: Processors
grand_parent: Pipelines
nav_order: 45
@ -58,14 +58,13 @@ To get started, create the following `pipeline.yaml` file. You can use the follo
```yaml
ad-pipeline:
source:
http:
...
....
processor:
- anomaly_detector:
keys: ["latency"]
mode:
random_cut_forest:
sink:
- stdout:
```
When you run the anomaly detector processor, the processor extracts the value for the `latency` key, and then passes the value through the RCF ML algorithm. You can configure any key that comprises integers or real numbers as values. In the following example, you can configure `bytes` or `latency` as the key for an anomaly detector.

View File

@ -1,6 +1,6 @@
---
layout: default
title: Convert entry type processor
title: convert_entry_type
parent: Processors
grand_parent: Pipelines
nav_order: 47
@ -26,16 +26,12 @@ To get started, create the following `pipeline.yaml` file:
```yaml
type-conv-pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
...
....
processor:
- convert_entry_type_type:
key: "response_status"
type: "integer"
sink:
- stdout:
```
{% include copy.html %}

View File

@ -1,6 +1,6 @@
---
layout: default
title: Copy values processor
title: copy_values
parent: Processors
grand_parent: Pipelines
nav_order: 48
@ -28,10 +28,8 @@ To get started, create the following `pipeline.yaml` file:
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
...
....
processor:
- copy_values:
entries:
@ -39,7 +37,6 @@ pipeline:
to_key: "newMessage"
overwrite_if_to_key_exists: true
sink:
- stdout:
```
{% include copy.html %}

View File

@ -1,6 +1,6 @@
---
layout: default
title: CSV processor
title: csv
parent: Processors
grand_parent: Pipelines
nav_order: 49

View File

@ -1,6 +1,6 @@
---
layout: default
title: Date
title: date
parent: Processors
grand_parent: Pipelines
nav_order: 50

View File

@ -25,15 +25,12 @@ To get started, create the following `pipeline.yaml` file:
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
...
....
processor:
- delete_entries:
with_keys: ["message"]
sink:
- stdout:
```
{% include copy.html %}

View File

@ -1,6 +1,6 @@
---
layout: default
title: Drop events processor
title: drop_events
parent: Processors
grand_parent: Pipelines
nav_order: 52

View File

@ -1,6 +1,6 @@
---
layout: default
title: Key value processor
title: key_value
parent: Processors
grand_parent: Pipelines
nav_order: 54

View File

@ -1,7 +1,6 @@
---
layout: default
title: List to map processor
parent: Processors
title: list_to_map
grand_parent: Pipelines
nav_order: 55
---

View File

@ -1,6 +1,6 @@
---
layout: default
title: Lowercase string processor
title: lowercase_string
parent: Processors
grand_parent: Pipelines
nav_order: 60

View File

@ -1,12 +1,12 @@
---
layout: default
title: otel_metrics processor
title: otel_metrics
parent: Processors
grand_parent: Pipelines
nav_order: 72
---
# otel_metrics processor
# otel_metrics
The `otel_metrics` processor serializes a collection of `ExportMetricsServiceRequest` records sent from the [OTel metrics source]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/sources/otel-metrics-source/) into a collection of string records.

View File

@ -1,12 +1,12 @@
---
layout: default
title: otel_trace_group processor
title: otel_trace_group
parent: Processors
grand_parent: Pipelines
nav_order: 45
---
# otel_trace_group processor
# otel_trace_group
The `otel_trace_group` processor completes missing trace-group-related fields in the collection of [span](https://github.com/opensearch-project/data-prepper/blob/834f28fdf1df6d42a6666e91e6407474b88e7ec6/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/Span.java) records by looking up the OpenSearch backend. The `otel_trace_group` processor identifies the missing trace group information for a `spanId` by looking up the relevant fields in its root `span` stored in OpenSearch.

View File

@ -1,15 +1,14 @@
---
layout: default
title: OTel trace raw processor
title: otel_trace
parent: Processors
grand_parent: Pipelines
nav_order: 75
---
# otel_trace_raw
# otel_trace
The `otel_trace_raw` processor completes trace-group-related fields in all incoming Data Prepper span records by state caching the root span information for each `tradeId`.
The `otel_trace` processor completes trace-group-related fields in all incoming Data Prepper span records by state caching the root span information for each `traceId`.
## Parameters
@ -22,7 +21,7 @@ This processor includes the following parameters.
## Configuration
The following table describes the options you can use to configure the `otel_trace_raw` processor.
The following table describes the options you can use to configure the `otel_trace` processor.
Option | Required | Type | Description
:--- | :--- | :--- | :---
@ -39,7 +38,7 @@ The following table describes common [Abstract processor](https://github.com/ope
| `recordsOut` | Counter | Metric representing the egress of records from a pipeline component. |
| `timeElapsed` | Timer | Metric representing the time elapsed during execution of a pipeline component. |
The `otel_trace_raw` processor includes the following custom metrics.
The `otel_trace` processor includes the following custom metrics:
* `traceGroupCacheCount`: The number of trace groups in the trace group cache.
* `spanSetCount`: The number of span sets in the span set collection.

View File

@ -1,6 +1,6 @@
---
layout: default
title: Parse JSON processor
title: parse_json
parent: Processors
grand_parent: Pipelines
nav_order: 80
@ -28,11 +28,10 @@ To get started, create the following `pipeline.yaml` file:
```yaml
parse-json-pipeline:
source:
stdin:
...
....
processor:
- parse_json:
sink:
- stdout:
```
### Basic example
@ -57,12 +56,11 @@ You can use a JSON pointer to parse a selection of the JSON data by specifying t
```yaml
parse-json-pipeline:
source:
stdin:
...
....
processor:
- parse_json:
pointer: "outer_key/inner_key"
sink:
- stdout:
```
To test the `parse_json` processor with the pointer option, run the pipeline, paste the following line into your console, and then enter `exit` on a new line:

View File

@ -1,6 +1,6 @@
---
layout: default
title: Rename keys processor
title: rename_keys
parent: Processors
grand_parent: Pipelines
nav_order: 85

View File

@ -1,6 +1,6 @@
---
layout: default
title: Routes
title: routes
parent: Processors
grand_parent: Pipelines
nav_order: 90

View File

@ -1,18 +1,18 @@
---
layout: default
title: Service map stateful processor
title: service_map
parent: Processors
grand_parent: Pipelines
nav_order: 95
---
# service_map_stateful
# service_map
The `service_map_stateful` processor uses OpenTelemetry data to create a distributed service map for visualization in OpenSearch Dashboards.
The `service_map` processor uses OpenTelemetry data to create a distributed service map for visualization in OpenSearch Dashboards.
## Configuration
The following table describes the option you can use to configure the `service_map_stateful` processor.
The following table describes the option you can use to configure the `service_map` processor.
Option | Required | Type | Description
:--- | :--- | :--- | :---

View File

@ -1,6 +1,6 @@
---
layout: default
title: Split string processor
title: split_string
parent: Processors
grand_parent: Pipelines
nav_order: 100

View File

@ -1,6 +1,6 @@
---
layout: default
title: String converter processor
title: string_converter
parent: Processors
grand_parent: Pipelines
nav_order: 105

View File

@ -1,6 +1,6 @@
---
layout: default
title: Substitute string processors
title: substitute_string
parent: Processors
grand_parent: Pipelines
nav_order: 110

View File

@ -1,6 +1,6 @@
---
layout: default
title: Trace peer forwarder processors
title: trace_peer_forwarder
parent: Processors
grand_parent: Pipelines
nav_order: 115
@ -8,13 +8,13 @@ nav_order: 115
# trace peer forwarder
The `trace peer forwarder` processor is used with [peer forwarder]({{site.url}}{{site.baseurl}}/data-prepper/managing-data-prepper/peer-forwarder/) to reduce by half the number of events forwarded in a [Trace Analytics]({{site.url}}{{site.baseurl}}/data-prepper/common-use-cases/trace-analytics/) pipeline. In Trace Analytics, each event is typically duplicated when it is sent from `otel-trace-pipeline` to `raw-pipeline` and `service-map-pipeline`. When pipelines forward events, this causes the core peer forwarder to send multiple HTTP requests for the same event. You can use `trace peer forwarder` to forward an event once through the `otel-trace-pipeline` instead of `raw-pipeline` and `service-map-pipeline`, which prevents unnecessary HTTP requests.
The `trace_peer_forwarder` processor is used with [peer forwarder]({{site.url}}{{site.baseurl}}/data-prepper/managing-data-prepper/peer-forwarder/) to reduce by half the number of events forwarded in a [Trace Analytics]({{site.url}}{{site.baseurl}}/data-prepper/common-use-cases/trace-analytics/) pipeline. In Trace Analytics, each event is typically duplicated when it is sent from `otel-trace-pipeline` to `raw-pipeline` and `service-map-pipeline`. When pipelines forward events, this causes the core peer forwarder to send multiple HTTP requests for the same event. You can use `trace_peer_forwarder` to forward an event once through the `otel-trace-pipeline` instead of `raw-pipeline` and `service-map-pipeline`, which prevents unnecessary HTTP requests.
You should use `trace peer forwarder` for Trace Analytics pipelines when you have multiple nodes.
You should use `trace_peer_forwarder` for Trace Analytics pipelines when you have multiple nodes.
## Usage
To get started with `trace peer forwarder`, first configure [peer forwarder]({{site.url}}{{site.baseurl}}/data-prepper/managing-data-prepper/peer-forwarder/). Then create a `pipeline.yaml` file and specify `trace peer forwarder` as the processor. You can configure `peer forwarder` in your `data-prepper-config.yaml` file. For more detailed information, see [Configuring Data Prepper]({{site.url}}{{site.baseurl}}/data-prepper/getting-started/#2-configuring-data-prepper).
To get started with `trace_peer_forwarder`, first configure [peer forwarder]({{site.url}}{{site.baseurl}}/data-prepper/managing-data-prepper/peer-forwarder/). Then create a `pipeline.yaml` file and specify `trace_peer_forwarder` as the processor. You can configure `peer forwarder` in your `data-prepper-config.yaml` file. For more detailed information, see [Configuring Data Prepper]({{site.url}}{{site.baseurl}}/data-prepper/getting-started/#2-configuring-data-prepper).
See the following example `pipeline.yaml` file:

View File

@ -1,6 +1,6 @@
---
layout: default
title: Trim string processors
title: trim_string
parent: Processors
grand_parent: Pipelines
nav_order: 120

View File

@ -1,6 +1,6 @@
---
layout: default
title: Uppercase string processor
title: uppercase_string
parent: Processors
grand_parent: Pipelines
nav_order: 125

View File

@ -59,7 +59,7 @@ password | No | String | Password for HTTP basic authentication.
aws_sigv4 | No | Boolean | Default value is false. Whether to use AWS Identity and Access Management (IAM) signing to connect to an Amazon OpenSearch Service domain. For your access key, secret key, and optional session token, Data Prepper uses the default credential chain (environment variables, Java system properties, `~/.aws/credential`, etc.).
aws_region | No | String | The AWS region (for example, `"us-east-1"`) for the domain if you are connecting to Amazon OpenSearch Service.
aws_sts_role_arn | No | String | IAM role that the plugin uses to sign requests sent to Amazon OpenSearch Service. If this information is not provided, the plugin uses the default credentials.
max_retries | No | Integer | The maximum number of times the OpenSearch sink should try to push data to the OpenSearch server before considering it as failure. Defaults to `Integer.MAX_VALUE`. If not provided, the sink will try to push data to the OpenSearch server indefinitely because the default value is high and exponential backoff would increase the waiting time before retry.
[max_retries](#configure-max_retries) | No | Integer | The maximum number of times the OpenSearch sink should try to push data to the OpenSearch server before considering it to be a failure. Defaults to `Integer.MAX_VALUE`. If not provided, the sink will try to push data to the OpenSearch server indefinitely because the default value is high and exponential backoff would increase the waiting time before retry.
socket_timeout | No | Integer | The timeout, in milliseconds, waiting for data to return (or the maximum period of inactivity between two consecutive data packets). A timeout value of zero is interpreted as an infinite timeout. If this timeout value is negative or not set, the underlying Apache HttpClient would rely on operating system settings for managing socket timeouts.
connect_timeout | No | Integer | The timeout in milliseconds used when requesting a connection from the connection manager. A timeout value of zero is interpreted as an infinite timeout. If this timeout value is negative or not set, the underlying Apache HttpClient would rely on operating system settings for managing connection timeouts.
insecure | No | Boolean | Whether or not to verify SSL certificates. If set to true, certificate authority (CA) certificate verification is disabled and insecure HTTP requests are sent instead. Default value is `false`.
@ -69,12 +69,20 @@ index_type | No | String | This index type tells the Sink plugin what type of da
template_file | No | String | Path to a JSON [index template]({{site.url}}{{site.baseurl}}/opensearch/index-templates/) file (for example, `/your/local/template-file.json`) if `index_type` is `custom`. See [otel-v1-apm-span-index-template.json](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/src/main/resources/otel-v1-apm-span-index-template.json) for an example.
document_id_field | No | String | The field from the source data to use for the OpenSearch document ID (for example, `"my-field"`) if `index_type` is `custom`.
dlq_file | No | String | The path to your preferred dead letter queue file (for example, `/your/local/dlq-file`). Data Prepper writes to this file when it fails to index a document on the OpenSearch cluster.
dlq | No | N/A | DLQ configurations. See [Dead Letter Queues (DLQ)](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/README.md) for details. If the `dlq_file` option is also available, the sink will fail.
dlq | No | N/A | DLQ configurations. See [Dead Letter Queues]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/dlq/) for details. If the `dlq_file` option is also configured, the sink will fail.
bulk_size | No | Integer (long) | The maximum size (in MiB) of bulk requests sent to the OpenSearch cluster. Values below 0 indicate an unlimited size. If a single document exceeds the maximum bulk request size, Data Prepper sends it individually. Default value is 5.
ism_policy_file | No | String | The absolute file path for an ISM (Index State Management) policy JSON file. This policy file is effective only when there is no built-in policy file for the index type. For example, `custom` index type is currently the only one without a built-in policy file, thus it would use the policy file here if it's provided through this parameter. For more information, see [ISM policies]({{site.url}}{{site.baseurl}}/im-plugin/ism/policies/).
number_of_shards | No | Integer | The number of primary shards that an index should have on the destination OpenSearch server. This parameter is effective only when `template_file` is either explicitly provided in Sink configuration or built-in. If this parameter is set, it would override the value in index template file. For more information, see [Create index]({{site.url}}{{site.baseurl}}/api-reference/index-apis/create-index/).
number_of_replicas | No | Integer | The number of replica shards each primary shard should have on the destination OpenSearch server. For example, if you have 4 primary shards and set number_of_replicas to 3, the index has 12 replica shards. This parameter is effective only when `template_file` is either explicitly provided in Sink configuration or built-in. If this parameter is set, it would override the value in index template file. For more information, see [Create index]({{site.url}}{{site.baseurl}}/api-reference/index-apis/create-index/).
### Configure max_retries
You can include the `max_retries` option in your pipeline configuration to control the number of times the sink retries writing data, with exponential backoff between attempts. If you don't include this option, pipelines keep retrying forever.
If you specify `max_retries` and a pipeline has a [dead-letter queue (DLQ)]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/dlq/) configured, the pipeline will keep trying to write to sinks until it reaches the maximum number of retries, at which point it starts to send failed data to the DLQ.
If you don't specify `max_retries`, only data that is rejected by sinks is written to the DLQ. Pipelines continue to try to write all other data to the sinks.
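For illustration, a sketch of an `opensearch` sink that combines `max_retries` with a DLQ might look like the following. The host, index, and credential values are placeholders, and the DLQ settings reuse the placeholder bucket and role from the DLQ documentation.
```yaml
sink:
  - opensearch:
      hosts: ["https://localhost:9200"]  # placeholder endpoint
      index: "my-index"                  # placeholder index name
      username: "admin"                  # placeholder credentials
      password: "admin"
      max_retries: 16                    # retry with exponential backoff, then send failures to the DLQ
      dlq:
        s3:
          bucket: "my-dlq-bucket"
          key_path_prefix: "dlq-files/"
          region: "us-west-2"
          sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"
```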
## OpenSearch cluster security
In order to send data to an OpenSearch cluster using the `opensearch` sink plugin, you must specify your username and password within the pipeline configuration. The following example `pipelines.yaml` file demonstrates how to specify admin security credentials:

View File

@ -0,0 +1,83 @@
---
layout: default
title: Dead-letter queues
parent: Pipelines
nav_order: 13
---
# Dead-letter queues
Data Prepper pipelines support dead-letter queues (DLQs) for offloading failed events and making them accessible for analysis.
As of Data Prepper 2.3, only the `s3` source supports DLQs.
## Configure a DLQ writer
To configure a DLQ writer for the `s3` source, add the following to your `pipeline.yaml` file:
```yaml
sink:
  opensearch:
    dlq:
      s3:
        bucket: "my-dlq-bucket"
        key_path_prefix: "dlq-files/"
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"
```
The resulting DLQ file outputs as a JSON array of DLQ objects. Any file written to the S3 DLQ contains the following name pattern:
```
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
```
The following information is replaced in the name pattern:
- `version`: The Data Prepper version.
- `pipelineName`: The pipeline name indicated in `pipeline.yaml`.
- `pluginId`: The ID of the plugin associated with the DLQ event.
## Configuration
DLQ supports the following configuration options.
Option | Required | Type | Description
:--- | :--- | :--- | :---
bucket | Yes | String | The name of the bucket into which the DLQ outputs failed records.
key_path_prefix | No | String | The `key_prefix` used in the S3 bucket. Defaults to `""`. Supports time value pattern variables, such as `/%{yyyy}/%{MM}/%{dd}`, including any variables listed in the [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, the `/%{yyyy}/%{MM}/%{dd}` pattern resolves to a prefix such as `/2023/01/24`.
region | No | String | The AWS Region of the S3 bucket. Defaults to `us-east-1`.
sts_role_arn | No | String | The STS role the DLQ assumes in order to write to an AWS S3 bucket. Default is `null`, which uses the standard SDK behavior for credentials. To use this option, the S3 bucket must have the `S3:PutObject` permission configured.
When using DLQ with an OpenSearch sink, you can configure the [max_retries]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/sinks/opensearch/#configure-max_retries) option to send failed data to the DLQ when the sink reaches the maximum number of retries.
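As a sketch, a DLQ configuration that uses the time value pattern variables described in the table might look like the following; the bucket and role values are the same placeholders used in the earlier example.
```yaml
dlq:
  s3:
    bucket: "my-dlq-bucket"
    key_path_prefix: "dlq-files/%{yyyy}/%{MM}/%{dd}/"  # resolves to a dated prefix such as dlq-files/2023/01/24/
    region: "us-west-2"
    sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"
```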
## Metrics
DLQ supports the following metrics.
### Counter
- `dlqS3RecordsSuccess`: Measures the number of successful records sent to S3.
- `dlqS3RecordsFailed`: Measures the number of records that failed to be sent to S3.
- `dlqS3RequestSuccess`: Measures the number of successful S3 requests.
- `dlqS3RequestFailed`: Measures the number of failed S3 requests.
### Distribution summary
- `dlqS3RequestSizeBytes`: Measures the distribution of the S3 request's payload size in bytes.
### Timer
- `dlqS3RequestLatency`: Measures latency when sending each S3 request, including retries.
## DLQ objects
DLQ supports the following DLQ objects:
* `pluginId`: The ID of the plugin that originated the event sent to the DLQ.
* `pluginName`: The name of the plugin.
* `failedData`: An object that contains the failed object and its options. This object is unique to each plugin.
* `pipelineName`: The name of the Data Prepper pipeline in which the event failed.
* `timestamp`: The timestamp of the failure in ISO 8601 format.