---
layout: default
title: Pipelines
parent: Data Prepper
nav_order: 2
---

# Pipelines

![Data Prepper Pipeline]({{site.url}}{{site.baseurl}}/images/data-prepper-pipeline.png)

To use Data Prepper, you define pipelines in a configuration YAML file. Each pipeline is a combination of a source, a buffer, zero or more processors, and one or more sinks. For example:

```yml
simple-sample-pipeline:
  workers: 2 # the number of workers
  delay: 5000 # in milliseconds, how long workers wait between read attempts
  source:
    random:
  buffer:
    bounded_blocking:
      buffer_size: 1024 # max number of records the buffer accepts
      batch_size: 256 # max number of records the buffer drains after each read
  processor:
    - string_converter:
        upper_case: true
  sink:
    - stdout:
```

- Sources define where your data comes from. In this case, the source is a random UUID generator (`random`).

- Buffers store data as it passes through the pipeline.

By default, Data Prepper uses its one and only buffer, the `bounded_blocking` buffer, so you can omit this section unless you have developed a custom buffer or need to tune the buffer settings.

- Processors perform some action on your data: filter, transform, enrich, and so on.

You can have multiple processors, which run sequentially from top to bottom, not in parallel. The `string_converter` processor transforms strings by making them uppercase (see the sketch following these bullet points).

- Sinks define where your data goes. In this case, the sink is stdout.
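
As a minimal sketch of sequential processing (not part of the sample pipeline above), the following `processor` section combines the `grok` processor from the log ingestion example later on this page with the `string_converter` processor. Each record passes through `grok` first and then through `string_converter`:

```yml
processor:
  - grok:                # runs first: parses the raw log line into fields
      match:
        log: [ "%{COMMONAPACHELOG}" ]
  - string_converter:    # runs second, on the records grok has already processed
      upper_case: true
```
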
## Examples

This section provides some pipeline examples that you can use to start creating your own pipelines. For more information, see the [Data Prepper configuration reference]({{site.url}}{{site.baseurl}}/clients/data-prepper/data-prepper-reference/) guide.

The Data Prepper repository has several [sample applications](https://github.com/opensearch-project/data-prepper/tree/main/examples) to help you get started.

### Log ingestion pipeline

The following example demonstrates how to use the HTTP source and Grok processor plugins to process unstructured log data.

```yml
log-pipeline:
  source:
    http:
      ssl: false
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG}" ]
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        insecure: true
        username: admin
        password: admin
        index: apache_logs
```

This example uses weak security. We strongly recommend securing all plugins that open external ports in production environments.
{: .note}

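
For production, a hedged sketch of a more secure sink is shown below. It assumes the `opensearch` sink's `cert` option (a path to a CA certificate) and an example certificate path; confirm the option names in the [Data Prepper configuration reference]({{site.url}}{{site.baseurl}}/clients/data-prepper/data-prepper-reference/) before relying on them:

```yml
sink:
  - opensearch:
      hosts: [ "https://opensearch:9200" ]
      # Verify the cluster certificate instead of setting insecure: true.
      # The path below is illustrative; point it at your own CA bundle.
      cert: "/usr/share/data-prepper/root-ca.pem"
      username: admin
      password: admin
      index: apache_logs
```
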
### Trace analytics pipeline

The following example demonstrates how to build a pipeline that supports the [Trace Analytics OpenSearch Dashboards plugin]({{site.url}}{{site.baseurl}}/observability-plugin/trace/ta-dashboards/). This pipeline takes data from the OpenTelemetry Collector and uses two other pipelines as sinks. These two separate pipelines index the trace and service map documents for the dashboard plugin.

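
If you are setting up the OpenTelemetry Collector yourself, a minimal sketch of its exporter configuration might look like the following. The `data-prepper` hostname is an assumption, port 21890 is the usual `otel_trace_source` default, and `tls.insecure: true` matches the `ssl: false` setting used in the pipelines below; adjust all of these for your deployment:

```yml
receivers:
  otlp:
    protocols:
      grpc:

exporters:
  otlp/data-prepper:
    # Assumed endpoint: Data Prepper's otel_trace_source on its default port.
    # Use TLS in production instead of insecure mode.
    endpoint: data-prepper:21890
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [otlp/data-prepper]
```
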
#### Classic

This pipeline definition will be deprecated in Data Prepper 2.0. We recommend using the [Event record type](#event-record-type) pipeline definition instead.

```yml
entry-pipeline:
  delay: "100"
  source:
    otel_trace_source:
      ssl: false
  sink:
    - pipeline:
        name: "raw-pipeline"
    - pipeline:
        name: "service-map-pipeline"
raw-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - otel_trace_raw_prepper:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        insecure: true
        username: admin
        password: admin
        index_type: trace-analytics-raw
service-map-pipeline:
  delay: "100"
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - service_map_stateful:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        insecure: true
        username: admin
        password: admin
        index_type: trace-analytics-service-map
```

#### Event record type

Starting with Data Prepper 1.4, Data Prepper supports the event record type in the trace analytics pipeline source, buffer, and processors.

```yml
entry-pipeline:
  delay: "100"
  source:
    otel_trace_source:
      ssl: false
      record_type: event
  buffer:
    bounded_blocking:
      buffer_size: 10240
      batch_size: 160
  sink:
    - pipeline:
        name: "raw-pipeline"
    - pipeline:
        name: "service-map-pipeline"
raw-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  buffer:
    bounded_blocking:
      buffer_size: 10240
      batch_size: 160
  processor:
    - otel_trace_raw:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        insecure: true
        username: admin
        password: admin
        index_type: trace-analytics-raw
service-map-pipeline:
  delay: "100"
  source:
    pipeline:
      name: "entry-pipeline"
  buffer:
    bounded_blocking:
      buffer_size: 10240
      batch_size: 160
  processor:
    - service_map_stateful:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        insecure: true
        username: admin
        password: admin
        index_type: trace-analytics-service-map
```

To maintain ingestion throughput and latency similar to the [Classic](#classic) pipeline, we recommend scaling `buffer_size` and `batch_size` by the estimated maximum batch size in the client request payload.

### Metrics pipeline

Data Prepper supports metrics ingestion using OTel. It currently supports the following metric types:

* Gauge
* Sum
* Summary
* Histogram

Other types are not supported; Data Prepper drops them, including Exponential Histogram. Additionally, Data Prepper does not support Scope instrumentation.

To set up a metrics pipeline:

```yml
metrics-pipeline:
  source:
    otel_metrics_source:
  processor:
    - otel_metrics_raw_processor:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin
```

### S3 log ingestion pipeline

The following example demonstrates how to use the S3 Source and Grok Processor plugins to process unstructured log data from [Amazon Simple Storage Service](https://aws.amazon.com/s3/) (Amazon S3). This example uses Application Load Balancer logs. As the Application Load Balancer writes logs to S3, S3 creates notifications in Amazon SQS. Data Prepper reads those notifications, reads the S3 objects, and processes the log data.

```yml
log-pipeline:
  source:
    s3:
      notification_type: "sqs"
      compression: "gzip"
      codec:
        newline:
      sqs:
        queue_url: "https://sqs.us-east-1.amazonaws.com/12345678910/ApplicationLoadBalancer"
      aws:
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::12345678910:role/Data-Prepper"

  processor:
    - grok:
        match:
          message: ["%{DATA:type} %{TIMESTAMP_ISO8601:time} %{DATA:elb} %{DATA:client} %{DATA:target} %{BASE10NUM:request_processing_time} %{DATA:target_processing_time} %{BASE10NUM:response_processing_time} %{BASE10NUM:elb_status_code} %{DATA:target_status_code} %{BASE10NUM:received_bytes} %{BASE10NUM:sent_bytes} \"%{DATA:request}\" \"%{DATA:user_agent}\" %{DATA:ssl_cipher} %{DATA:ssl_protocol} %{DATA:target_group_arn} \"%{DATA:trace_id}\" \"%{DATA:domain_name}\" \"%{DATA:chosen_cert_arn}\" %{DATA:matched_rule_priority} %{TIMESTAMP_ISO8601:request_creation_time} \"%{DATA:actions_executed}\" \"%{DATA:redirect_url}\" \"%{DATA:error_reason}\" \"%{DATA:target_list}\" \"%{DATA:target_status_code_list}\" \"%{DATA:classification}\" \"%{DATA:classification_reason}"]
    - grok:
        match:
          request: ["(%{NOTSPACE:http_method})? (%{NOTSPACE:http_uri})? (%{NOTSPACE:http_version})?"]
    - grok:
        match:
          http_uri: ["(%{WORD:protocol})?(://)?(%{IPORHOST:domain})?(:)?(%{INT:http_port})?(%{GREEDYDATA:request_uri})?"]
    - date:
        from_time_received: true
        destination: "@timestamp"

  sink:
    - opensearch:
        hosts: [ "https://localhost:9200" ]
        username: "admin"
        password: "admin"
        index: alb_logs
```

## Migrating from Logstash

Data Prepper supports Logstash configuration files for a limited set of plugins. Simply run Data Prepper with your Logstash configuration file:

```bash
docker run --name data-prepper \
    -v /full/path/to/logstash.conf:/usr/share/data-prepper/pipelines.conf \
    opensearchproject/opensearch-data-prepper:latest
```

This feature is limited by the feature parity of Data Prepper. As of the Data Prepper 1.2 release, the following plugins from the Logstash configuration are supported:

- HTTP Input plugin
- Grok Filter plugin
- Elasticsearch Output plugin
- Amazon Elasticsearch Output plugin

## Configure the Data Prepper server

Data Prepper itself provides administrative HTTP endpoints, such as `/list` to list pipelines and `/metrics/prometheus` to provide Prometheus-compatible metrics data. The port that serves these endpoints has a TLS configuration and is specified in a separate YAML file. By default, these endpoints are secured by the Data Prepper Docker images. We strongly recommend providing your own configuration file to secure production environments. Here is an example `data-prepper-config.yaml`:

```yml
ssl: true
keyStoreFilePath: "/usr/share/data-prepper/keystore.jks"
keyStorePassword: "password"
privateKeyPassword: "other_password"
serverPort: 1234
```

To configure the Data Prepper server, run Data Prepper with the additional YAML file:

```bash
docker run --name data-prepper \
    -v /full/path/to/pipelines.yaml:/usr/share/data-prepper/pipelines.yaml \
    -v /full/path/to/data-prepper-config.yaml:/usr/share/data-prepper/data-prepper-config.yaml \
    opensearchproject/data-prepper:latest
```