[[ingest]]
== Ingest Plugin

The ingest plugin can be used to pre-process documents before the actual indexing takes place.
The ingest plugin performs this pre-processing by intercepting bulk and index requests, applying
the transformations, and then passing the documents back to the index or bulk APIs.

The ingest plugin is disabled by default. To enable it, configure the following setting in the
`elasticsearch.yml` file:

[source,yaml]
--------------------------------------------------
node.ingest: true
--------------------------------------------------

The ingest plugin can be installed and enabled on any node. It is possible to run ingest
on a master and/or data node, or to have dedicated client nodes that run with ingest.

To pre-process documents before indexing, use the `pipeline` parameter on an index or bulk
request to tell the ingest plugin which pipeline to use.

[source,js]
--------------------------------------------------
PUT /my-index/my-type/my-id?pipeline=my_pipeline_id
{
  ...
}
--------------------------------------------------
// AUTOSENSE

=== Processors

==== Set processor
Sets one field and associates it with the specified value. If the field already exists,
its value will be replaced with the provided one.

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "field1",
    "value": 582.1
  }
}
--------------------------------------------------

==== Append processor
Appends one or more values to an existing array if the field already exists and it is an array.
Converts a scalar to an array and appends one or more values to it if the field exists and it is a scalar.
Creates an array containing the provided values if the field doesn't exist.
Accepts a single value or an array of values.

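
Because the processor accepts either a single value or an array, a single value can be supplied
directly. A minimal sketch, in which the field name `tags` and the value `production` are
illustrative only:

[source,js]
--------------------------------------------------
{
  "append": {
    "field": "tags",
    "value": "production"
  }
}
--------------------------------------------------

Several values are passed as an array:
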
[source,js]
--------------------------------------------------
{
  "append": {
    "field": "field1",
    "value": ["item2", "item3", "item4"]
  }
}
--------------------------------------------------

==== Remove processor
Removes an existing field. If the field doesn't exist, an exception will be thrown.

[source,js]
--------------------------------------------------
{
  "remove": {
    "field": "foo"
  }
}
--------------------------------------------------

==== Rename processor
Renames an existing field. If the field doesn't exist, an exception will be thrown. Also, the new field
name must not exist.

[source,js]
--------------------------------------------------
{
  "rename": {
    "field": "foo"
  }
}
--------------------------------------------------

==== Convert processor
Converts an existing field's value to a different type, such as turning a string into an integer.
If the field value is an array, all members will be converted.

The supported types include: `integer`, `float`, `string`, and `boolean`.

`boolean` sets the field to true if its string value is equal to `true` (ignoring case), to
false if its string value is equal to `false` (ignoring case), and throws an exception otherwise.

[source,js]
--------------------------------------------------
{
  "convert": {
    "field": "foo",
    "type": "integer"
  }
}
--------------------------------------------------

==== Gsub processor
Converts a string field by applying a regular expression and a replacement.
If the field is not a string, the processor will throw an exception.

This configuration takes a `field` for the field name, `pattern` for the
pattern to be replaced, and `replacement` for the string to replace the matching patterns with.

[source,js]
--------------------------------------------------
{
  "gsub": {
    "field": "field1",
    "pattern": "\\.",
    "replacement": "-"
  }
}
--------------------------------------------------

==== Join processor
Joins each element of an array into a single string using a separator character between each element.
Throws an error when the field is not an array.

[source,js]
--------------------------------------------------
{
  "join": {
    "field": "joined_array_field",
    "separator": "-"
  }
}
--------------------------------------------------

==== Split processor
Splits a field into an array using a separator character. Only works on string fields.

[source,js]
--------------------------------------------------
{
  "split": {
    "field": ","
  }
}
--------------------------------------------------

==== Lowercase processor
Converts a string to its lowercase equivalent.

[source,js]
--------------------------------------------------
{
  "lowercase": {
    "field": "foo"
  }
}
--------------------------------------------------

==== Uppercase processor
Converts a string to its uppercase equivalent.

[source,js]
--------------------------------------------------
{
  "uppercase": {
    "field": "foo"
  }
}
--------------------------------------------------

==== Trim processor
Trims whitespace from a field.

NOTE: This only works on leading and trailing whitespace.

[source,js]
--------------------------------------------------
{
  "trim": {
    "field": "foo"
  }
}
--------------------------------------------------

==== Grok Processor

The Grok Processor extracts structured fields out of a single text field within a document. You choose which field to
extract matched fields from, as well as the Grok Pattern you expect will match. A Grok Pattern is like a regular
expression that supports aliased expressions that can be reused.

This tool is well suited to syslog logs, Apache and other webserver logs, MySQL logs, and in general, any log format
that is written for humans rather than for computer consumption.

The processor comes packaged with over 120 reusable patterns that are located at `$ES_HOME/config/ingest/grok/patterns`.
Here, you can add your own custom grok pattern files with custom grok expressions to be used by the processor.

If you need help building patterns to match your logs, you will find the and applications quite useful!

===== Grok Basics

Grok sits on top of regular expressions, so any regular expression is valid in grok as well.
The regular expression library is Oniguruma, and you can see the full supported regexp syntax
https://github.com/kkos/oniguruma/blob/master/doc/RE[on the Oniguruma site].

Grok works by leveraging this regular expression language to allow naming existing patterns and combining them into more
complex patterns that match your fields.

The syntax for re-using a grok pattern comes in three forms: `%{SYNTAX:SEMANTIC}`, `%{SYNTAX}`, `%{SYNTAX:SEMANTIC:TYPE}`.

The `SYNTAX` is the name of the pattern that will match your text. For example, `3.44` will be matched by the `NUMBER`
pattern and `55.3.244.1` will be matched by the `IP` pattern. The syntax is how you match. `NUMBER` and `IP` are both
patterns that are provided within the default patterns set.

The `SEMANTIC` is the identifier you give to the piece of text being matched. For example, `3.44` could be the
duration of an event, so you could call it simply `duration`. Further, a string `55.3.244.1` might identify
the `client` making a request.

The `TYPE` is the type you wish to cast your named field to. `int` and `float` are currently the only types supported for coercion.

For example, suppose we would like to match text with the following contents:

[source,js]
--------------------------------------------------
3.44 55.3.244.1
--------------------------------------------------

We may know that the above message is a number followed by an IP address. We can match this text with the following
Grok expression:

[source,js]
--------------------------------------------------
%{NUMBER:duration} %{IP:client}
--------------------------------------------------

===== Custom Patterns and Pattern Files

The Grok Processor comes pre-packaged with a base set of pattern files. These patterns may not always have
what you are looking for. These pattern files have a very basic format. Each line describes a named pattern with
the following format:

[source,js]
--------------------------------------------------
NAME ' '+ PATTERN '\n'
--------------------------------------------------

You can add a pattern to an existing file, or add your own file in the patterns directory here: `$ES_HOME/config/ingest/grok/patterns`.
The Ingest Plugin will pick up files in this directory and load them into the grok processor's known patterns. These patterns are loaded
at startup, so you will need to restart your ingest node if you wish to update these files while running.

Example snippet of pattern definitions found in the `grok-patterns` patterns file:

[source,js]
--------------------------------------------------
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
--------------------------------------------------

===== Using Grok Processor in a Pipeline

[[grok-options]]
.Grok Options
[options="header"]
|======
| Name                   | Required  | Default             | Description
| `match_field`          | yes       | -                   | The field to use for grok expression parsing
| `match_pattern`        | yes       | -                   | The grok expression to match and extract named captures with
| `pattern_definitions`  | no        | -                   | A map of pattern-name and pattern tuples defining custom patterns to be used by the current processor. Patterns matching existing names will override the pre-existing definition.
|======

Here is an example of using the provided patterns to extract and name structured fields from a string field in
a document:

[source,js]
--------------------------------------------------
{
  "message": "55.3.244.1 GET /index.html 15824 0.043"
}
--------------------------------------------------

The pattern for this could be:

[source]
--------------------------------------------------
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
--------------------------------------------------

An example pipeline for processing the above document using Grok:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors": [
    {
      "grok": {
        "match_field": "message",
        "match_pattern": "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
      }
    }
  ]
}
--------------------------------------------------

This pipeline will insert these named captures as new fields within the document, like so:

[source,js]
--------------------------------------------------
{
  "message": "55.3.244.1 GET /index.html 15824 0.043",
  "client": "55.3.244.1",
  "method": "GET",
  "request": "/index.html",
  "bytes": 15824,
  "duration": "0.043"
}
--------------------------------------------------

An example of a pipeline specifying custom pattern definitions:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors": [
    {
      "grok": {
        "match_field": "message",
        "match_pattern": "my %{FAVORITE_DOG:dog} is colored %{RGB:color}",
        "pattern_definitions" : {
          "FAVORITE_DOG" : "beagle",
          "RGB" : "RED|GREEN|BLUE"
        }
      }
    }
  ]
}
--------------------------------------------------

==== Geoip processor

The GeoIP processor adds information about the geographical location of IP addresses, based on data from the Maxmind databases.
By default, this processor adds this information under the `geoip` field.

The ingest plugin ships by default with the GeoLite2 City and GeoLite2 Country geoip2 databases from Maxmind, made available
under the CCA-ShareAlike 3.0 license.
For more details, see http://dev.maxmind.com/geoip/geoip2/geolite2/

The GeoIP processor can run with other geoip2 databases from Maxmind. The files must be copied into the geoip config directory,
and the `database_file` option should be used to specify the filename of the custom database. The geoip config directory
is located at `$ES_HOME/config/ingest/geoip` and holds the shipped databases too.

[[geoip-options]]
.Geoip options
[options="header"]
|======
| Name                   | Required  | Default                                                                            | Description
| `source_field`         | yes       | -                                                                                  | The field to get the ip address or hostname from for the geographical lookup.
| `target_field`         | no        | geoip                                                                              | The field that will hold the geographical information looked up from the Maxmind database.
| `database_file`        | no        | GeoLite2-City.mmdb                                                                 | The database filename in the geoip config directory. The ingest plugin ships with the GeoLite2-City.mmdb and GeoLite2-Country.mmdb files.
| `fields`               | no        | [`continent_name`, `country_iso_code`, `region_name`, `city_name`, `location`] <1> | Controls what properties are added to the `target_field` based on the geoip lookup.
|======

<1> Depends on what is available in `database_file`:

* If the GeoLite2 City database is used then the following fields may be added under the `target_field`: `ip`,
`country_iso_code`, `country_name`, `continent_name`, `region_name`, `city_name`, `timezone`, `latitude`, `longitude`
and `location`. The fields actually added depend on what has been found and which fields were configured in `fields`.
* If the GeoLite2 Country database is used then the following fields may be added under the `target_field`: `ip`,
`country_iso_code`, `country_name` and `continent_name`. The fields actually added depend on what has been found and which fields were configured in `fields`.

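
As an illustration of the `fields` option, the following sketch limits the properties written under the
`target_field` to the country code and the city name. The `ip` source field is an assumption about the
document being processed; the property names are taken from the list above.

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "geoip" : {
        "source_field" : "ip",
        "fields" : ["country_iso_code", "city_name"]
      }
    }
  ]
}
--------------------------------------------------
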
An example that uses the default city database and adds the geographical information to the `geoip` field based on the `ip` field:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "geoip" : {
        "source_field" : "ip"
      }
    }
  ]
}
--------------------------------------------------

An example that uses the default country database and adds the geographical information to the `geo` field based on the `ip` field:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "geoip" : {
        "source_field" : "ip",
        "target_field" : "geo",
        "database_file" : "GeoLite2-Country.mmdb"
      }
    }
  ]
}
--------------------------------------------------

==== Date processor

The date processor is used for parsing dates from fields, and then using that date or timestamp as the timestamp for that document.
By default, the date processor adds the parsed date as a new field called `@timestamp`; this is configurable by setting the `target_field`
configuration parameter. Multiple date formats are supported as part of the same date processor definition. They are tried
sequentially when parsing the date field, in the same order they were defined in the processor definition.

[[date-options]]
.Date options
[options="header"]
|======
| Name                   | Required  | Default             | Description
| `match_field`          | yes       | -                   | The field to get the date from.
| `target_field`         | no        | @timestamp          | The field that will hold the parsed date.
| `match_formats`        | yes       | -                   | Array of the expected date formats. Can be a joda pattern or one of the following formats: ISO8601, UNIX, UNIX_MS, TAI64N.
| `timezone`             | no        | UTC                 | The timezone to use when parsing the date.
| `locale`               | no        | ENGLISH             | The locale to use when parsing the date, relevant when parsing month names or week days.
|======

An example that adds the parsed date to the `timestamp` field based on the `initial_date` field:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "date" : {
        "match_field" : "initial_date",
        "target_field" : "timestamp",
        "match_formats" : ["dd/MM/yyyy hh:mm:ss"],
        "timezone" : "Europe/Amsterdam"
      }
    }
  ]
}
--------------------------------------------------

==== Fail processor
The Fail Processor is used to raise an exception. This is useful when
a user expects a pipeline to fail and wishes to relay a specific message
to the requester.

[source,js]
--------------------------------------------------
{
  "fail": {
    "message": "an error message"
  }
}
--------------------------------------------------

=== Accessing data in pipelines

Processors in pipelines have read and write access to documents that pass through the pipeline.
Both the fields in the source of a document and its metadata fields are accessible.

Accessing a field in the source is straightforward: refer to the field by its name. For example:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}
--------------------------------------------------

On top of this, fields from the source are always accessible via the `_source` prefix:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}
--------------------------------------------------

Metadata fields can also be accessed in the same way as fields from the source. This
is possible because Elasticsearch doesn't allow fields in the source that have the
same name as metadata fields.

The following example sets the id of a document to `1`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_id",
    "value": "1"
  }
}
--------------------------------------------------

The following metadata fields are accessible by a processor: `_index`, `_type`, `_id`, `_routing`, `_parent`,
`_timestamp` and `_ttl`.

Beyond metadata fields and source fields, the ingest plugin also adds ingest metadata to documents being processed.
These metadata properties are accessible under the `_ingest` key. Currently, the ingest plugin adds the ingest timestamp
under the `_ingest.timestamp` key, which is the time the ingest plugin received the index or bulk
request to pre-process. Any processor is free to add more ingest-related metadata. Ingest metadata is transient
and is lost after a document has been processed by the pipeline, so ingest metadata won't be indexed.

The following example adds a field with the name `received` whose value is the ingest timestamp:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}
--------------------------------------------------

As opposed to Elasticsearch metadata fields, the ingest metadata field name `_ingest` can be used as a valid field name
in the source of a document. Use `_source._ingest` to refer to it; otherwise `_ingest` will be interpreted as ingest
metadata by the ingest plugin.

A number of processor settings also support templating. Settings that support templating can have zero or more
template snippets. A template snippet begins with `{{` and ends with `}}`.
Accessing fields and metadata fields in templates works exactly the same as in regular processor field settings.

In this example a field by the name `field_c` is added and its value is a concatenation of
the values of `field_a` and `field_b`.

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}
--------------------------------------------------

The following example changes the index a document is going to be indexed into. The index a document will be redirected
to depends on the field in the source with name `geoip.country_iso_code`.

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_index",
    "value": "{{geoip.country_iso_code}}"
  }
}
--------------------------------------------------

=== Ingest APIs

==== Put pipeline API

The put pipeline API adds pipelines and updates existing pipelines in the cluster.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "simple" : {
        // settings
      }
    },
    // other processors
  ]
}
--------------------------------------------------
// AUTOSENSE

NOTE: The put pipeline API also instructs all ingest nodes to reload their in-memory representation of pipelines, so that
      pipeline changes take effect immediately.

==== Get pipeline API

The get pipeline API returns pipelines based on id. This API always returns a local reference of the pipeline.

[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE

Example response:

[source,js]
--------------------------------------------------
{
  "my-pipeline-id": {
    "_source" : {
      "description": "describe pipeline",
      "processors": [
        {
          "simple" : {
            // settings
          }
        },
        // other processors
      ]
    },
    "_version" : 0
  }
}
--------------------------------------------------

For each returned pipeline, the source and the version are returned.
The version is useful for knowing which version of the pipeline the node has.
Multiple ids can be provided at the same time. Wildcards are also supported.

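
A sketch of both forms follows. The pipeline ids are hypothetical, and the comma-separated syntax for
multiple ids is an assumption based on how other Elasticsearch APIs accept lists of ids:

[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id,my-other-pipeline-id

GET _ingest/pipeline/my-*
--------------------------------------------------
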
==== Delete pipeline API

The delete pipeline api deletes pipelines by id.

[source,js]
--------------------------------------------------
DELETE _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE

==== Simulate pipeline API

The simulate pipeline api executes a specific pipeline against
the set of documents provided in the body of the request.

A simulate request may call upon an existing pipeline to be executed
against the provided documents, or supply a pipeline definition in
the body of the request.

Here is the structure of a simulate request with a provided pipeline:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { /** first document **/ },
    { /** second document **/ },
    // ...
  ]
}
--------------------------------------------------

Here is the structure of a simulate request against a pre-existing pipeline:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { /** first document **/ },
    { /** second document **/ },
    // ...
  ]
}
--------------------------------------------------

Here is an example simulate request with a provided pipeline and its response:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
--------------------------------------------------
// AUTOSENSE

response:

[source,js]
--------------------------------------------------
{
  "docs": [
    {
      "doc": {
        "_id": "id",
        "_ttl": null,
        "_parent": null,
        "_index": "index",
        "_routing": null,
        "_type": "type",
        "_timestamp": null,
        "_source": {
          "field2": "_value",
          "foo": "bar"
        },
        "_ingest": {
          "timestamp": "2016-01-04T23:53:27.186+0000"
        }
      }
    },
    {
      "doc": {
        "_id": "id",
        "_ttl": null,
        "_parent": null,
        "_index": "index",
        "_routing": null,
        "_type": "type",
        "_timestamp": null,
        "_source": {
          "field2": "_value",
          "foo": "rab"
        },
        "_ingest": {
          "timestamp": "2016-01-04T23:53:27.186+0000"
        }
      }
    }
  ]
}
--------------------------------------------------

It is often useful to see how each processor affects the ingest document
as it is passed through the pipeline.
To see the intermediate results of
each processor in the simulate request, a `verbose` parameter may be added
to the request.

Here is an example verbose request and its response:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate?verbose
{
  "pipeline" : {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value2"
        }
      },
      {
        "set" : {
          "field" : "field3",
          "value" : "_value3"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
--------------------------------------------------
// AUTOSENSE

response:

[source,js]
--------------------------------------------------
{
  "docs": [
    {
      "processor_results": [
        {
          "processor_id": "processor[set]-0",
          "doc": {
            "_id": "id",
            "_ttl": null,
            "_parent": null,
            "_index": "index",
            "_routing": null,
            "_type": "type",
            "_timestamp": null,
            "_source": {
              "field2": "_value2",
              "foo": "bar"
            },
            "_ingest": {
              "timestamp": "2016-01-05T00:02:51.383+0000"
            }
          }
        },
        {
          "processor_id": "processor[set]-1",
          "doc": {
            "_id": "id",
            "_ttl": null,
            "_parent": null,
            "_index": "index",
            "_routing": null,
            "_type": "type",
            "_timestamp": null,
            "_source": {
              "field3": "_value3",
              "field2": "_value2",
              "foo": "bar"
            },
            "_ingest": {
              "timestamp": "2016-01-05T00:02:51.383+0000"
            }
          }
        }
      ]
    },
    {
      "processor_results": [
        {
          "processor_id": "processor[set]-0",
          "doc": {
            "_id": "id",
            "_ttl": null,
            "_parent": null,
            "_index": "index",
            "_routing": null,
            "_type": "type",
            "_timestamp": null,
            "_source": {
              "field2": "_value2",
              "foo": "rab"
            },
            "_ingest": {
              "timestamp": "2016-01-05T00:02:51.384+0000"
            }
          }
        },
        {
          "processor_id": "processor[set]-1",
          "doc": {
            "_id": "id",
            "_ttl": null,
            "_parent": null,
            "_index": "index",
            "_routing": null,
            "_type": "type",
            "_timestamp": null,
            "_source": {
              "field3": "_value3",
              "field2": "_value2",
              "foo": "rab"
            },
            "_ingest": {
              "timestamp": "2016-01-05T00:02:51.384+0000"
            }
          }
        }
      ]
    }
  ]
}
--------------------------------------------------
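
The examples above only show `verbose` together with a pipeline supplied in the request body. As a sketch,
and assuming the parameter is accepted in the same way when simulating a pre-existing pipeline, the request
could look like this (`my-pipeline-id` is the placeholder id used earlier, and the document is illustrative):

[source,js]
--------------------------------------------------
POST _ingest/pipeline/my-pipeline-id/_simulate?verbose
{
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    }
  ]
}
--------------------------------------------------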