[[ingest]]
== Ingest Plugin
The ingest plugin pre-processes documents before the actual indexing takes place.
It intercepts bulk and index requests, applies the transformations, and then passes the
documents back to the index or bulk APIs.
The ingest plugin is disabled by default. To enable it, configure the following
setting in the `elasticsearch.yml` file:
[source,yaml]
--------------------------------------------------
node.ingest: true
--------------------------------------------------
The ingest plugin can be installed and enabled on any node. It is possible to run ingest
on a master or data node, or to have dedicated client nodes that run ingest.
To pre-process documents before indexing, use the `pipeline` parameter
on an index or bulk request to tell the ingest plugin which pipeline to use.
[source,js]
--------------------------------------------------
PUT /my-index/my-type/my-id?pipeline=my_pipeline_id
{
...
}
--------------------------------------------------
// AUTOSENSE
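As a hedged sketch of the bulk case, the same `pipeline` parameter can be added to a bulk request so that every document in the request passes through the pipeline. The index, type, id and field names below are placeholders, not names defined by the plugin:
[source,js]
--------------------------------------------------
POST /_bulk?pipeline=my_pipeline_id
{ "index" : { "_index" : "my-index", "_type" : "my-type", "_id" : "1" } }
{ "foo" : "bar" }
{ "index" : { "_index" : "my-index", "_type" : "my-type", "_id" : "2" } }
{ "foo" : "baz" }
--------------------------------------------------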
=== Processors
==== Set processor
Sets one field and associates it with the specified value. If the field already exists,
its value will be replaced with the provided one.
[source,js]
--------------------------------------------------
{
"set": {
"field": "field1",
"value": 582.1
}
}
--------------------------------------------------
==== Append processor
Appends one or more values to an existing array if the field already exists and it is an array.
Converts a scalar to an array and appends one or more values to it if the field exists and it is a scalar.
Creates an array containing the provided values if the field doesn't exist.
Accepts a single value or an array of values.
[source,js]
--------------------------------------------------
{
"append": {
"field": "field1",
"value": ["item2", "item3", "item4"]
}
}
--------------------------------------------------
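The three cases can be illustrated with the configuration above. This is a sketch derived from the description, not captured output; each line shows a document before and after the processor runs:
[source]
--------------------------------------------------
{ "field1": ["item1"] }  =>  { "field1": ["item1", "item2", "item3", "item4"] }
{ "field1": "item1" }    =>  { "field1": ["item1", "item2", "item3", "item4"] }
{ }                      =>  { "field1": ["item2", "item3", "item4"] }
--------------------------------------------------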
==== Remove processor
Removes an existing field. If the field doesn't exist, an exception will be thrown.
[source,js]
--------------------------------------------------
{
"remove": {
"field": "foo"
}
}
--------------------------------------------------
==== Rename processor
Renames an existing field. If the field doesn't exist, an exception will be thrown. Also, the new field
name must not exist.
[source,js]
--------------------------------------------------
{
"rename": {
"field": "foo",
"to": "foobar"
}
}
--------------------------------------------------
==== Convert processor
Converts an existing field's value to a different type, like turning a string to an integer.
If the field value is an array, all members will be converted.
The supported types include: `integer`, `float`, `string`, and `boolean`.
`boolean` will set the field to true if its string value is equal to `true` (ignoring case), to
false if its string value is equal to `false` (ignoring case), and will throw an exception otherwise.
[source,js]
--------------------------------------------------
{
"convert": {
"field": "foo",
"type": "integer"
}
}
--------------------------------------------------
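As an illustration of the `boolean` rules, the following sketch converts a hypothetical field named `active`:
[source,js]
--------------------------------------------------
{
  "convert": {
    "field": "active",
    "type": "boolean"
  }
}
--------------------------------------------------
With this configuration, the string values `"TRUE"` and `"true"` become `true`, `"False"` becomes `false`, and any other value, such as `"yes"`, causes an exception.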
==== Gsub processor
Converts a string field by applying a regular expression and a replacement.
If the field is not a string, the processor will throw an exception.
This configuration takes a `field` for the field name, `pattern` for the
pattern to be replaced, and `replacement` for the string to replace the matching patterns with.
[source,js]
--------------------------------------------------
{
"gsub": {
"field": "field1",
"pattern": "\\.",
"replacement": "-"
}
}
--------------------------------------------------
==== Join processor
Joins each element of an array into a single string using a separator character between each element.
Throws an error if the field is not an array.
[source,js]
--------------------------------------------------
{
"join": {
"field": "joined_array_field",
"separator": "-"
}
}
--------------------------------------------------
==== Split processor
Splits a field into an array using a separator character. Only works on string fields.
[source,js]
--------------------------------------------------
{
"split": {
"field": "foo",
"separator": ","
}
}
--------------------------------------------------
==== Lowercase processor
Converts a string to its lowercase equivalent.
[source,js]
--------------------------------------------------
{
"lowercase": {
"field": "foo"
}
}
--------------------------------------------------
==== Uppercase processor
Converts a string to its uppercase equivalent.
[source,js]
--------------------------------------------------
{
"uppercase": {
"field": "foo"
}
}
--------------------------------------------------
==== Trim processor
Trims whitespace from a field. NOTE: This only works on leading and trailing whitespace.
[source,js]
--------------------------------------------------
{
"trim": {
"field": "foo"
}
}
--------------------------------------------------
==== Grok Processor
The Grok Processor extracts structured fields out of a single text field within a document. You choose which field to
extract matched fields from, as well as the Grok Pattern you expect will match. A Grok Pattern is like a regular
expression that supports aliased expressions that can be reused.
This tool is well suited to syslog logs, Apache and other web server logs, MySQL logs, and in general any log format
that is written for humans rather than for computer consumption.
The processor comes packaged with over 120 reusable patterns that are located at `$ES_HOME/config/ingest/grok/patterns`.
Here, you can add your own custom grok pattern files with custom grok expressions to be used by the processor.
If you need help building patterns to match your logs, you will find the <http://grokdebug.herokuapp.com> and
<http://grokconstructor.appspot.com/> applications quite useful!
===== Grok Basics
Grok sits on top of regular expressions, so any regular expressions are valid in grok as well.
The regular expression library is Oniguruma, and you can see the full supported regexp syntax
https://github.com/kkos/oniguruma/blob/master/doc/RE[on the Oniguruma site].
Grok works by leveraging this regular expression language to allow naming existing patterns and combining them into more
complex patterns that match your fields.
The syntax for re-using a grok pattern comes in three forms: `%{SYNTAX:SEMANTIC}`, `%{SYNTAX}`, `%{SYNTAX:SEMANTIC:TYPE}`.
The `SYNTAX` is the name of the pattern that will match your text. For example, `3.44` will be matched by the `NUMBER`
pattern and `55.3.244.1` will be matched by the `IP` pattern. The syntax is how you match. `NUMBER` and `IP` are both
patterns that are provided within the default patterns set.
The `SEMANTIC` is the identifier you give to the piece of text being matched. For example, `3.44` could be the
duration of an event, so you could call it simply `duration`. Further, a string `55.3.244.1` might identify
the `client` making a request.
The `TYPE` is the type you wish to cast your named field to. `int` and `float` are currently the only types supported for coercion.
For example, suppose we would like to match text with the following
contents:
[source,js]
--------------------------------------------------
3.44 55.3.244.1
--------------------------------------------------
We know that the above message is a number followed by an IP address. We can match this text with the following
Grok expression:
[source,js]
--------------------------------------------------
%{NUMBER:duration} %{IP:client}
--------------------------------------------------
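The third form adds a type coercion. As a sketch, assuming the same input text, the following expression casts `duration` to a float while leaving `client` as a string:
[source,js]
--------------------------------------------------
%{NUMBER:duration:float} %{IP:client}
--------------------------------------------------
Under this assumption, `duration` would be extracted as the number `3.44` rather than the string `"3.44"`.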
===== Custom Patterns and Pattern Files
The Grok Processor comes pre-packaged with a base set of pattern files. These patterns may not always have
what you are looking for. These pattern files have a very basic format. Each line describes a named pattern with
the following format:
[source,js]
--------------------------------------------------
NAME ' '+ PATTERN '\n'
--------------------------------------------------
You can add a pattern to an existing file, or add your own file in the patterns directory: `$ES_HOME/config/ingest/grok/patterns`.
The ingest plugin picks up the files in this directory and loads them into the grok processor's known patterns. These patterns are loaded
at startup, so you will need to restart your ingest node if you wish to update these files while it is running.
Example snippet of pattern definitions found in the `grok-patterns` patterns file:
[source,js]
--------------------------------------------------
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
--------------------------------------------------
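New patterns can build on existing ones. As a minimal sketch, a custom pattern file could contain a single line that combines the `HOUR` and `MINUTE` patterns shown above. The pattern name `CLOCK_TIME` is hypothetical and chosen only for illustration:
[source,js]
--------------------------------------------------
CLOCK_TIME %{HOUR}:%{MINUTE}
--------------------------------------------------
After a restart of the ingest node, a grok expression could then refer to it as `%{CLOCK_TIME:time}`.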
===== Using Grok Processor in a Pipeline
[[grok-options]]
.Grok Options
[options="header"]
|======
| Name | Required | Default | Description
| `match_field` | yes | - | The field to use for grok expression parsing
| `match_pattern` | yes | - | The grok expression to match and extract named captures with
| `pattern_definitions` | no | - | A map of pattern-name and pattern tuples defining custom patterns to be used by the current processor. Patterns matching existing names will override the pre-existing definition.
|======
Here is an example of using the provided patterns to extract out and name structured fields from a string field in
a document.
[source,js]
--------------------------------------------------
{
"message": "55.3.244.1 GET /index.html 15824 0.043"
}
--------------------------------------------------
The pattern for this could be
[source]
--------------------------------------------------
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
--------------------------------------------------
An example pipeline for processing the above document using Grok:
[source,js]
--------------------------------------------------
{
"description" : "...",
"processors": [
{
"grok": {
"match_field": "message",
"match_pattern": "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
}
}
]
}
--------------------------------------------------
This pipeline will insert these named captures as new fields within the document, like so:
[source,js]
--------------------------------------------------
{
"message": "55.3.244.1 GET /index.html 15824 0.043",
"client": "55.3.244.1",
"method": "GET",
"request": "/index.html",
"bytes": "15824",
"duration": "0.043"
}
--------------------------------------------------
An example of a pipeline specifying custom pattern definitions:
[source,js]
--------------------------------------------------
{
"description" : "...",
"processors": [
{
"grok": {
"match_field": "message",
"match_pattern": "my %{FAVORITE_DOG:dog} is colored %{RGB:color}",
"pattern_definitions" : {
"FAVORITE_DOG" : "beagle",
"RGB" : "RED|GREEN|BLUE"
}
}
}
]
}
--------------------------------------------------
==== Geoip processor
The GeoIP processor adds information about the geographical location of IP addresses, based on data from the Maxmind databases.
This processor adds this information by default under the `geoip` field.
The ingest plugin ships by default with the GeoLite2 City and GeoLite2 Country geoip2 databases from Maxmind made available
under the CCA-ShareAlike 3.0 license. For more details, see http://dev.maxmind.com/geoip/geoip2/geolite2/
The GeoIP processor can run with other geoip2 databases from Maxmind. The files must be copied into the geoip config directory
and the `database_file` option should be used to specify the filename of the custom database. The geoip config directory
is located at `$ES_HOME/config/ingest/geoip` and holds the shipped databases too.
[[geoip-options]]
.Geoip options
[options="header"]
|======
| Name | Required | Default | Description
| `source_field` | yes | - | The field to get the ip address or hostname from for the geographical lookup.
| `target_field` | no | geoip | The field that will hold the geographical information looked up from the Maxmind database.
| `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest plugin ships with the GeoLite2-City.mmdb and GeoLite2-Country.mmdb files.
| `fields` | no | [`continent_name`, `country_iso_code`, `region_name`, `city_name`, `location`] <1> | Controls what properties are added to the `target_field` based on the geoip lookup.
|======
<1> Depends on what is available in `database_file`:
* If the GeoLite2 City database is used then the following fields may be added under the `target_field`: `ip`,
`country_iso_code`, `country_name`, `continent_name`, `region_name`, `city_name`, `timezone`, `latitude`, `longitude`
and `location`. The fields actually added depend on what has been found and which fields were configured in `fields`.
* If the GeoLite2 Country database is used then the following fields may be added under the `target_field`: `ip`,
`country_iso_code`, `country_name` and `continent_name`. The fields actually added depend on what has been found and which fields were configured in `fields`.
An example that uses the default city database and adds the geographical information to the `geoip` field based on the `ip` field:
[source,js]
--------------------------------------------------
{
"description" : "...",
"processors" : [
{
"geoip" : {
"source_field" : "ip"
}
}
]
}
--------------------------------------------------
An example that uses the default country database and adds the geographical information to the `geo` field based on the `ip` field:
[source,js]
--------------------------------------------------
{
"description" : "...",
"processors" : [
{
"geoip" : {
"source_field" : "ip",
"target_field" : "geo",
"database_file" : "GeoLite2-Country.mmdb"
}
}
]
}
--------------------------------------------------
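The `fields` option can narrow down what is added. The following sketch, which assumes the default city database, keeps only the country code and the city name. The selection of properties is just an example:
[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "geoip" : {
        "source_field" : "ip",
        "fields" : ["country_iso_code", "city_name"]
      }
    }
  ]
}
--------------------------------------------------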
==== Date processor
The date processor parses dates from fields and then uses that date or timestamp as the timestamp for the document.
By default, the date processor adds the parsed date as a new field called `@timestamp`; this can be changed with the `target_field`
configuration parameter. Multiple date formats are supported as part of the same date processor definition. They are tried
sequentially, in the order in which they were defined, when attempting to parse the date field.
[[date-options]]
.Date options
[options="header"]
|======
| Name | Required | Default | Description
| `match_field` | yes | - | The field to get the date from.
| `target_field` | no | @timestamp | The field that will hold the parsed date.
| `match_formats` | yes | - | Array of the expected date formats. Can be a joda pattern or one of the following formats: ISO8601, UNIX, UNIX_MS, TAI64N.
| `timezone` | no | UTC | The timezone to use when parsing the date.
| `locale` | no | ENGLISH | The locale to use when parsing the date, relevant when parsing month names or week days.
|======
An example that adds the parsed date to the `timestamp` field based on the `initial_date` field:
[source,js]
--------------------------------------------------
{
"description" : "...",
"processors" : [
{
"date" : {
"match_field" : "initial_date",
"target_field" : "timestamp",
"match_formats" : ["dd/MM/yyyy HH:mm:ss"],
"timezone" : "Europe/Amsterdam"
}
}
]
}
--------------------------------------------------
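Since `match_formats` is an array, several formats can be listed and are tried in order. A sketch, assuming the `initial_date` field may contain either an ISO8601 date or a day-first date with a 24-hour time:
[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "date" : {
        "match_field" : "initial_date",
        "match_formats" : ["ISO8601", "dd/MM/yyyy HH:mm:ss"]
      }
    }
  ]
}
--------------------------------------------------
Because `target_field` is omitted here, the parsed date would end up in the default `@timestamp` field.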
==== Fail processor
The Fail Processor is used to raise an exception. This is useful for when
a user expects a pipeline to fail and wishes to relay a specific message
to the requester.
[source,js]
--------------------------------------------------
{
"fail": {
"message": "an error message"
}
}
--------------------------------------------------
==== DeDot Processor
The DeDot Processor is used to remove dots (".") from field names and
replace them with a specific `separator` string.
[source,js]
--------------------------------------------------
{
"dedot": {
"separator": "_"
}
}
--------------------------------------------------
=== Accessing data in pipelines
Processors in pipelines have read and write access to documents that pass through the pipeline.
The fields in the source of a document and its metadata fields are accessible.
Accessing a field in the source is straightforward and one can refer to fields by
their name. For example:
[source,js]
--------------------------------------------------
{
"set": {
"field": "my_field",
"value": 582.1
}
}
--------------------------------------------------
In addition, fields from the source are always accessible via the `_source` prefix:
[source,js]
--------------------------------------------------
{
"set": {
"field": "_source.my_field",
"value": 582.1
}
}
--------------------------------------------------
Metadata fields can also be accessed in the same way as fields from the source. This
is possible because Elasticsearch doesn't allow fields in the source that have the
same name as metadata fields.
The following example sets the id of a document to `1`:
[source,js]
--------------------------------------------------
{
"set": {
"field": "_id",
"value": "1"
}
}
--------------------------------------------------
The following metadata fields are accessible by a processor: `_index`, `_type`, `_id`, `_routing`, `_parent`,
`_timestamp` and `_ttl`.
Beyond metadata fields and source fields, the ingest plugin also adds ingest metadata to documents being processed.
These metadata properties are accessible under the `_ingest` key. Currently, the ingest plugin adds the ingest timestamp
under the `_ingest.timestamp` key, which is the time the ingest plugin received the index or bulk
request to pre-process. Any processor is free to add more ingest-related metadata. Ingest metadata is transient:
it is lost after a document has been processed by the pipeline, so it is never indexed.
The following example adds a field with the name `received` and the value is the ingest timestamp:
[source,js]
--------------------------------------------------
{
"set": {
"field": "received",
"value": "{{_ingest.timestamp}}"
}
}
--------------------------------------------------
As opposed to Elasticsearch metadata fields, the ingest metadata field name `_ingest` can be used as a valid field name
in the source of a document. Use `_source._ingest` to refer to it; otherwise `_ingest` will be interpreted as the ingest
metadata by the ingest plugin.
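For example, the following sketch writes to a source field that is literally named `_ingest`, rather than to the ingest metadata. The value is a placeholder:
[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_source._ingest",
    "value": "my value"
  }
}
--------------------------------------------------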
A number of processor settings also support templating. Settings that support templating can have zero or more
template snippets. A template snippet begins with `{{` and ends with `}}`.
Accessing fields and metafields in templates is exactly the same as via regular processor field settings.
In this example a field by the name `field_c` is added and its value is a concatenation of
the values of `field_a` and `field_b`.
[source,js]
--------------------------------------------------
{
"set": {
"field": "field_c",
"value": "{{field_a}} {{field_b}}"
}
}
--------------------------------------------------
The following example changes the index a document is going to be indexed into. The index a document will be redirected
to depends on the field in the source with name `geoip.country_iso_code`.
[source,js]
--------------------------------------------------
{
"set": {
"field": "_index",
"value": "{{geoip.country_iso_code}}"
}
}
--------------------------------------------------
==== Handling Failure in Pipelines
In its simplest case, pipelines describe a list of processors which
are executed sequentially and processing halts at the first exception. This
may not be desirable when failures are expected. For example, not all your logs
may match a certain grok expression and you may wish to index such documents into
a separate index.
To enable this behavior, you can use the `on_failure` parameter. `on_failure`
defines a list of processors to be executed immediately following the failed processor.
This parameter can be supplied at the pipeline level as well as at the processor
level. If a processor has an `on_failure` configuration option, whether
it is empty or not, any exceptions thrown by that processor are caught and the
pipeline continues executing the remaining processors. Because further processors
are defined within the scope of an `on_failure` statement, failure handling can be nested.
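As a minimal sketch of the empty case, the following pipeline ignores a failing `remove` processor (for instance, when `foo` is absent) and carries on with the `set` processor. The field names are placeholders:
[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "remove" : {
        "field" : "foo",
        "on_failure" : []
      }
    },
    {
      "set" : {
        "field" : "field1",
        "value" : 582.1
      }
    }
  ]
}
--------------------------------------------------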
Example: The following pipeline renames the field `foo` to `bar`.
If the document does not contain the `foo` field, the pipeline
attaches an error message to the document for later analysis within
Elasticsearch.
[source,js]
--------------------------------------------------
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"to" : "bar",
"on_failure" : [
{
"set" : {
"field" : "error",
"value" : "field \"foo\" does not exist, cannot rename to \"bar\""
}
}
]
}
}
]
}
--------------------------------------------------
Example: Here we define an `on_failure` block on a whole pipeline to change
the index to which failed documents are sent.
[source,js]
--------------------------------------------------
{
"description" : "my first pipeline with handled exceptions",
"processors" : [ ... ],
"on_failure" : [
{
"set" : {
"field" : "_index",
"value" : "failed-{{ _index }}"
}
}
]
}
--------------------------------------------------
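Failure handling can also be nested. In the following sketch, the `on_failure` block of the `rename` processor contains a `remove` processor that has its own `on_failure` block. The field names and the message are hypothetical:
[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "to" : "bar",
        "on_failure" : [
          {
            "remove" : {
              "field" : "baz",
              "on_failure" : [
                {
                  "set" : {
                    "field" : "error",
                    "value" : "could not rename \"foo\" or remove \"baz\""
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------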
===== Accessing Error Metadata From Processors Handling Exceptions
Sometimes you may want to retrieve the actual error message that was thrown
by a failed processor. To do so you can access metadata fields called
`on_failure_message` and `on_failure_processor`. These fields are only accessible
from within the context of an `on_failure` block. Here is an updated version of
our first example which leverages these fields to provide the error message instead
of manually setting it.
[source,js]
--------------------------------------------------
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"to" : "bar",
"on_failure" : [
{
"set" : {
"field" : "error",
"value" : "{{ _ingest.on_failure_message }}"
}
}
]
}
}
]
}
--------------------------------------------------
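The `on_failure_processor` field can be used in the same way. The sketch below assumes it is exposed under the `_ingest` key like `on_failure_message`, and records which processor failed in a hypothetical `failed_processor` field:
[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "to" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "{{ _ingest.on_failure_message }}"
            }
          },
          {
            "set" : {
              "field" : "failed_processor",
              "value" : "{{ _ingest.on_failure_processor }}"
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------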
=== Ingest APIs
==== Put pipeline API
The put pipeline api adds pipelines and updates existing pipelines in the cluster.
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
"description" : "describe pipeline",
"processors" : [
{
"simple" : {
// settings
}
},
// other processors
]
}
--------------------------------------------------
// AUTOSENSE
NOTE: The put pipeline api also instructs all ingest nodes to reload their in-memory representation of pipelines, so that
pipeline changes take effect immediately.
==== Get pipeline API
The get pipeline api returns pipelines based on id. This api always returns a local reference of the pipeline.
[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE
Example response:
[source,js]
--------------------------------------------------
{
"my-pipeline-id": {
"_source" : {
"description": "describe pipeline",
"processors": [
{
"simple" : {
// settings
}
},
// other processors
]
},
"_version" : 0
}
}
--------------------------------------------------
For each returned pipeline, the source and the version are returned.
The version is useful for knowing which version of the pipeline the node has.
Multiple ids can be provided at the same time, and wildcards are supported.
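As a sketch, assuming ids are separated by commas, the following request would return `my-pipeline-id` together with all pipelines whose id starts with `logs-` (a hypothetical prefix):
[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id,logs-*
--------------------------------------------------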
==== Delete pipeline API
The delete pipeline api deletes pipelines by id.
[source,js]
--------------------------------------------------
DELETE _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE
==== Simulate pipeline API
The simulate pipeline api executes a specific pipeline against
the set of documents provided in the body of the request.
A simulate request may call upon an existing pipeline to be executed
against the provided documents, or supply a pipeline definition in
the body of the request.
Here is the structure of a simulate request with a provided pipeline:
[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
"pipeline" : {
// pipeline definition here
},
"docs" : [
{ /** first document **/ },
{ /** second document **/ },
// ...
]
}
--------------------------------------------------
Here is the structure of a simulate request against a pre-existing pipeline:
[source,js]
--------------------------------------------------
POST _ingest/pipeline/my-pipeline-id/_simulate
{
"docs" : [
{ /** first document **/ },
{ /** second document **/ },
// ...
]
}
--------------------------------------------------
Here is an example simulate request with a provided pipeline and its response:
[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
--------------------------------------------------
// AUTOSENSE
Response:
[source,js]
--------------------------------------------------
{
"docs": [
{
"doc": {
"_id": "id",
"_ttl": null,
"_parent": null,
"_index": "index",
"_routing": null,
"_type": "type",
"_timestamp": null,
"_source": {
"field2": "_value",
"foo": "bar"
},
"_ingest": {
"timestamp": "2016-01-04T23:53:27.186+0000"
}
}
},
{
"doc": {
"_id": "id",
"_ttl": null,
"_parent": null,
"_index": "index",
"_routing": null,
"_type": "type",
"_timestamp": null,
"_source": {
"field2": "_value",
"foo": "rab"
},
"_ingest": {
"timestamp": "2016-01-04T23:53:27.186+0000"
}
}
}
]
}
--------------------------------------------------
It is often useful to see how each processor affects the ingest document
as it passes through the pipeline. To see the intermediate results of
each processor in the simulate request, add the `verbose` parameter
to the request.
Here is an example verbose request and its response:
[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate?verbose
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value2"
}
},
{
"set" : {
"field" : "field3",
"value" : "_value3"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
--------------------------------------------------
// AUTOSENSE
Response:
[source,js]
--------------------------------------------------
{
"docs": [
{
"processor_results": [
{
"processor_id": "processor[set]-0",
"doc": {
"_id": "id",
"_ttl": null,
"_parent": null,
"_index": "index",
"_routing": null,
"_type": "type",
"_timestamp": null,
"_source": {
"field2": "_value2",
"foo": "bar"
},
"_ingest": {
"timestamp": "2016-01-05T00:02:51.383+0000"
}
}
},
{
"processor_id": "processor[set]-1",
"doc": {
"_id": "id",
"_ttl": null,
"_parent": null,
"_index": "index",
"_routing": null,
"_type": "type",
"_timestamp": null,
"_source": {
"field3": "_value3",
"field2": "_value2",
"foo": "bar"
},
"_ingest": {
"timestamp": "2016-01-05T00:02:51.383+0000"
}
}
}
]
},
{
"processor_results": [
{
"processor_id": "processor[set]-0",
"doc": {
"_id": "id",
"_ttl": null,
"_parent": null,
"_index": "index",
"_routing": null,
"_type": "type",
"_timestamp": null,
"_source": {
"field2": "_value2",
"foo": "rab"
},
"_ingest": {
"timestamp": "2016-01-05T00:02:51.384+0000"
}
}
},
{
"processor_id": "processor[set]-1",
"doc": {
"_id": "id",
"_ttl": null,
"_parent": null,
"_index": "index",
"_routing": null,
"_type": "type",
"_timestamp": null,
"_source": {
"field3": "_value3",
"field2": "_value2",
"foo": "rab"
},
"_ingest": {
"timestamp": "2016-01-05T00:02:51.384+0000"
}
}
}
]
}
]
}
--------------------------------------------------