diff --git a/_data-prepper/pipelines/configuration/processors/add-entries.md b/_data-prepper/pipelines/configuration/processors/add-entries.md index fe69ae1b..589f463a 100644 --- a/_data-prepper/pipelines/configuration/processors/add-entries.md +++ b/_data-prepper/pipelines/configuration/processors/add-entries.md @@ -1,6 +1,6 @@ --- layout: default -title: Add entries processor +title: add_entries parent: Processors grand_parent: Pipelines nav_order: 40 @@ -28,10 +28,8 @@ To get started, create the following `pipeline.yaml` file: ```yaml pipeline: source: - file: - path: "/full/path/to/logs_json.log" - record_type: "event" - format: "json" + ... + .... processor: - add_entries: entries: @@ -39,20 +37,17 @@ pipeline: value: 3 overwrite_if_key_exists: true sink: - - stdout: ``` {% include copy.html %} -Next, create a log file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` file with that filepath. For more information, see [Configuring Data Prepper]({{site.url}}{{site.baseurl}}/data-prepper/getting-started/#2-configuring-data-prepper). - -For example, before you run the `add_entries` processor, if the `logs_json.log` file contains the following event record: +For example, when your source contains the following event record: ```json {"message": "hello"} ``` -Then when you run the `add_entries` processor using the previous configuration, it adds a new entry `{"newMessage": 3}` to the existing event `{"message": "hello"}` so that the new event contains two entries in the final output: +And then you run the `add_entries` processor using the example pipeline, it adds a new entry, `{"newMessage": 3}`, to the existing event, `{"message": "hello"}`, so that the new event contains two entries in the final output: ```json {"message": "hello", "newMessage": 3} diff --git a/_data-prepper/pipelines/configuration/processors/aggregate.md b/_data-prepper/pipelines/configuration/processors/aggregate.md index 36bf3dd8..699d2502 100644 --- a/_data-prepper/pipelines/configuration/processors/aggregate.md +++ b/_data-prepper/pipelines/configuration/processors/aggregate.md @@ -1,6 +1,6 @@ --- layout: default -title: Aggregate processor +title: aggregate parent: Processors grand_parent: Pipelines nav_order: 41 @@ -8,7 +8,7 @@ nav_order: 41 # aggregate -The `aggregate` processor groups events based on the keys provided and performs an action on each group. +The `aggregate` processor groups events based on the values of `identification_keys`. Then, the processor performs an action on each group, helping reduce unnecessary log volume and creating aggregated logs over time. You can use existing actions or create your own custom aggregations using Java code. ## Configuration @@ -18,12 +18,139 @@ The following table describes the options you can use to configure the `aggregat Option | Required | Type | Description :--- | :--- | :--- | :--- identification_keys | Yes | List | An unordered list by which to group events. Events with the same values as these keys are put into the same group. If an event does not contain one of the `identification_keys`, then the value of that key is considered to be equal to `null`. At least one identification_key is required (for example, `["sourceIp", "destinationIp", "port"]`). -action | Yes | AggregateAction | The action to be performed for each group. One of the available aggregate actions must be provided or you can create custom aggregate actions. `remove_duplicates` and `put_all` are the available actions. 
For more information, see [Creating New Aggregate Actions](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/aggregate-processor#creating-new-aggregate-actions).
+action | Yes | AggregateAction | The action to be performed on each group. One of the [available aggregate actions](#available-aggregate-actions) must be provided, or you can create custom aggregate actions. For more information, see [Creating New Aggregate Actions](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/aggregate-processor#creating-new-aggregate-actions).
group_duration | No | String | The amount of time that a group should exist before it is concluded automatically. Supports ISO_8601 notation strings ("PT20.345S", "PT15M", etc.) as well as simple notation for seconds (`"60s"`) and milliseconds (`"1500ms"`). Default value is `180s`.
-
+Use the following aggregate actions to determine how the `aggregate` processor processes events in each group.
+
+### remove_duplicates
+
+The `remove_duplicates` action processes the first event for a group immediately and drops any events that duplicate the first event from the source. For example, when using `identification_keys: ["sourceIp", "destination_ip"]`:
+
+1. The `remove_duplicates` action processes `{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }`, the first event in the source.
+2. Data Prepper drops the `{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }` event because the `sourceIp` and `destinationIp` match the first event in the source.
+3. The `remove_duplicates` action processes the next event, `{ "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }`. Because the `sourceIp` is different from the first event of the group, Data Prepper creates a new group based on the event.
+
+### put_all
+
+The `put_all` action combines events belonging to the same group by overwriting existing keys and adding new keys, similarly to the Java `Map.putAll`. The action drops all events that make up the combined event. For example, when using `identification_keys: ["sourceIp", "destination_ip"]`, the `put_all` action processes the following three events:
+
+```
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "http_verb": "GET" }
+```
+
+Then the action combines the events into one. The pipeline then uses the following combined event:
+
+```
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200, "bytes": 1000, "http_verb": "GET" }
+```
+
+### count
+
+The `count` action counts events that belong to the same group and generates a new event with values of the `identification_keys` and the count, which indicates the number of events in the group. You can customize the processor with the following configuration options:
+
+* `count_key`: Key used for storing the count. Default name is `aggr._count`.
+* `start_time_key`: Key used for storing the start time. Default name is `aggr._start_time`.
+* `output_format`: Format of the aggregated event.
+  * `otel_metrics`: Default output format. Outputs in OTel metrics SUM type with count as value.
+  * `raw`: Generates a JSON object with the `count_key` field as a count value and the `start_time_key` field with aggregation start time as value.
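+
+A minimal sketch of how the `count` action and these options might appear in a pipeline configuration follows. The nesting of the `action` block is illustrative, and the option values shown are the documented defaults:
+
+```yaml
+processor:
+  - aggregate:
+      identification_keys: ["sourceIp", "destinationIp"]
+      action:
+        count:
+          count_key: "aggr._count"
+          start_time_key: "aggr._start_time"
+          output_format: "otel_metrics"
+      group_duration: "180s"
+```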
+ +For an example, when using `identification_keys: ["sourceIp", "destination_ip"]`, the `count` action counts and processes the following events: + +```json +{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 } +{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 503 } +{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 400 } +``` + +The processor creates the following event: + +```json +{"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1"} +``` + +### histogram + +The `histogram` action aggregates events belonging to the same group and generates a new event with values of the `identification_keys` and histogram of the aggregated events based on a configured `key`. The histogram contains the number of events, sum, buckets, bucket counts, and optionally min and max of the values corresponding to the `key`. The action drops all events that make up the combined event. + +You can customize the processor with the following configuration options: + +* `key`: Name of the field in the events the histogram generates. +* `generated_key_prefix`: `key_prefix` used by all the fields created in the aggregated event. Having a prefix ensures that the names of the histogram event do not conflict with the field names in the event. +* `units`: The units for the values in the `key`. +* `record_minmax`: A Boolean value indicating whether the histogram should include the min and max of the values in the aggregation. +* `buckets`: A list of buckets (values of type `double`) indicating the buckets in the histogram. +* `output_format`: Format of the aggregated event. + * `otel_metrics`: Default output format. Outputs in OTel metrics SUM type with count as value. + * `raw`: Generates a JSON object with `count_key` field with count as value and `start_time_key` field with aggregation start time as value. 
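+
+A similar sketch for the `histogram` action, using the same `key` and `buckets` values as the example that follows, might look like the following. The nesting of the `action` block and the `generated_key_prefix` value are illustrative placeholders:
+
+```yaml
+processor:
+  - aggregate:
+      identification_keys: ["sourceIp", "destinationIp", "request"]
+      action:
+        histogram:
+          key: "latency"
+          generated_key_prefix: "latency_"
+          units: "seconds"
+          record_minmax: true
+          buckets: [0.0, 0.25, 0.5]
+          output_format: "otel_metrics"
+```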
+
+For example, when using `identification_keys: ["sourceIp", "destination_ip", "request"]`, `key: latency`, and `buckets: [0.0, 0.25, 0.5]`, the `histogram` action processes the following events:
+
+```
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.2 }
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.55}
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.25 }
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.15 }
+```
+
+Then the processor creates the following event:
+
+```json
+{"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency"}
+```
+
+### rate_limiter
+
+The `rate_limiter` action controls the number of events aggregated per second. By default, `rate_limiter` blocks the `aggregate` processor from running if it receives more events than the configured number allowed. You can change how excess events are handled by using the `when_exceeds` configuration option.
+
+You can customize the processor with the following configuration options:
+
+* `events_per_second`: The number of events allowed per second.
+* `when_exceeds`: Indicates what action the `rate_limiter` takes when the number of events received is greater than the number of events allowed per second. Default value is `block`, which blocks the processor from running after the maximum number of events allowed per second is reached until the next second. Alternatively, the `drop` option drops the excess events received in that second.
+
+For example, if `events_per_second` is set to `1` and `when_exceeds` is set to `drop`, the action tries to process the following events received during a one-second interval:
+
+```json
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "http_verb": "GET" }
+```
+
+Only the following event is processed; the `rate_limiter` drops the other events:
+
+```json
+{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
+```
+
+If `when_exceeds` is set to `block` instead, all three events are processed because the excess events are held until the next second rather than dropped.
+
+### percent_sampler
+
+The `percent_sampler` action controls the number of events aggregated based on a percentage of events. The action drops any events not included in the percentage.
+
+You can set the percentage of events using the `percent` configuration, which indicates the percentage of events processed during a one-second interval (0%--100%).
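+
+A minimal sketch of a `percent_sampler` configuration might look like the following; the nesting of the `action` block is illustrative:
+
+```yaml
+processor:
+  - aggregate:
+      identification_keys: ["sourceIp", "destinationIp"]
+      action:
+        percent_sampler:
+          percent: 50
+```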
+ +For example, if percent is set to `50`, the action tries to process the following events in the one-second interval: + +``` +{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 } +{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 } +{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 } +{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 } +``` + +The pipeline processes 50% of the events, drops the other events, and does not generate a new event: + +``` +{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 } +{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 } +``` ## Metrics diff --git a/_data-prepper/pipelines/configuration/processors/anomaly-detector.md b/_data-prepper/pipelines/configuration/processors/anomaly-detector.md index 19333db4..2010c538 100644 --- a/_data-prepper/pipelines/configuration/processors/anomaly-detector.md +++ b/_data-prepper/pipelines/configuration/processors/anomaly-detector.md @@ -1,6 +1,6 @@ --- layout: default -title: Anomaly detector +title: anomaly_detector parent: Processors grand_parent: Pipelines nav_order: 45 @@ -58,14 +58,13 @@ To get started, create the following `pipeline.yaml` file. You can use the follo ```yaml ad-pipeline: source: - http: + ... + .... processor: - anomaly_detector: keys: ["latency"] mode: random_cut_forest: - sink: - - stdout: ``` When you run the anomaly detector processor, the processor extracts the value for the `latency` key, and then passes the value through the RCF ML algorithm. You can configure any key that comprises integers or real numbers as values. In the following example, you can configure `bytes` or `latency` as the key for an anomaly detector. diff --git a/_data-prepper/pipelines/configuration/processors/convert_entry_type.md b/_data-prepper/pipelines/configuration/processors/convert_entry_type.md index b446ce6a..2c6cb6ff 100644 --- a/_data-prepper/pipelines/configuration/processors/convert_entry_type.md +++ b/_data-prepper/pipelines/configuration/processors/convert_entry_type.md @@ -1,6 +1,6 @@ --- layout: default -title: Convert entry type processor +title: convert_entry_type parent: Processors grand_parent: Pipelines nav_order: 47 @@ -26,16 +26,12 @@ To get started, create the following `pipeline.yaml` file: ```yaml type-conv-pipeline: source: - file: - path: "/full/path/to/logs_json.log" - record_type: "event" - format: "json" + ... + .... processor: - convert_entry_type_type: key: "response_status" type: "integer" - sink: - - stdout: ``` {% include copy.html %} diff --git a/_data-prepper/pipelines/configuration/processors/copy-values.md b/_data-prepper/pipelines/configuration/processors/copy-values.md index 17db03fd..0284423d 100644 --- a/_data-prepper/pipelines/configuration/processors/copy-values.md +++ b/_data-prepper/pipelines/configuration/processors/copy-values.md @@ -1,6 +1,6 @@ --- layout: default -title: Copy values processor +title: copy_values parent: Processors grand_parent: Pipelines nav_order: 48 @@ -28,10 +28,8 @@ To get started, create the following `pipeline.yaml` file: ```yaml pipeline: source: - file: - path: "/full/path/to/logs_json.log" - record_type: "event" - format: "json" + ... + .... 
processor: - copy_values: entries: @@ -39,7 +37,6 @@ pipeline: to_key: "newMessage" overwrite_if_to_key_exists: true sink: - - stdout: ``` {% include copy.html %} diff --git a/_data-prepper/pipelines/configuration/processors/csv.md b/_data-prepper/pipelines/configuration/processors/csv.md index 6475e5fb..e7ec8a35 100644 --- a/_data-prepper/pipelines/configuration/processors/csv.md +++ b/_data-prepper/pipelines/configuration/processors/csv.md @@ -1,6 +1,6 @@ --- layout: default -title: CSV processor +title: csv parent: Processors grand_parent: Pipelines nav_order: 49 diff --git a/_data-prepper/pipelines/configuration/processors/date.md b/_data-prepper/pipelines/configuration/processors/date.md index 93ddb19d..27b571df 100644 --- a/_data-prepper/pipelines/configuration/processors/date.md +++ b/_data-prepper/pipelines/configuration/processors/date.md @@ -1,6 +1,6 @@ --- layout: default -title: Date +title: date parent: Processors grand_parent: Pipelines nav_order: 50 diff --git a/_data-prepper/pipelines/configuration/processors/delete-entries.md b/_data-prepper/pipelines/configuration/processors/delete-entries.md index abdf80df..0546ed67 100644 --- a/_data-prepper/pipelines/configuration/processors/delete-entries.md +++ b/_data-prepper/pipelines/configuration/processors/delete-entries.md @@ -25,15 +25,12 @@ To get started, create the following `pipeline.yaml` file: ```yaml pipeline: source: - file: - path: "/full/path/to/logs_json.log" - record_type: "event" - format: "json" + ... + .... processor: - delete_entries: with_keys: ["message"] sink: - - stdout: ``` {% include copy.html %} diff --git a/_data-prepper/pipelines/configuration/processors/drop-events.md b/_data-prepper/pipelines/configuration/processors/drop-events.md index 4ba453ee..b0ab6d8b 100644 --- a/_data-prepper/pipelines/configuration/processors/drop-events.md +++ b/_data-prepper/pipelines/configuration/processors/drop-events.md @@ -1,6 +1,6 @@ --- layout: default -title: Drop events processor +title: drop_events parent: Processors grand_parent: Pipelines nav_order: 52 diff --git a/_data-prepper/pipelines/configuration/processors/key-value.md b/_data-prepper/pipelines/configuration/processors/key-value.md index e10e152b..859d1aa4 100644 --- a/_data-prepper/pipelines/configuration/processors/key-value.md +++ b/_data-prepper/pipelines/configuration/processors/key-value.md @@ -1,6 +1,6 @@ --- layout: default -title: Key value processor +title: key_value parent: Processors grand_parent: Pipelines nav_order: 54 diff --git a/_data-prepper/pipelines/configuration/processors/list-to-map.md b/_data-prepper/pipelines/configuration/processors/list-to-map.md index 53f10f2b..e344236f 100644 --- a/_data-prepper/pipelines/configuration/processors/list-to-map.md +++ b/_data-prepper/pipelines/configuration/processors/list-to-map.md @@ -1,7 +1,6 @@ --- layout: default -title: List to map processor -parent: Processors +title: list_to_map grand_parent: Pipelines nav_order: 55 --- diff --git a/_data-prepper/pipelines/configuration/processors/lowercase-string.md b/_data-prepper/pipelines/configuration/processors/lowercase-string.md index e72ab8f0..34ca0597 100644 --- a/_data-prepper/pipelines/configuration/processors/lowercase-string.md +++ b/_data-prepper/pipelines/configuration/processors/lowercase-string.md @@ -1,6 +1,6 @@ --- layout: default -title: Lowercase string processor +title: lowercase_string parent: Processors grand_parent: Pipelines nav_order: 60 diff --git a/_data-prepper/pipelines/configuration/processors/otel-metrics.md 
b/_data-prepper/pipelines/configuration/processors/otel-metrics.md
index d7e51d40..dacabd2f 100644
--- a/_data-prepper/pipelines/configuration/processors/otel-metrics.md
+++ b/_data-prepper/pipelines/configuration/processors/otel-metrics.md
@@ -1,12 +1,12 @@
---
layout: default
-title: otel_metrics processor
+title: otel_metrics
parent: Processors
grand_parent: Pipelines
nav_order: 72
---

-# otel_metrics processor
+# otel_metrics

The `otel_metrics` processor serializes a collection of `ExportMetricsServiceRequest` records sent from the [OTel metrics source]({{site.url}}{{site.baseurl}}//data-prepper/pipelines/configuration/sources/otel-metrics-source/) into a collection of string records.
diff --git a/_data-prepper/pipelines/configuration/processors/otel-trace-group.md b/_data-prepper/pipelines/configuration/processors/otel-trace-group.md
index ae70c35a..e38e8505 100644
--- a/_data-prepper/pipelines/configuration/processors/otel-trace-group.md
+++ b/_data-prepper/pipelines/configuration/processors/otel-trace-group.md
@@ -1,12 +1,12 @@
---
layout: default
-title: otel_trace_group processor
+title: otel_trace_group
parent: Processors
grand_parent: Pipelines
nav_order: 45
---

-# otel_trace_group processor
+# otel_trace_group

The `otel_trace_group` processor completes missing trace-group-related fields in the collection of [span](https://github.com/opensearch-project/data-prepper/blob/834f28fdf1df6d42a6666e91e6407474b88e7ec6/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/Span.java) records by looking up the OpenSearch backend. The `otel_trace_group` processor identifies the missing trace group information for a `spanId` by looking up the relevant fields in its root `span` stored in OpenSearch.
diff --git a/_data-prepper/pipelines/configuration/processors/otel-trace-raw.md b/_data-prepper/pipelines/configuration/processors/otel-trace-raw.md
index 35efadca..ed2716c4 100644
--- a/_data-prepper/pipelines/configuration/processors/otel-trace-raw.md
+++ b/_data-prepper/pipelines/configuration/processors/otel-trace-raw.md
@@ -1,15 +1,14 @@
---
layout: default
-title: OTel trace raw processor
+title: otel_trace
parent: Processors
grand_parent: Pipelines
nav_order: 75
---

-# otel_trace_raw
+# otel_trace

-
-The `otel_trace_raw` processor completes trace-group-related fields in all incoming Data Prepper span records by state caching the root span information for each `tradeId`.
+The `otel_trace` processor completes trace-group-related fields in all incoming Data Prepper span records by state caching the root span information for each `traceId`.

## Parameters

@@ -22,7 +21,7 @@ This processor includes the following parameters.

## Configuration

-The following table describes the options you can use to configure the `otel_trace_raw` processor.
+The following table describes the options you can use to configure the `otel_trace` processor.

Option | Required | Type | Description
:--- | :--- | :--- | :---
@@ -39,7 +38,7 @@ The following table describes common [Abstract processor](https://github.com/ope
| `recordsOut` | Counter | Metric representing the egress of records from a pipeline component. |
| `timeElapsed` | Timer | Metric representing the time elapsed during execution of a pipeline component. |

-The `otel_trace_raw` processor includes the following custom metrics.
+The `otel_trace` processor includes the following custom metrics:

* `traceGroupCacheCount`: The number of trace groups in the trace group cache.
* `spanSetCount`: The number of span sets in the span set collection.
\ No newline at end of file diff --git a/_data-prepper/pipelines/configuration/processors/parse-json.md b/_data-prepper/pipelines/configuration/processors/parse-json.md index 8711d72f..b2a152db 100644 --- a/_data-prepper/pipelines/configuration/processors/parse-json.md +++ b/_data-prepper/pipelines/configuration/processors/parse-json.md @@ -1,6 +1,6 @@ --- layout: default -title: Parse JSON processor +title: parse_json parent: Processors grand_parent: Pipelines nav_order: 80 @@ -28,11 +28,10 @@ To get started, create the following `pipeline.yaml` file: ```yaml parse-json-pipeline: source: - stdin: + ... + .... processor: - parse_json: - sink: - - stdout: ``` ### Basic example @@ -57,12 +56,11 @@ You can use a JSON pointer to parse a selection of the JSON data by specifying t ```yaml parse-json-pipeline: source: - stdin: + ... + .... processor: - parse_json: pointer: "outer_key/inner_key" - sink: - - stdout: ``` To test the `parse_json` processor with the pointer option, run the pipeline, paste the following line into your console, and then enter `exit` on a new line: diff --git a/_data-prepper/pipelines/configuration/processors/rename-keys.md b/_data-prepper/pipelines/configuration/processors/rename-keys.md index ded4df89..79a82cec 100644 --- a/_data-prepper/pipelines/configuration/processors/rename-keys.md +++ b/_data-prepper/pipelines/configuration/processors/rename-keys.md @@ -1,6 +1,6 @@ --- layout: default -title: Rename keys processor +title: rename_keys parent: Processors grand_parent: Pipelines nav_order: 85 diff --git a/_data-prepper/pipelines/configuration/processors/routes.md b/_data-prepper/pipelines/configuration/processors/routes.md index 06fe79fc..eb451537 100644 --- a/_data-prepper/pipelines/configuration/processors/routes.md +++ b/_data-prepper/pipelines/configuration/processors/routes.md @@ -1,6 +1,6 @@ --- layout: default -title: Routes +title: routes parent: Processors grand_parent: Pipelines nav_order: 90 diff --git a/_data-prepper/pipelines/configuration/processors/service-map-stateful.md b/_data-prepper/pipelines/configuration/processors/service-map-stateful.md index 89f4d2e8..a05f4486 100644 --- a/_data-prepper/pipelines/configuration/processors/service-map-stateful.md +++ b/_data-prepper/pipelines/configuration/processors/service-map-stateful.md @@ -1,18 +1,18 @@ --- layout: default -title: Service map stateful processor +title: service_map parent: Processors grand_parent: Pipelines nav_order: 95 --- -# service_map_stateful +# service_map -The `service_map_stateful` processor uses OpenTelemetry data to create a distributed service map for visualization in OpenSearch Dashboards. +The `service_map` processor uses OpenTelemetry data to create a distributed service map for visualization in OpenSearch Dashboards. ## Configuration -The following table describes the option you can use to configure the `service_map_stateful` processor. +The following table describes the option you can use to configure the `service_map` processor. 
Option | Required | Type | Description :--- | :--- | :--- | :--- diff --git a/_data-prepper/pipelines/configuration/processors/split-string.md b/_data-prepper/pipelines/configuration/processors/split-string.md index 2139181a..3959ae5a 100644 --- a/_data-prepper/pipelines/configuration/processors/split-string.md +++ b/_data-prepper/pipelines/configuration/processors/split-string.md @@ -1,6 +1,6 @@ --- layout: default -title: Split string processor +title: split_string parent: Processors grand_parent: Pipelines nav_order: 100 diff --git a/_data-prepper/pipelines/configuration/processors/string-converter.md b/_data-prepper/pipelines/configuration/processors/string-converter.md index ae71f956..32055791 100644 --- a/_data-prepper/pipelines/configuration/processors/string-converter.md +++ b/_data-prepper/pipelines/configuration/processors/string-converter.md @@ -1,6 +1,6 @@ --- layout: default -title: String converter processor +title: string_converter parent: Processors grand_parent: Pipelines nav_order: 105 diff --git a/_data-prepper/pipelines/configuration/processors/substitute-string.md b/_data-prepper/pipelines/configuration/processors/substitute-string.md index a48e98be..e02caf29 100644 --- a/_data-prepper/pipelines/configuration/processors/substitute-string.md +++ b/_data-prepper/pipelines/configuration/processors/substitute-string.md @@ -1,6 +1,6 @@ --- layout: default -title: Substitute string processors +title: substitute_string parent: Processors grand_parent: Pipelines nav_order: 110 diff --git a/_data-prepper/pipelines/configuration/processors/trace-peer-forwarder.md b/_data-prepper/pipelines/configuration/processors/trace-peer-forwarder.md index 4214bc34..a73295b8 100644 --- a/_data-prepper/pipelines/configuration/processors/trace-peer-forwarder.md +++ b/_data-prepper/pipelines/configuration/processors/trace-peer-forwarder.md @@ -1,6 +1,6 @@ --- layout: default -title: Trace peer forwarder processors +title: trace_peer_forwarder parent: Processors grand_parent: Pipelines nav_order: 115 @@ -8,13 +8,13 @@ nav_order: 115 # trace peer forwarder -The `trace peer forwarder` processor is used with [peer forwarder]({{site.url}}{{site.baseurl}}/data-prepper/managing-data-prepper/peer-forwarder/) to reduce by half the number of events forwarded in a [Trace Analytics]({{site.url}}{{site.baseurl}}/data-prepper/common-use-cases/trace-analytics/) pipeline. In Trace Analytics, each event is typically duplicated when it is sent from `otel-trace-pipeline` to `raw-pipeline` and `service-map-pipeline`. When pipelines forward events, this causes the core peer forwarder to send multiple HTTP requests for the same event. You can use `trace peer forwarder` to forward an event once through the `otel-trace-pipeline` instead of `raw-pipeline` and `service-map-pipeline`, which prevents unnecessary HTTP requests. +The `trace_peer_forwarder` processor is used with [peer forwarder]({{site.url}}{{site.baseurl}}/data-prepper/managing-data-prepper/peer-forwarder/) to reduce by half the number of events forwarded in a [Trace Analytics]({{site.url}}{{site.baseurl}}/data-prepper/common-use-cases/trace-analytics/) pipeline. In Trace Analytics, each event is typically duplicated when it is sent from `otel-trace-pipeline` to `raw-pipeline` and `service-map-pipeline`. When pipelines forward events, this causes the core peer forwarder to send multiple HTTP requests for the same event. 
You can use `trace peer forwarder` to forward an event once through the `otel-trace-pipeline` instead of `raw-pipeline` and `service-map-pipeline`, which prevents unnecessary HTTP requests. -You should use `trace peer forwarder` for Trace Analytics pipelines when you have multiple nodes. +You should use `trace_peer_forwarder` for Trace Analytics pipelines when you have multiple nodes. ## Usage -To get started with `trace peer forwarder`, first configure [peer forwarder]({{site.url}}{{site.baseurl}}/data-prepper/managing-data-prepper/peer-forwarder/). Then create a `pipeline.yaml` file and specify `trace peer forwarder` as the processor. You can configure `peer forwarder` in your `data-prepper-config.yaml` file. For more detailed information, see [Configuring Data Prepper]({{site.url}}{{site.baseurl}}/data-prepper/getting-started/#2-configuring-data-prepper). +To get started with `trace_peer_forwarder`, first configure [peer forwarder]({{site.url}}{{site.baseurl}}/data-prepper/managing-data-prepper/peer-forwarder/). Then create a `pipeline.yaml` file and specify `trace peer forwarder` as the processor. You can configure `peer forwarder` in your `data-prepper-config.yaml` file. For more detailed information, see [Configuring Data Prepper]({{site.url}}{{site.baseurl}}/data-prepper/getting-started/#2-configuring-data-prepper). See the following example `pipeline.yaml` file: diff --git a/_data-prepper/pipelines/configuration/processors/trim-string.md b/_data-prepper/pipelines/configuration/processors/trim-string.md index bb1defc6..26aba00e 100644 --- a/_data-prepper/pipelines/configuration/processors/trim-string.md +++ b/_data-prepper/pipelines/configuration/processors/trim-string.md @@ -1,6 +1,6 @@ --- layout: default -title: Trim string processors +title: trim_string parent: Processors grand_parent: Pipelines nav_order: 120 diff --git a/_data-prepper/pipelines/configuration/processors/uppercase-string.md b/_data-prepper/pipelines/configuration/processors/uppercase-string.md index 57853ba3..88e6e901 100644 --- a/_data-prepper/pipelines/configuration/processors/uppercase-string.md +++ b/_data-prepper/pipelines/configuration/processors/uppercase-string.md @@ -1,6 +1,6 @@ --- layout: default -title: Uppercase string processor +title: uppercase_string parent: Processors grand_parent: Pipelines nav_order: 125 diff --git a/_data-prepper/pipelines/configuration/sinks/opensearch.md b/_data-prepper/pipelines/configuration/sinks/opensearch.md index 70e08874..f503753e 100644 --- a/_data-prepper/pipelines/configuration/sinks/opensearch.md +++ b/_data-prepper/pipelines/configuration/sinks/opensearch.md @@ -59,7 +59,7 @@ password | No | String | Password for HTTP basic authentication. aws_sigv4 | No | Boolean | Default value is false. Whether to use AWS Identity and Access Management (IAM) signing to connect to an Amazon OpenSearch Service domain. For your access key, secret key, and optional session token, Data Prepper uses the default credential chain (environment variables, Java system properties, `~/.aws/credential`, etc.). aws_region | No | String | The AWS region (for example, `"us-east-1"`) for the domain if you are connecting to Amazon OpenSearch Service. aws_sts_role_arn | No | String | IAM role that the plugin uses to sign requests sent to Amazon OpenSearch Service. If this information is not provided, the plugin uses the default credentials. 
-max_retries | No | Integer | The maximum number of times the OpenSearch sink should try to push data to the OpenSearch server before considering it as failure. Defaults to `Integer.MAX_VALUE`. If not provided, the sink will try to push data to the OpenSearch server indefinitely because the default value is high and exponential backoff would increase the waiting time before retry. +[max_retries](#configure-max_retries) | No | Integer | The maximum number of times the OpenSearch sink should try to push data to the OpenSearch server before considering it to be a failure. Defaults to `Integer.MAX_VALUE`. If not provided, the sink will try to push data to the OpenSearch server indefinitely because the default value is high and exponential backoff would increase the waiting time before retry. socket_timeout | No | Integer | The timeout, in milliseconds, waiting for data to return (or the maximum period of inactivity between two consecutive data packets). A timeout value of zero is interpreted as an infinite timeout. If this timeout value is negative or not set, the underlying Apache HttpClient would rely on operating system settings for managing socket timeouts. connect_timeout | No | Integer | The timeout in milliseconds used when requesting a connection from the connection manager. A timeout value of zero is interpreted as an infinite timeout. If this timeout value is negative or not set, the underlying Apache HttpClient would rely on operating system settings for managing connection timeouts. insecure | No | Boolean | Whether or not to verify SSL certificates. If set to true, certificate authority (CA) certificate verification is disabled and insecure HTTP requests are sent instead. Default value is `false`. @@ -69,12 +69,20 @@ index_type | No | String | This index type tells the Sink plugin what type of da template_file | No | String | Path to a JSON [index template]({{site.url}}{{site.baseurl}}/opensearch/index-templates/) file (for example, `/your/local/template-file.json`) if `index_type` is `custom`. See [otel-v1-apm-span-index-template.json](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/src/main/resources/otel-v1-apm-span-index-template.json) for an example. document_id_field | No | String | The field from the source data to use for the OpenSearch document ID (for example, `"my-field"`) if `index_type` is `custom`. dlq_file | No | String | The path to your preferred dead letter queue file (for example, `/your/local/dlq-file`). Data Prepper writes to this file when it fails to index a document on the OpenSearch cluster. -dlq | No | N/A | DLQ configurations. See [Dead Letter Queues (DLQ)](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/README.md) for details. If the `dlq_file` option is also available, the sink will fail. +dlq | No | N/A | DLQ configurations. See [Dead Letter Queues]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/dlq/) for details. If the `dlq_file` option is also available, the sink will fail. bulk_size | No | Integer (long) | The maximum size (in MiB) of bulk requests sent to the OpenSearch cluster. Values below 0 indicate an unlimited size. If a single document exceeds the maximum bulk request size, Data Prepper sends it individually. Default value is 5. ism_policy_file | No | String | The absolute file path for an ISM (Index State Management) policy JSON file. 
This policy file is effective only when there is no built-in policy file for the index type. For example, `custom` index type is currently the only one without a built-in policy file, thus it would use the policy file here if it's provided through this parameter. For more information, see [ISM policies]({{site.url}}{{site.baseurl}}/im-plugin/ism/policies/).
number_of_shards | No | Integer | The number of primary shards that an index should have on the destination OpenSearch server. This parameter is effective only when `template_file` is either explicitly provided in Sink configuration or built-in. If this parameter is set, it would override the value in index template file. For more information, see [Create index]({{site.url}}{{site.baseurl}}/api-reference/index-apis/create-index/).
number_of_replicas | No | Integer | The number of replica shards each primary shard should have on the destination OpenSearch server. For example, if you have 4 primary shards and set number_of_replicas to 3, the index has 12 replica shards. This parameter is effective only when `template_file` is either explicitly provided in Sink configuration or built-in. If this parameter is set, it would override the value in index template file. For more information, see [Create index]({{site.url}}{{site.baseurl}}/api-reference/index-apis/create-index/).

+### Configure max_retries
+
+You can include the `max_retries` option in your pipeline configuration to control the number of times the sink tries to write data to OpenSearch, with exponential backoff between retries. If you don't include this option, pipelines keep retrying forever.
+
+If you specify `max_retries` and a pipeline has a [dead-letter queue (DLQ)]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/dlq/) configured, the pipeline will keep trying to write to sinks until it reaches the maximum number of retries, at which point it starts to send failed data to the DLQ.
+
+If you don't specify `max_retries`, only data that is rejected by sinks is written to the DLQ. Pipelines continue to try to write all other data to the sinks.
+
## OpenSearch cluster security

In order to send data to an OpenSearch cluster using the `opensearch` sink plugin, you must specify your username and password within the pipeline configuration. The following example `pipelines.yaml` file demonstrates how to specify admin security credentials:
diff --git a/_data-prepper/pipelines/dlq.md b/_data-prepper/pipelines/dlq.md
new file mode 100644
index 00000000..3032536e
--- /dev/null
+++ b/_data-prepper/pipelines/dlq.md
@@ -0,0 +1,83 @@
+---
+layout: default
+title: Dead-letter queues
+parent: Pipelines
+nav_order: 13
+---
+
+# Dead-letter queues
+
+Data Prepper pipelines support dead-letter queues (DLQs) for offloading failed events and making them accessible for analysis.
+
+As of Data Prepper 2.3, the `s3` DLQ writer, which stores failed events in Amazon S3, is the only supported DLQ.
+
+## Configure a DLQ writer
+
+To configure the `s3` DLQ writer, add the following to the `opensearch` sink in your `pipeline.yaml` file:
+
+```yaml
+  sink:
+    opensearch:
+      dlq:
+        s3:
+          bucket: "my-dlq-bucket"
+          key_path_prefix: "dlq-files/"
+          region: "us-west-2"
+          sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"
+```
+
+The resulting DLQ file is written as a JSON array of DLQ objects. Any file written to the S3 DLQ contains the following name pattern:
+
+```
+dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
+```
+The following information is replaced in the name pattern:
+
+- `version`: The Data Prepper version.
+- `pipelineName`: The pipeline name indicated in pipeline.yaml. +- `pluginId`: The ID of the plugin associated with the DLQ event. + +## Configuration + +DLQ supports the following configuration options. + +Option | Required | Type | Description +:--- | :--- | :--- | :--- +bucket | Yes | String | The name of the bucket into which the DLQ outputs failed records. +key_path_prefix | No | String | The `key_prefix` used in the S3 bucket. Defaults to `""`. Supports time value pattern variables, such as `/%{yyyy}/%{MM}/%{dd}`, including any variables listed in the [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, when using the `/%{yyyy}/%{MM}/%{dd}` pattern, you can set `key_prefix` as `/2023/01/24`. +region | No | String | The AWS Region of the S3 bucket. Defaults to `us-east-1`. +sts_role_arn | No | String | The STS role the DLQ assumes in order to write to an AWS S3 bucket. Default is `null`, which uses the standard SDK behavior for credentials. To use this option, the S3 bucket must have the `S3:PutObject` permission configured. + +When using DLQ with an OpenSearch sink, you can configure the [max_retries]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/sinks/opensearch/#configure-max_retries) option to send failed data to the DLQ when the sink reaches the maximum number of retries. + + +## Metrics + +DLQ supports the following metrics. + +### Counter + +- `dlqS3RecordsSuccess`: Measures the number of successful records sent to S3. +- `dlqS3RecordsFailed`: Measures the number of records that failed to be sent to S3. +- `dlqS3RequestSuccess`: Measures the number of successful S3 requests. +- `dlqS3RequestFailed`: Measures the number of failed S3 requests. + +### Distribution summary + +- `dlqS3RequestSizeBytes`: Measures the distribution of the S3 request's payload size in bytes. + +### Timer + +- `dlqS3RequestLatency`: Measures latency when sending each S3 request, including retries. + +## DLQ objects + +DLQ supports the following DLQ objects: + +* `pluginId`: The ID of the plugin that originated the event sent to the DLQ. +* `pluginName`: The name of the plugin. +* `failedData` : An object that contains the failed object and its options. This object is unique to each plugin. +* `pipelineName`: The name of the Data Prepper pipeline in which the event failed. +* `timestamp`: The timestamp of the failures in an `ISO8601` format. +
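+
+For reference, the following sketch shows one way the `dlq` option might be combined with the [max_retries]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/sinks/opensearch/#configure-max_retries) option on an `opensearch` sink. The host, credential, index, and retry values are placeholders, and the S3 settings repeat the example shown previously:
+
+```yaml
+  sink:
+    - opensearch:
+        hosts: ["https://localhost:9200"]
+        username: "admin"
+        password: "admin"
+        index: "my-index"
+        max_retries: 16
+        dlq:
+          s3:
+            bucket: "my-dlq-bucket"
+            key_path_prefix: "dlq-files/"
+            region: "us-west-2"
+            sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"
+```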