[[pipe-line]]
== Pipeline Definition

A pipeline is a definition of a series of processors that are executed
in the same order as they are declared.

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [ ... ]
}
--------------------------------------------------

The `description` is a special field to store a helpful description of
what the pipeline attempts to achieve.

The `processors` parameter defines a list of processors to be executed in order.

== Ingest APIs

=== Put pipeline API

The put pipeline API adds pipelines and updates existing pipelines in the cluster.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "simple" : {
        // settings
      }
    },
    // other processors
  ]
}
--------------------------------------------------
// AUTOSENSE

NOTE: The put pipeline API also instructs all ingest nodes to reload their in-memory representation of pipelines, so that
      pipeline changes take effect immediately.

=== Get pipeline API

The get pipeline API returns pipelines based on id. This API always returns a local reference of the pipeline.

[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE

Example response:

[source,js]
--------------------------------------------------
{
  "my-pipeline-id": {
    "_source" : {
      "description": "describe pipeline",
      "processors": [
        {
          "simple" : {
            // settings
          }
        },
        // other processors
      ]
    },
    "_version" : 0
  }
}
--------------------------------------------------

For each returned pipeline, the source and the version are returned.
The version is useful for knowing which version of the pipeline the node has.
Multiple ids can be provided at the same time, and wildcards are also supported.

=== Delete pipeline API

The delete pipeline API deletes pipelines by id.
[source,js]
--------------------------------------------------
DELETE _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE

=== Simulate pipeline API

The simulate pipeline api executes a specific pipeline against
the set of documents provided in the body of the request.

A simulate request may call upon an existing pipeline to be executed
against the provided documents, or supply a pipeline definition in
the body of the request.

Here is the structure of a simulate request with a provided pipeline:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { /** first document **/ },
    { /** second document **/ },
    // ...
  ]
}
--------------------------------------------------

Here is the structure of a simulate request against a pre-existing pipeline:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { /** first document **/ },
    { /** second document **/ },
    // ...
  ]
}
--------------------------------------------------

Here is an example simulate request with a provided pipeline and its response:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
--------------------------------------------------
// AUTOSENSE

response:

[source,js]
--------------------------------------------------
{
  "docs": [
    {
      "doc": {
        "_id": "id",
        "_ttl": null,
        "_parent": null,
        "_index": "index",
        "_routing": null,
        "_type": "type",
        "_timestamp": null,
        "_source": {
          "field2": "_value",
          "foo": "bar"
        },
        "_ingest": {
          "timestamp": "2016-01-04T23:53:27.186+0000"
        }
      }
    },
    {
      "doc": {
        "_id": "id",
        "_ttl": null,
        "_parent": null,
        "_index": "index",
        "_routing": null,
        "_type": "type",
        "_timestamp": null,
        "_source": {
          "field2": "_value",
          "foo": "rab"
        },
        "_ingest": {
          "timestamp": "2016-01-04T23:53:27.186+0000"
        }
      }
    }
  ]
}
--------------------------------------------------

It is often useful to see how each processor affects the ingest document
as it is passed through the pipeline.
To see the intermediate results of
each processor in the simulate request, a `verbose` parameter may be added
to the request.

Here is an example verbose request and its response:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate?verbose
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value2"
        }
      },
      {
        "set" : {
          "field" : "field3",
          "value" : "_value3"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
--------------------------------------------------
// AUTOSENSE

response:

[source,js]
--------------------------------------------------
{
  "docs": [
    {
      "processor_results": [
        {
          "tag": "processor[set]-0",
          "doc": {
            "_id": "id",
            "_ttl": null,
            "_parent": null,
            "_index": "index",
            "_routing": null,
            "_type": "type",
            "_timestamp": null,
            "_source": {
              "field2": "_value2",
              "foo": "bar"
            },
            "_ingest": {
              "timestamp": "2016-01-05T00:02:51.383+0000"
            }
          }
        },
        {
          "tag": "processor[set]-1",
          "doc": {
            "_id": "id",
            "_ttl": null,
            "_parent": null,
            "_index": "index",
            "_routing": null,
            "_type": "type",
            "_timestamp": null,
            "_source": {
              "field3": "_value3",
              "field2": "_value2",
              "foo": "bar"
            },
            "_ingest": {
              "timestamp": "2016-01-05T00:02:51.383+0000"
            }
          }
        }
      ]
    },
    {
      "processor_results": [
        {
          "tag": "processor[set]-0",
          "doc": {
            "_id": "id",
            "_ttl": null,
            "_parent": null,
            "_index": "index",
            "_routing": null,
            "_type": "type",
            "_timestamp": null,
            "_source": {
              "field2": "_value2",
              "foo": "rab"
            },
            "_ingest": {
              "timestamp": "2016-01-05T00:02:51.384+0000"
            }
          }
        },
        {
          "tag": "processor[set]-1",
          "doc": {
            "_id": "id",
            "_ttl": null,
            "_parent": null,
            "_index": "index",
            "_routing": null,
            "_type": "type",
            "_timestamp": null,
            "_source": {
              "field3": "_value3",
              "field2": "_value2",
              "foo": "rab"
            },
            "_ingest": {
              "timestamp": "2016-01-05T00:02:51.384+0000"
            }
          }
        }
      ]
    }
  ]
}
--------------------------------------------------

== Accessing data in pipelines

Processors in pipelines have read and write access to documents that pass through the pipeline.
Both the fields in the source of a document and its metadata fields are accessible.

Accessing a field in the source is straightforward: refer to the field by its name. For example:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}
--------------------------------------------------

In addition, fields from the source are always accessible via the `_source` prefix:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}
--------------------------------------------------

Metadata fields can be accessed in the same way as fields from the source. This
is possible because Elasticsearch doesn't allow fields in the source that have the
same name as metadata fields.

The following example sets the id of a document to `1`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_id",
    "value": "1"
  }
}
--------------------------------------------------

The following metadata fields are accessible by a processor: `_index`, `_type`, `_id`, `_routing`, `_parent`,
`_timestamp` and `_ttl`.

Beyond metadata fields and source fields, ingest also adds ingest metadata to documents being processed.
These metadata properties are accessible under the `_ingest` key. Currently ingest adds the ingest timestamp
under the `_ingest.timestamp` key, which is the time Elasticsearch received the index or bulk
request to pre-process. Any processor is free to add more ingest-related metadata. Ingest metadata is transient:
it is lost after a document has been processed by the pipeline, so it is never indexed.
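The lookup rules described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not how Elasticsearch implements field access: the `resolve` function and the `metadata` / `source` / `ingest` layout of the sample document are hypothetical names chosen for this sketch.

```python
# Hypothetical in-memory model of a document travelling through a pipeline.
# The metadata field names are the ones listed in this section.
METADATA_FIELDS = {"_index", "_type", "_id", "_routing", "_parent", "_timestamp", "_ttl"}


def resolve(doc, path):
    """Return the value that a field reference such as `_source.my_field` points to."""
    keys = path.split(".")
    if keys[0] in METADATA_FIELDS:
        return doc["metadata"][keys[0]]
    if keys[0] == "_ingest":
        current, keys = doc["ingest"], keys[1:]
    else:
        if keys[0] == "_source":
            keys = keys[1:]  # the `_source` prefix is optional
        current = doc["source"]
    for key in keys:
        current = current[key]  # walk nested objects one key at a time
    return current


doc = {
    "metadata": {"_index": "index", "_type": "type", "_id": "1"},
    "source": {"my_field": 582.1, "geoip": {"country_iso_code": "NL"}},
    "ingest": {"timestamp": "2016-01-04T23:53:27.186+0000"},
}
```

With this sample document, `resolve(doc, "my_field")` and `resolve(doc, "_source.my_field")` reach the same source value, while `resolve(doc, "_id")` reads metadata and `resolve(doc, "_ingest.timestamp")` reads ingest metadata.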
The following example adds a field with the name `received` whose value is the ingest timestamp:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}
--------------------------------------------------

Unlike Elasticsearch metadata fields, the ingest metadata field name `_ingest` can be used as a valid field name
in the source of a document. Use `_source._ingest` to refer to it, otherwise `_ingest` will be interpreted as ingest
metadata.

A number of processor settings also support templating. Settings that support templating can have zero or more
template snippets. A template snippet begins with `{{` and ends with `}}`.
Accessing fields and metadata fields in templates works exactly the same way as in regular processor field settings.

In this example a field by the name `field_c` is added and its value is a concatenation of
the values of `field_a` and `field_b`.

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}
--------------------------------------------------

The following example changes the index a document is going to be indexed into. The index a document will be redirected
to depends on the field in the source with name `geoip.country_iso_code`.

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_index",
    "value": "{{geoip.country_iso_code}}"
  }
}
--------------------------------------------------

[[handling-failure-in-pipelines]]
=== Handling Failure in Pipelines

In its simplest case, a pipeline describes a list of processors that are
executed sequentially, and processing halts at the first exception. This
may not be desirable when failures are expected. For example, not all your logs
may match a certain grok expression and you may wish to index such documents into
a separate index.

To enable this behavior, you can utilize the `on_failure` parameter.
`on_failure` defines a list of processors to be executed immediately following the failed processor.
This parameter can be supplied at the pipeline level, as well as at the processor
level. If a processor has an `on_failure` configuration option provided, whether
it is empty or not, any exceptions that are thrown by it will be caught and the
pipeline will continue executing the remaining processors. Since further processors
are defined within the scope of an `on_failure` statement, failure handling can be nested.

Example: In the following example we define a pipeline that tries to rename
the field `foo` to `bar`. If the document does not contain the `foo` field,
we attach an error message to the document for later analysis within
Elasticsearch.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "to" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------

Example: Here we define an `on_failure` block on a whole pipeline to change
the index to which failed documents are sent.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [ ... ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}
--------------------------------------------------

==== Accessing Error Metadata From Processors Handling Exceptions

Sometimes you may want to retrieve the actual error message that was thrown
by a failed processor. To do so you can access the metadata fields
`on_failure_message`, `on_failure_processor_type` and `on_failure_processor_tag`. These fields are only accessible
from within the context of an `on_failure` block.
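As a rough illustration of the failure-handling rules in this section, the following Python sketch models a pipeline as a list of processors with optional `on_failure` handlers. It is not the ingest implementation: the `run` function, the `"run"` key, and the two sample processors are hypothetical names, and stopping the pipeline after a pipeline-level handler has run is an assumption of this sketch.

```python
def run(processors, doc, on_failure=None):
    """Run processors in order; `on_failure` is the pipeline-level handler list."""
    try:
        for processor in processors:
            try:
                processor["run"](doc)
            except Exception as error:
                handlers = processor.get("on_failure")
                if handlers is None:
                    raise  # no processor-level handlers: defer to the pipeline level
                # An empty handler list still counts: the error is caught.
                doc["_ingest"]["on_failure_message"] = str(error)
                run(handlers, doc)  # handlers are processors too, so nesting works
                del doc["_ingest"]["on_failure_message"]  # only visible inside on_failure
    except Exception as error:
        if on_failure is None:
            raise
        # Assumption: after pipeline-level handlers run, remaining processors are skipped.
        doc["_ingest"]["on_failure_message"] = str(error)
        run(on_failure, doc)
        del doc["_ingest"]["on_failure_message"]
    return doc


def rename_foo_to_bar(doc):
    source = doc["_source"]
    if "foo" not in source:
        raise ValueError('field "foo" does not exist')
    source["bar"] = source.pop("foo")


def record_error(doc):
    doc["_source"]["error"] = doc["_ingest"]["on_failure_message"]


pipeline = [{"run": rename_foo_to_bar, "on_failure": [{"run": record_error}]}]
handled = run(pipeline, {"_source": {"baz": 1}, "_ingest": {}})
renamed = run(pipeline, {"_source": {"foo": 1}, "_ingest": {}})
```

In `handled` the rename fails and the handler copies the error message into the `error` field; in `renamed` the rename succeeds and no handler runs.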
Here is an updated version of our first example, which leverages these fields to provide
the error message instead of setting it manually.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "to" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------

== Processors

All processors are defined in the following way within a pipeline definition:

[source,js]
--------------------------------------------------
{
  "PROCESSOR_NAME" : {
    ... processor configuration options ...
  }
}
--------------------------------------------------

Each processor defines its own configuration parameters, but all processors have
the ability to declare `tag` and `on_failure` fields. These fields are optional.

A `tag` is simply a string identifier of the specific instantiation of a certain
processor in a pipeline. The `tag` field does not affect any processor's behavior,
but is very useful for bookkeeping and tracing errors to specific processors.

See <<handling-failure-in-pipelines>> to learn more about the `on_failure` field and error handling in pipelines.

=== Append processor

Appends one or more values to an existing array if the field already exists and it is an array.
Converts a scalar to an array and appends one or more values to it if the field exists and it is a scalar.
Creates an array containing the provided values if the field doesn't exist.
Accepts a single value or an array of values.
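The three cases above can be sketched in Python as follows. This is a minimal illustration of the described behavior, not the processor's actual code; the `append` function name and its arguments are hypothetical.

```python
def append(source, field, value):
    """Append `value` (a single value or a list of values) to source[field]."""
    values = value if isinstance(value, list) else [value]
    if field not in source:
        source[field] = list(values)                    # field missing: create an array
    elif isinstance(source[field], list):
        source[field].extend(values)                    # existing array: append to it
    else:
        source[field] = [source[field]] + list(values)  # scalar: convert to an array first
    return source


created = append({}, "field1", "item1")
from_scalar = append({"field1": "item1"}, "field1", ["item2", "item3"])
extended = append({"field1": ["item1"]}, "field1", "item2")
```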
[[append-options]]
.Append Options
[options="header"]
|======
| Name      | Required  | Default  | Description
| `field`   | yes       | -        | The field to be appended to
| `value`   | yes       | -        | The value to be appended
|======

[source,js]
--------------------------------------------------
{
  "append": {
    "field": "field1",
    "value": ["item2", "item3", "item4"]
  }
}
--------------------------------------------------

=== Convert processor

Converts an existing field's value to a different type, such as turning a string into an integer.
If the field value is an array, all members will be converted.

The supported types include: `integer`, `float`, `string`, and `boolean`.

`boolean` will set the field to true if its string value is equal to `true` (ignoring case), to
false if its string value is equal to `false` (ignoring case), and it will throw an exception otherwise.

[[convert-options]]
.Convert Options
[options="header"]
|======
| Name      | Required  | Default  | Description
| `field`   | yes       | -        | The field whose value is to be converted
| `type`    | yes       | -        | The type to convert the existing value to
|======

[source,js]
--------------------------------------------------
{
  "convert": {
    "field" : "foo",
    "type": "integer"
  }
}
--------------------------------------------------

=== Date processor

The date processor is used for parsing dates from fields, and then using that date or timestamp as the timestamp for that document.
By default, the date processor adds the parsed date as a new field called `@timestamp`, configurable by setting the `target_field`
configuration parameter. Multiple date formats are supported as part of the same date processor definition. They will be used
sequentially to attempt parsing the date field, in the same order they were defined as part of the processor definition.

[[date-options]]
.Date options
[options="header"]
|======
| Name                   | Required  | Default             | Description
| `match_field`          | yes       | -                   | The field to get the date from.
| `target_field`         | no        | @timestamp          | The field that will hold the parsed date.
| `match_formats`        | yes       | -                   | Array of the expected date formats. Can be a joda pattern or one of the following formats: ISO8601, UNIX, UNIX_MS, TAI64N.
| `timezone`             | no        | UTC                 | The timezone to use when parsing the date.
| `locale`               | no        | ENGLISH             | The locale to use when parsing the date, relevant when parsing month names or week days.
|======

An example that adds the parsed date to the `timestamp` field based on the `initial_date` field:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "date" : {
        "match_field" : "initial_date",
        "target_field" : "timestamp",
        "match_formats" : ["dd/MM/yyyy hh:mm:ss"],
        "timezone" : "Europe/Amsterdam"
      }
    }
  ]
}
--------------------------------------------------

=== Fail processor

The Fail Processor is used to raise an exception. This is useful when
a user expects a pipeline to fail and wishes to relay a specific message
to the requester.

[[fail-options]]
.Fail Options
[options="header"]
|======
| Name       | Required  | Default  | Description
| `message`  | yes       | -        | The error message of the `FailException` thrown by the processor
|======

[source,js]
--------------------------------------------------
{
  "fail": {
    "message": "an error message"
  }
}
--------------------------------------------------

=== Foreach processor

All processors can operate on elements inside an array, but if all elements of an array need to
be processed in the same way, defining a processor for each element becomes cumbersome and tricky
because the number of elements in an array is likely to be unknown. For this reason the `foreach`
processor exists. By specifying the field holding array elements and a list of processors that
define what should happen to each element, array fields can easily be preprocessed.
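The idea can be sketched in Python. This is only an illustration, not the processor's implementation: the `foreach` and `uppercase` function names are hypothetical, and each inner processor is modelled as a plain function that sees the current element under a `_value` key, the name this section uses for the element scope.

```python
import copy


def foreach(source, field, processors):
    """Apply every processor to every element of the array stored in source[field]."""
    processed = []
    for element in source[field]:
        # Inner processors work in their own scope; the element is exposed as `_value`.
        scope = {"_value": copy.deepcopy(element)}
        for processor in processors:
            processor(scope)
        processed.append(scope["_value"])
    # The array is replaced only after every element has been processed, so an
    # unhandled exception in this sketch leaves the original array untouched.
    source[field] = processed
    return source


def uppercase(scope):
    scope["_value"] = scope["_value"].upper()


result = foreach({"values": ["foo", "bar", "baz"]}, "values", [uppercase])
```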
Processors inside the foreach processor work in a different context and the only valid top level
field is `_value`, which holds the array element value. Under this field other fields may exist.

If the `foreach` processor fails to process an element inside the array and no `on_failure` processor has been specified,
then it aborts the execution and leaves the array unmodified.

[[foreach-options]]
.Foreach Options
[options="header"]
|======
| Name          | Required  | Default  | Description
| `field`       | yes       | -        | The array field
| `processors`  | yes       | -        | The processors
|======

Assume the following document:

[source,js]
--------------------------------------------------
{
  "values" : ["foo", "bar", "baz"]
}
--------------------------------------------------

When this `foreach` processor operates on this sample document:

[source,js]
--------------------------------------------------
{
  "foreach" : {
    "field" : "values",
    "processors" : [
      {
        "uppercase" : {
          "field" : "_value"
        }
      }
    ]
  }
}
--------------------------------------------------

Then the document will look like this after preprocessing:

[source,js]
--------------------------------------------------
{
  "values" : ["FOO", "BAR", "BAZ"]
}
--------------------------------------------------

Let's take a look at another example:

[source,js]
--------------------------------------------------
{
  "persons" : [
    {
      "id" : "1",
      "name" : "John Doe"
    },
    {
      "id" : "2",
      "name" : "Jane Doe"
    }
  ]
}
--------------------------------------------------

If the `id` field needs to be removed,
then the following `foreach` processor can be used:

[source,js]
--------------------------------------------------
{
  "foreach" : {
    "field" : "persons",
    "processors" : [
      {
        "remove" : {
          "field" : "_value.id"
        }
      }
    ]
  }
}
--------------------------------------------------

After preprocessing the result is:

[source,js]
--------------------------------------------------
{
  "persons" : [
    {
      "name" : "John Doe"
    },
    {
      "name" : "Jane Doe"
    }
  ]
}
--------------------------------------------------

As with any processor, `on_failure` processors can also be defined in processors that are wrapped inside the `foreach` processor.

For example, the `id` field may not exist on all person objects, and instead of failing the index request,
the document will be sent to the `failure_index` index for later inspection:

[source,js]
--------------------------------------------------
{
  "foreach" : {
    "field" : "persons",
    "processors" : [
      {
        "remove" : {
          "field" : "_value.id",
          "on_failure" : [
            {
              "set" : {
                "field" : "_index",
                "value" : "failure_index"
              }
            }
          ]
        }
      }
    ]
  }
}
--------------------------------------------------

In this example, if the `remove` processor does fail, then
the array elements that have been processed thus far will
be updated.

=== Grok Processor

The Grok Processor extracts structured fields out of a single text field within a document. You choose which field to
extract matched fields from, as well as the Grok Pattern you expect will match. A Grok Pattern is like a regular
expression that supports aliased expressions that can be reused.

This tool is perfect for syslog logs, apache and other webserver logs, mysql logs, and in general, any log format
that is written for humans rather than for computer consumption.

The processor comes packaged with over 120 reusable patterns that are located at `$ES_HOME/config/ingest/grok/patterns`.
Here, you can add your own custom grok pattern files with custom grok expressions to be used by the processor.

If you need help building patterns to match your logs, you will find the and applications quite useful!

==== Grok Basics

Grok sits on top of regular expressions, so any regular expressions are valid in grok as well.
The regular expression library is Oniguruma, and you can see the full supported regexp syntax
https://github.com/kkos/oniguruma/blob/master/doc/RE[on the Oniguruma site].
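To make the relationship between grok and plain regular expressions concrete, here is a small Python sketch that expands `%{SYNTAX}` and `%{SYNTAX:SEMANTIC}` references (the reference forms described in this section) into named capture groups. It is only an approximation: it uses Python's `re` module rather than Oniguruma, the two entries in `PATTERNS` are simplified stand-ins for the packaged `NUMBER` and `IP` patterns, and the `expand` and `grok` function names are hypothetical.

```python
import re

# Simplified, hypothetical stand-ins for two of the packaged patterns.
PATTERNS = {
    "NUMBER": r"\d+(?:\.\d+)?",
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
}

# Matches %{SYNTAX} and %{SYNTAX:SEMANTIC} references.
REFERENCE = re.compile(r"%\{(\w+)(?::(\w+))?\}")


def expand(expression):
    """Replace each pattern reference with the regular expression it names."""
    def substitute(match):
        syntax, semantic = match.group(1), match.group(2)
        body = expand(PATTERNS[syntax])  # patterns may reference other patterns
        if semantic:
            return "(?P<%s>%s)" % (semantic, body)  # named capture for SEMANTIC
        return "(?:%s)" % body
    return REFERENCE.sub(substitute, expression)


def grok(expression, text):
    """Return the named captures, or raise if the text does not match."""
    match = re.search(expand(expression), text)
    if match is None:
        raise ValueError("text does not match the grok expression")
    return match.groupdict()


fields = grok("%{NUMBER:duration} %{IP:client}", "3.44 55.3.244.1")
```

Here `fields` maps `duration` to `"3.44"` and `client` to `"55.3.244.1"`. Both values are strings, since this sketch does not implement the optional `TYPE` coercion.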
Grok works by leveraging this regular expression language to allow naming existing patterns and combining them into more
complex patterns that match your fields.

The syntax for re-using a grok pattern comes in three forms: `%{SYNTAX:SEMANTIC}`, `%{SYNTAX}`, `%{SYNTAX:SEMANTIC:TYPE}`.

The `SYNTAX` is the name of the pattern that will match your text. For example, `3.44` will be matched by the `NUMBER`
pattern and `55.3.244.1` will be matched by the `IP` pattern. The syntax is how you match. `NUMBER` and `IP` are both
patterns that are provided within the default patterns set.

The `SEMANTIC` is the identifier you give to the piece of text being matched. For example, `3.44` could be the
duration of an event, so you could call it simply `duration`. Further, a string `55.3.244.1` might identify
the `client` making a request.

The `TYPE` is the type you wish to cast your named field. `int` and `float` are currently the only types supported for coercion.

For example, here is a grok pattern that would match the above example given. We would like to match a text with the following
contents:

[source,js]
--------------------------------------------------
3.44 55.3.244.1
--------------------------------------------------

We may know that the above message is a number followed by an IP-address. We can match this text with the following
Grok expression.

[source,js]
--------------------------------------------------
%{NUMBER:duration} %{IP:client}
--------------------------------------------------

==== Custom Patterns and Pattern Files

The Grok Processor comes pre-packaged with a base set of pattern files. These patterns may not always have
what you are looking for. These pattern files have a very basic format.
Each line describes a named pattern with
the following format:

[source,js]
--------------------------------------------------
NAME ' '+ PATTERN '\n'
--------------------------------------------------

You can add this pattern to an existing file, or add your own file in the patterns directory here: `$ES_HOME/config/ingest/grok/patterns`.
The Ingest Plugin will pick up files in this directory to be loaded into the grok processor's known patterns. These patterns are loaded
at startup, so you will need to restart your ingest node if you wish to update these files while it is running.

Example snippet of pattern definitions found in the `grok-patterns` patterns file:

[source,js]
--------------------------------------------------
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
--------------------------------------------------

==== Using Grok Processor in a Pipeline

[[grok-options]]
.Grok Options
[options="header"]
|======
| Name                   | Required  | Default             | Description
| `field`                | yes       | -                   | The field to use for grok expression parsing
| `pattern`              | yes       | -                   | The grok expression to match and extract named captures with
| `pattern_definitions`  | no        | -                   | A map of pattern-name and pattern tuples defining custom patterns to be used by the current processor. Patterns matching existing names will override the pre-existing definition.
|======

Here is an example of using the provided patterns to extract and name structured fields from a string field in
a document.
[source,js]
--------------------------------------------------
{
  "message": "55.3.244.1 GET /index.html 15824 0.043"
}
--------------------------------------------------

The pattern for this could be:

[source,js]
--------------------------------------------------
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
--------------------------------------------------

An example pipeline for processing the above document using Grok:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors": [
    {
      "grok": {
        "field": "message",
        "pattern": "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
      }
    }
  ]
}
--------------------------------------------------

This pipeline will insert these named captures as new fields within the document, like so:

[source,js]
--------------------------------------------------
{
  "message": "55.3.244.1 GET /index.html 15824 0.043",
  "client": "55.3.244.1",
  "method": "GET",
  "request": "/index.html",
  "bytes": 15824,
  "duration": "0.043"
}
--------------------------------------------------

An example of a pipeline specifying custom pattern definitions:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors": [
    {
      "grok": {
        "field": "message",
        "pattern": "my %{FAVORITE_DOG:dog} is colored %{RGB:color}",
        "pattern_definitions" : {
          "FAVORITE_DOG" : "beagle",
          "RGB" : "RED|GREEN|BLUE"
        }
      }
    }
  ]
}
--------------------------------------------------

=== Gsub processor

Converts a string field by applying a regular expression and a replacement.
If the field is not a string, the processor will throw an exception.

[[gsub-options]]
.Gsub Options
[options="header"]
|======
| Name          | Required  | Default  | Description
| `field`       | yes       | -        | The field to apply the replacement to
| `pattern`     | yes       | -        | The pattern to be replaced
| `replacement` | yes       | -        | The string to replace the matching patterns with.
|======

[source,js]
--------------------------------------------------
{
  "gsub": {
    "field": "field1",
    "pattern": "\\.",
    "replacement": "-"
  }
}
--------------------------------------------------

=== Join processor

Joins each element of an array into a single string using a separator character between each element.
Throws an error when the field is not an array.

[[join-options]]
.Join Options
[options="header"]
|======
| Name          | Required  | Default  | Description
| `field`       | yes       | -        | The field to be separated
| `separator`   | yes       | -        | The separator character
|======

[source,js]
--------------------------------------------------
{
  "join": {
    "field": "joined_array_field",
    "separator": "-"
  }
}
--------------------------------------------------

=== Lowercase processor

Converts a string to its lowercase equivalent.

[[lowercase-options]]
.Lowercase Options
[options="header"]
|======
| Name     | Required  | Default  | Description
| `field`  | yes       | -        | The field to lowercase
|======

[source,js]
--------------------------------------------------
{
  "lowercase": {
    "field": "foo"
  }
}
--------------------------------------------------

=== Remove processor

Removes an existing field. If the field doesn't exist, an exception will be thrown.

[[remove-options]]
.Remove Options
[options="header"]
|======
| Name      | Required  | Default  | Description
| `field`   | yes       | -        | The field to be removed
|======

[source,js]
--------------------------------------------------
{
  "remove": {
    "field": "foo"
  }
}
--------------------------------------------------

=== Rename processor

Renames an existing field. If the field doesn't exist, an exception will be thrown. Also, the new field
name must not exist.
[[rename-options]]
.Rename Options
[options="header"]
|======
| Name      | Required  | Default  | Description
| `field`   | yes       | -        | The field to be renamed
| `to`      | yes       | -        | The new name of the field
|======

[source,js]
--------------------------------------------------
{
  "rename": {
    "field": "foo",
    "to": "foobar"
  }
}
--------------------------------------------------

=== Set processor

Sets one field and associates it with the specified value. If the field already exists,
its value will be replaced with the provided one.

[[set-options]]
.Set Options
[options="header"]
|======
| Name      | Required  | Default  | Description
| `field`   | yes       | -        | The field to insert, upsert, or update
| `value`   | yes       | -        | The value to be set for the field
|======

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "field1",
    "value": 582.1
  }
}
--------------------------------------------------

=== Split processor

Splits a field into an array using a separator character. Only works on string fields.

[[split-options]]
.Split Options
[options="header"]
|======
| Name     | Required  | Default  | Description
| `field`  | yes       | -        | The field to split
|======

[source,js]
--------------------------------------------------
{
  "split": {
    "field": ","
  }
}
--------------------------------------------------

=== Trim processor

Trims whitespace from a field.

NOTE: This only works on leading and trailing whitespace.

[[trim-options]]
.Trim Options
[options="header"]
|======
| Name      | Required  | Default  | Description
| `field`   | yes       | -        | The string-valued field to trim whitespace from
|======

[source,js]
--------------------------------------------------
{
  "trim": {
    "field": "foo"
  }
}
--------------------------------------------------

=== Uppercase processor

Converts a string to its uppercase equivalent.
[[uppercase-options]]
.Uppercase Options
[options="header"]
|======
| Name     | Required  | Default  | Description
| `field`  | yes       | -        | The field to uppercase
|======

[source,js]
--------------------------------------------------
{
  "uppercase": {
    "field": "foo"
  }
}
--------------------------------------------------