[[pipeline]]
== Pipeline Definition

A pipeline is a definition of a series of <<ingest-processors, processors>> that are to be executed
in the same order as they are declared. A pipeline consists of two main fields: a `description`
and a list of `processors`:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [ ... ]
}
--------------------------------------------------
// NOTCONSOLE

The `description` is a special field to store a helpful description of
what the pipeline does.

The `processors` parameter defines a list of processors to be executed in
order.
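
The following Python sketch is a rough mental model of "executed in the same order as they are declared". It treats a document as a dict and each processor as a function, and applies the functions in list order. `set_field`, `rename_field`, and `run_pipeline` are hypothetical helpers for illustration, not Elasticsearch code. They ignore details such as nested fields and failure handling.

[source,python]
--------------------------------------------------
from typing import Any, Callable, Dict, List

# Hypothetical local model: a document is a plain dict and a processor is a
# function that receives the document and returns the (possibly modified) one.
Document = Dict[str, Any]
Processor = Callable[[Document], Document]


def set_field(field: str, value: Any) -> Processor:
    """Stand-in for a `set` processor (top-level fields only)."""
    def apply(doc: Document) -> Document:
        doc[field] = value
        return doc
    return apply


def rename_field(field: str, target_field: str) -> Processor:
    """Stand-in for a `rename` processor (top-level fields only)."""
    def apply(doc: Document) -> Document:
        doc[target_field] = doc.pop(field)
        return doc
    return apply


def run_pipeline(processors: List[Processor], doc: Document) -> Document:
    # Processors run strictly in declaration order; each one sees the
    # document exactly as the previous processor left it.
    for processor in processors:
        doc = processor(doc)
    return doc


# The rename only works because the set ran first.
print(run_pipeline([set_field("a", 1), rename_field("a", "b")], {}))  # {'b': 1}
--------------------------------------------------

If you swap the two processors, the rename raises a `KeyError`, because `a` does not exist yet when it runs.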

[[accessing-data-in-pipelines]]
== Accessing Data in Pipelines

The processors in a pipeline have read and write access to documents that pass through the pipeline.
The processors can access fields in the source of a document and the document's metadata fields.

[float]
[[accessing-source-fields]]
=== Accessing Fields in the Source
Accessing a field in the source is straightforward. You simply refer to fields by
their name. For example:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}
--------------------------------------------------
// NOTCONSOLE

On top of this, fields from the source are always accessible via the `_source` prefix:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}
--------------------------------------------------
// NOTCONSOLE
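
The following Python sketch shows the equivalence between `my_field` and `_source.my_field`. It strips an optional `_source.` prefix and then walks the dotted path through nested objects. `get_source_field` and the sample document are hypothetical. The actual resolution rules in Elasticsearch may differ in details that this sketch ignores.

[source,python]
--------------------------------------------------
from typing import Any, Dict

SOURCE_PREFIX = "_source."


def get_source_field(source: Dict[str, Any], path: str) -> Any:
    """Resolve a dotted path in a source map; `_source.` is an optional prefix."""
    if path.startswith(SOURCE_PREFIX):
        path = path[len(SOURCE_PREFIX):]
    value: Any = source
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            raise KeyError(path)
        value = value[key]
    return value


source = {"my_field": 582.1, "geoip": {"country_iso_code": "NL"}}
print(get_source_field(source, "my_field"))                # 582.1
print(get_source_field(source, "_source.my_field"))        # 582.1
print(get_source_field(source, "geoip.country_iso_code"))  # NL
--------------------------------------------------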

[float]
[[accessing-metadata-fields]]
=== Accessing Metadata Fields
You can access metadata fields in the same way that you access fields in the source. This
is possible because Elasticsearch doesn't allow fields in the source that have the
same name as metadata fields.

The following example sets the `_id` metadata field of a document to `1`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_id",
    "value": "1"
  }
}
--------------------------------------------------
// NOTCONSOLE

The following metadata fields are accessible by a processor: `_index`, `_type`, `_id`, `_routing`.

[float]
[[accessing-ingest-metadata]]
=== Accessing Ingest Metadata Fields
Beyond metadata fields and source fields, ingest also adds ingest metadata to the documents that it processes.
These metadata properties are accessible under the `_ingest` key. Currently ingest adds the ingest timestamp
under the `_ingest.timestamp` key of the ingest metadata. The ingest timestamp is the time when Elasticsearch
received the index or bulk request to pre-process the document.

Any processor can add ingest-related metadata during document processing. Ingest metadata is transient
and is lost after a document has been processed by the pipeline. Therefore, ingest metadata won't be indexed.

The following example adds a field with the name `received`. The value is the ingest timestamp:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

Unlike Elasticsearch metadata fields, the ingest metadata field name `_ingest` can be used as a valid field name
in the source of a document. Use `_source._ingest` to refer to the field in the source document. Otherwise, `_ingest`
will be interpreted as an ingest metadata field.
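
The following Python sketch models the precedence rule above as a small resolver. It assumes that the source and the ingest metadata are held in two separate dicts. `resolve` and the sample values are hypothetical and for illustration only.

[source,python]
--------------------------------------------------
from typing import Any, Dict

SOURCE_PREFIX = "_source."
INGEST_PREFIX = "_ingest."


def resolve(path: str, source: Dict[str, Any], ingest: Dict[str, Any]) -> Any:
    """Hypothetical resolver for the `_ingest` versus `_source._ingest` rule."""
    if path.startswith(SOURCE_PREFIX):
        # An explicit `_source.` prefix always reads from the document source.
        container, rest = source, path[len(SOURCE_PREFIX):]
    elif path.startswith(INGEST_PREFIX):
        # A bare `_ingest.` prefix reads the transient ingest metadata instead.
        container, rest = ingest, path[len(INGEST_PREFIX):]
    else:
        container, rest = source, path
    value: Any = container
    for key in rest.split("."):
        value = value[key]
    return value


# A source document that happens to contain its own `_ingest` object.
source = {"_ingest": {"timestamp": "value from the source document"}}
ingest = {"timestamp": "value set by ingest"}

print(resolve("_ingest.timestamp", source, ingest))          # value set by ingest
print(resolve("_source._ingest.timestamp", source, ingest))  # value from the source document
--------------------------------------------------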

[float]
[[accessing-template-fields]]
=== Accessing Fields and Metafields in Templates
A number of processor settings also support templating. Settings that support templating can have zero or more
template snippets. A template snippet begins with `{{` and ends with `}}`.
Accessing fields and metafields in templates is exactly the same as via regular processor field settings.

The following example adds a field named `field_c`. Its value is a concatenation of
the values of `field_a` and `field_b`, separated by a space.

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

The following example uses the value of the `geoip.country_iso_code` field in the source
to set the index that the document will be indexed into:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_index",
    "value": "{{geoip.country_iso_code}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

Dynamic field names are also supported. This example sets the field named after the
value of `service` to the value of the field `code`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}
--------------------------------------------------
// NOTCONSOLE
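
The following simplified Python model shows how a snippet expands. It replaces each `{{ path }}` with the value found at that dotted path, and it works the same way in a field name and in a value. Elasticsearch renders these templates with Mustache, which supports more than this sketch does. The `render` and `lookup` helpers and the sample document are hypothetical.

[source,python]
--------------------------------------------------
import re
from typing import Any, Dict

# Matches `{{ path }}`, allowing optional whitespace as in `{{ _index }}`.
SNIPPET = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(doc: Dict[str, Any], path: str) -> Any:
    value: Any = doc
    for key in path.split("."):
        value = value[key]
    return value


def render(template: str, doc: Dict[str, Any]) -> str:
    """Replace every `{{path}}` snippet with the string form of the value at `path`."""
    return SNIPPET.sub(lambda m: str(lookup(doc, m.group(1))), template)


doc = {"field_a": "hello", "field_b": "world", "service": "web", "code": 200,
       "geoip": {"country_iso_code": "NL"}}

# Template in the value: `field_c` becomes "hello world".
doc["field_c"] = render("{{field_a}} {{field_b}}", doc)
# Template in the field name: the new field is named after the value of `service`.
doc[render("{{service}}", doc)] = render("{{code}}", doc)
print(doc["field_c"], doc["web"], render("{{geoip.country_iso_code}}", doc))  # hello world 200 NL
--------------------------------------------------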

[[ingest-conditionals]]
== Conditional Execution in Pipelines

Each processor allows for an optional `if` condition to determine if that
processor should be executed or skipped. The value of the `if` is a
<<modules-scripting-painless, Painless>> script that needs to evaluate
to `true` or `false`.

For example, the following processor will <<drop-processor,drop>> the document
(i.e. not index it) if the input document has a field named `network_name`
and it is equal to `Guest`.

[source,console]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network_name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------

Using that pipeline for an index request:

[source,console]
--------------------------------------------------
POST test/_doc/1?pipeline=drop_guests_network
{
  "network_name" : "Guest"
}
--------------------------------------------------
// TEST[continued]

This results in nothing being indexed, since the conditional evaluated to `true`.

[source,console-result]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "_version": -3,
  "result": "noop",
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}
--------------------------------------------------
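
In Python, the effect of `if` can be modeled as a guard around each processor. When the condition returns false, the processor is skipped. A drop stops processing so that nothing is indexed. This is a hypothetical local model (`run`, `drop`, and `DropDocument` are illustrative names), not how Elasticsearch implements conditionals.

[source,python]
--------------------------------------------------
from typing import Any, Callable, Dict, List, Optional, Tuple

Document = Dict[str, Any]
Condition = Optional[Callable[[Document], bool]]
Processor = Callable[[Document], Document]


class DropDocument(Exception):
    """Hypothetical signal meaning "do not index this document"."""


def drop(doc: Document) -> Document:
    raise DropDocument()


def run(steps: List[Tuple[Condition, Processor]], doc: Document) -> Optional[Document]:
    """Run (condition, processor) pairs; return None when the document is dropped."""
    for condition, processor in steps:
        if condition is not None and not condition(doc):
            continue  # the `if` evaluated to false, so the processor is skipped
        try:
            doc = processor(doc)
        except DropDocument:
            return None  # nothing gets indexed
    return doc


# Mirrors "if": "ctx.network_name == 'Guest'" on a drop processor.
drop_guests = [(lambda ctx: ctx.get("network_name") == "Guest", drop)]

print(run(drop_guests, {"network_name": "Guest"}))  # None
print(run(drop_guests, {"network_name": "Staff"}))  # {'network_name': 'Staff'}
--------------------------------------------------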

[[ingest-conditional-nullcheck]]
=== Handling Nested Fields in Conditionals

Source documents often contain nested fields. Care should be taken
to avoid NullPointerExceptions if the parent object does not exist
in the document. For example, `ctx.a.b.c` can throw a NullPointerException
if the source document does not have a top-level `a` object, or a second-level
`b` object.

To help protect against NullPointerExceptions, null safe operations should be used.
Fortunately, Painless makes {painless}/painless-operators-reference.html#null-safe-operator[null safe]
operations easy with the `?.` operator.

[source,console]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network?.name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------

The following document will get <<drop-processor,dropped>> correctly:

[source,console]
--------------------------------------------------
POST test/_doc/1?pipeline=drop_guests_network
{
  "network": {
    "name": "Guest"
  }
}
--------------------------------------------------
// TEST[continued]

Thanks to the `?.` operator, the following document will not throw an error.
If the pipeline used a `.`, the following document would throw a NullPointerException,
since the `network` object is not part of the source document.

[source,console]
--------------------------------------------------
POST test/_doc/2?pipeline=drop_guests_network
{
  "foo" : "bar"
}
--------------------------------------------------
// TEST[continued]

////
Hidden example assertion:
[source,console]
--------------------------------------------------
GET test/_doc/2
--------------------------------------------------
// TEST[continued]

[source,console-result]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "2",
  "_version": 1,
  "_seq_no": 22,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "foo": "bar"
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]
////
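
Python has no `?.` operator, but the same idea can be sketched with a helper that returns `None` as soon as a parent object is missing. `null_safe` is a hypothetical helper. Note that unguarded dict indexing in Python raises a `KeyError`, whereas the unguarded Painless expression `ctx.network.name` fails with a NullPointerException.

[source,python]
--------------------------------------------------
from typing import Any, Dict, Optional


def null_safe(ctx: Dict[str, Any], *keys: str) -> Optional[Any]:
    """Walk nested dicts, returning None as soon as a parent is missing."""
    value: Any = ctx
    for key in keys:
        if not isinstance(value, dict):
            return None  # missing (or non-object) parent: stop instead of failing
        value = value.get(key)
    return value


guest = {"network": {"name": "Guest"}}
other = {"foo": "bar"}

print(null_safe(guest, "network", "name") == "Guest")  # True, so the document is dropped
print(null_safe(other, "network", "name") == "Guest")  # False, and no error is raised

try:
    other["network"]["name"]  # unguarded access, comparable to `ctx.network.name`
except KeyError:
    print("unguarded access failed")
--------------------------------------------------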

The source document can also use dot delimited fields to represent nested fields.

For example, instead of the source document defining the fields as nested objects:

[source,js]
--------------------------------------------------
{
  "network": {
    "name": "Guest"
  }
}
--------------------------------------------------
// NOTCONSOLE

The source document may have the nested fields flattened like this:

[source,js]
--------------------------------------------------
{
  "network.name": "Guest"
}
--------------------------------------------------
// NOTCONSOLE

If this is the case, use the <<dot-expand-processor, Dot Expander Processor>>
so that the nested fields may be used in a conditional.

[source,console]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "dot_expander": {
        "field": "network.name"
      }
    },
    {
      "drop": {
        "if": "ctx.network?.name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------

Now the following input document can be handled by the conditional in the pipeline.

[source,console]
--------------------------------------------------
POST test/_doc/3?pipeline=drop_guests_network
{
  "network.name": "Guest"
}
--------------------------------------------------
// TEST[continued]
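
The following Python sketch shows what the dot expander does for this conditional: it moves the value stored under the dotted key into nested objects. `expand_dots` is a hypothetical helper that only covers this simple case. Refer to the <<dot-expand-processor, Dot Expander Processor>> documentation for the processor's actual options and behavior.

[source,python]
--------------------------------------------------
from typing import Any, Dict


def expand_dots(doc: Dict[str, Any], field: str) -> Dict[str, Any]:
    """Move the value stored under a dotted key (e.g. "network.name") into nested dicts."""
    if field not in doc:
        return doc
    value = doc.pop(field)
    *parents, leaf = field.split(".")
    target = doc
    for key in parents:
        # Reuse an existing object, or create one, for each parent segment.
        target = target.setdefault(key, {})
    target[leaf] = value
    return doc


doc = expand_dots({"network.name": "Guest"}, "network.name")
print(doc)  # {'network': {'name': 'Guest'}}
# A nested lookup, like `ctx.network?.name`, now finds the value.
print(doc.get("network", {}).get("name") == "Guest")  # True
--------------------------------------------------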

The `?.` operator works well for use in the `if` conditional
because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator]
returns null if the object is null and `==` is null safe (as well as many other
{painless}/painless-operators.html[painless operators]).

However, calling a method such as `.equalsIgnoreCase` is not null safe
and can result in a NullPointerException.

In some situations, the same functionality can be achieved in a null safe manner.
For example, `'Guest'.equalsIgnoreCase(ctx.network?.name)` is null safe because
`'Guest'` is never null, but `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe,
since `ctx.network?.name` can return null.

Some situations require an explicit null check. In the following example there
is no null safe alternative, so an explicit null check is needed.

[source,js]
--------------------------------------------------
{
  "drop": {
    "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
  }
}
--------------------------------------------------
// NOTCONSOLE

[[ingest-conditional-complex]]
=== Complex Conditionals
The `if` condition can be more than a simple equality check.
The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available,
running in the {painless}/painless-ingest-processor-context.html[ingest processor context].

IMPORTANT: The value of `ctx` is read-only in `if` conditions.

The following more complex `if` condition drops the document (i.e. does not index it)
unless it has a multi-valued `tags` field with at least one value that contains the characters
`prod` (case insensitive).

[source,console]
--------------------------------------------------
PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": "Collection tags = ctx.tags;if(tags != null){for (String tag : tags) {if (tag.toLowerCase().contains('prod')) { return false;}}} return true;"
      }
    }
  ]
}
--------------------------------------------------

The conditional needs to be all on one line since JSON strings cannot contain
unescaped newline characters. However, Kibana's console supports
a triple quote syntax to help with writing and debugging
scripts like these.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                  if (tag.toLowerCase().contains('prod')) {
                      return false;
                  }
              }
            }
            return true;
        """
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE
// TEST[continued]

[source,console]
--------------------------------------------------
POST test/_doc/1?pipeline=not_prod_dropper
{
  "tags": ["application:myapp", "env:Stage"]
}
--------------------------------------------------
// TEST[continued]

The document is <<drop-processor,dropped>> since `prod` (case insensitive)
is not found in the tags.

The following document is indexed (i.e. not dropped) since
`prod` (case insensitive) is found in the tags.

[source,console]
--------------------------------------------------
POST test/_doc/2?pipeline=not_prod_dropper
{
  "tags": ["application:myapp", "env:Production"]
}
--------------------------------------------------
// TEST[continued]

////
Hidden example assertion:
[source,console]
--------------------------------------------------
GET test/_doc/2
--------------------------------------------------
// TEST[continued]

[source,console-result]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "2",
  "_version": 1,
  "_seq_no": 34,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "tags": [
      "application:myapp",
      "env:Production"
    ]
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]
////
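
Before you put a script like this into a pipeline, it can help to test the logic locally. The following Python function is a hand translation of the `not_prod_dropper` condition, checked against the two example documents. It is for experimentation only; the pipeline itself still evaluates the Painless script. The name `should_drop` is hypothetical.

[source,python]
--------------------------------------------------
from typing import Any, Dict


def should_drop(ctx: Dict[str, Any]) -> bool:
    """Python port of the `not_prod_dropper` condition: True means drop."""
    tags = ctx.get("tags")
    if tags is not None:
        for tag in tags:
            if "prod" in tag.lower():
                return False  # keep: at least one tag mentions prod
    return True  # drop: no tag contains "prod" (or there are no tags)


print(should_drop({"tags": ["application:myapp", "env:Stage"]}))       # True
print(should_drop({"tags": ["application:myapp", "env:Production"]}))  # False
print(should_drop({}))                                                 # True
--------------------------------------------------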

The <<simulate-pipeline-api>> with the `verbose` parameter can be used to help build out
complex conditionals. If the conditional evaluates to false, the processor is
omitted from the verbose results of the simulation, since the document does not change.

Care should be taken to avoid overly complex or expensive conditional checks
since the condition needs to be checked for each and every document.

[[conditionals-with-multiple-pipelines]]
=== Conditionals with the Pipeline Processor
The combination of the `if` conditional and the <<pipeline-processor>> provides a simple
yet powerful means to process heterogeneous input. For example, you can define a single pipeline
that delegates to other pipelines based on some criteria.

[source,console]
--------------------------------------------------
PUT _ingest/pipeline/logs_pipeline
{
  "description": "A pipeline of pipelines for log files",
  "version": 1,
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
--------------------------------------------------

The above example allows consumers to point to a single pipeline for all log based index requests.
Based on the conditional, the correct pipeline will be called to process that type of data.
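
The following Python sketch models the dispatch logic of `logs_pipeline`. It looks up `service.name` and either hands the document to a sub-pipeline or fails when the name matches neither service. The sub-pipeline bodies and the `ROUTES` table are hypothetical placeholders; the sketch only models the routing decision.

[source,python]
--------------------------------------------------
from typing import Any, Callable, Dict, Optional

Document = Dict[str, Any]


def httpd_pipeline(doc: Document) -> Document:
    doc["handled_by"] = "httpd_pipeline"  # placeholder for the real processors
    return doc


def syslog_pipeline(doc: Document) -> Document:
    doc["handled_by"] = "syslog_pipeline"  # placeholder for the real processors
    return doc


# Hypothetical routing table: service.name value -> sub-pipeline.
ROUTES: Dict[str, Callable[[Document], Document]] = {
    "apache_httpd": httpd_pipeline,
    "syslog": syslog_pipeline,
}


def logs_pipeline(doc: Document) -> Document:
    service = doc.get("service")
    # Null-safe read, similar to `ctx.service?.name`.
    name: Optional[str] = service.get("name") if isinstance(service, dict) else None
    route = ROUTES.get(name) if name is not None else None
    if route is None:
        # Plays the role of the `fail` processor.
        raise ValueError("This pipeline requires service.name to be either `syslog` or `apache_httpd`")
    return route(doc)


print(logs_pipeline({"service": {"name": "syslog"}})["handled_by"])  # syslog_pipeline
--------------------------------------------------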

This pattern works well with a <<dynamic-index-settings, default pipeline>> defined in an index
template for all indices that hold data that needs pre-index processing.

[[conditionals-with-regex]]
=== Conditionals with Regular Expressions
The `if` conditional is implemented as a Painless script, which requires
{painless}/painless-regexes.html[explicit support for regular expressions].

`script.painless.regex.enabled: true` must be set in `elasticsearch.yml` to use regular
expressions in the `if` condition.

If regular expressions are enabled, operators such as `=~` can be used against a `/pattern/` for conditions.

For example:

[source,console]
--------------------------------------------------
PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
--------------------------------------------------

[source,console]
--------------------------------------------------
POST test/_doc/1?pipeline=check_url
{
  "href": {
    "url": "http://www.elastic.co/"
  }
}
--------------------------------------------------
// TEST[continued]

Results in:

////
Hidden example assertion:
[source,console]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// TEST[continued]
////

[source,console-result]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "_version": 1,
  "_seq_no": 60,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "href": {
      "insecure": true,
      "url": "http://www.elastic.co/"
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]

Regular expressions can be expensive and should be avoided if viable
alternatives exist.

For example, in this case `startsWith` can be used to get the same result
without using a regular expression:

[source,console]
--------------------------------------------------
PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url != null && ctx.href.url.startsWith('http://')",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
--------------------------------------------------
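
You can compare the two conditions locally with Python's `re` module. This sketch assumes that Painless `=~` behaves as a find (partial) match, which corresponds to `re.search`. The function names are hypothetical.

[source,python]
--------------------------------------------------
import re
from typing import Optional

# Same pattern as the `if` condition: "http" followed by any character except "s".
INSECURE = re.compile(r"^http[^s]")


def insecure_regex(url: Optional[str]) -> bool:
    # Painless `=~` is a find-style match, which corresponds to re.search.
    return url is not None and INSECURE.search(url) is not None


def insecure_starts_with(url: Optional[str]) -> bool:
    # Mirrors `ctx.href?.url != null && ctx.href.url.startsWith('http://')`.
    return url is not None and url.startswith("http://")


for url in ["http://www.elastic.co/", "https://www.elastic.co/", None]:
    print(url, insecure_regex(url), insecure_starts_with(url))
--------------------------------------------------

For ordinary `http://` and `https://` URLs the two checks agree, but they are not strictly identical. The regular expression also matches other strings that start with `http` followed by a character other than `s`, such as `httpx://`.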

[[handling-failure-in-pipelines]]
== Handling Failures in Pipelines

In its simplest use case, a pipeline defines a list of processors that
are executed sequentially, and processing halts at the first exception. This
behavior may not be desirable when failures are expected. For example, you may have logs
that don't match the specified grok expression. Instead of halting execution, you may
want to index such documents into a separate index.

To enable this behavior, you can use the `on_failure` parameter. The `on_failure` parameter
defines a list of processors to be executed immediately following the failed processor.
You can specify this parameter at the pipeline level, as well as at the processor
level. If a processor specifies an `on_failure` configuration, whether
it is empty or not, any exceptions that are thrown by the processor are caught, and the
pipeline continues executing the remaining processors. Because you can define further processors
within the scope of an `on_failure` statement, you can nest failure handling.

The following example defines a pipeline that renames the `foo` field in
the processed document to `bar`. If the document does not contain the `foo` field, the processor
attaches an error message to the document for later analysis within
Elasticsearch.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE

The following example defines an `on_failure` block on a whole pipeline to change
the index to which failed documents get sent.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [ ... ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE

Alternatively, instead of defining behavior in case of processor failure, it is also possible
to ignore a failure and continue with the next processor by specifying the `ignore_failure` setting.

In the example below, if the field `foo` doesn't exist, the failure is caught and the pipeline
continues to execute, which in this case means that the pipeline does nothing.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "ignore_failure" : true
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE

The `ignore_failure` setting can be set on any processor and defaults to `false`.

[float]
[[accessing-error-metadata]]
=== Accessing Error Metadata From Processors Handling Exceptions

You may want to retrieve the actual error message that was thrown
by a failed processor. To do so, you can access the ingest metadata fields
`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag` (for example, as `_ingest.on_failure_message`). These fields are only accessible
from within the context of an `on_failure` block.

Here is an updated version of the example that you
saw earlier. Instead of setting the error message manually, the example leverages the `on_failure_message`
metadata field to provide the error message.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE
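
The failure-handling rules in this section can be summarized in a hypothetical Python model. An error can propagate and halt processing, or it can be swallowed when `ignore_failure` is set. It can also be passed to `on_failure` handlers, which can read the message from ingest metadata. `rename`, `run_processor`, and `record_error` are illustrative names. The error text is made up for the example and is not copied from Elasticsearch.

[source,python]
--------------------------------------------------
from typing import Any, Callable, Dict, List, Optional

Document = Dict[str, Any]
Processor = Callable[[Document], Document]
# An on_failure handler also receives the transient ingest metadata.
Handler = Callable[[Document, Dict[str, Any]], Document]


def rename(field: str, target_field: str) -> Processor:
    """Stand-in for a `rename` processor; the error text is illustrative only."""
    def apply(doc: Document) -> Document:
        if field not in doc:
            raise ValueError(f"field [{field}] doesn't exist")
        doc[target_field] = doc.pop(field)
        return doc
    return apply


def run_processor(processor: Processor, doc: Document,
                  on_failure: Optional[List[Handler]] = None,
                  ignore_failure: bool = False) -> Document:
    try:
        return processor(doc)
    except Exception as exc:
        if ignore_failure:
            return doc  # swallow the error; the next processor runs as usual
        if on_failure is None:
            raise  # no handler: processing halts at this exception
        # Handlers can read the error via metadata such as on_failure_message.
        ingest = {"on_failure_message": str(exc)}
        for handler in on_failure:
            doc = handler(doc, ingest)
        return doc


def record_error(doc: Document, ingest: Dict[str, Any]) -> Document:
    # Like a `set` processor with "value": "{{ _ingest.on_failure_message }}".
    doc["error"] = ingest["on_failure_message"]
    return doc


print(run_processor(rename("foo", "bar"), {"baz": 1}, on_failure=[record_error]))
print(run_processor(rename("foo", "bar"), {"baz": 1}, ignore_failure=True))
--------------------------------------------------

Passing `on_failure=[]` also catches the error. This matches the rule that an `on_failure` configuration catches exceptions whether it is empty or not.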

[role="xpack"]
[testenv="basic"]
[[ingest-enriching-data]]
== Enriching data with ingest node

The <<enrich-processor,enrich processor>> allows documents to be enriched, prior to indexing, with data from
an enrich index that is managed by an enrich policy.

The data that is used by the enrich index is managed by the user in regular indices.
An enrich policy is a configuration that indicates how an enrich index is created from
the data in the user's maintained indices. When an enrich policy is executed,
a new enrich index is created for that policy, which the enrich processor can then use.

An enrich policy also controls what kind of enrichment the `enrich` processor is able to do.

[[enrich-policy-definition]]
=== Enrich Policy Definition

The <<enrich-processor,enrich processor>> requires more than just the configuration in a pipeline.
The main piece to configure is the enrich policy:

[[enrich-policy-options]]
.Enrich policy options
[options="header"]
|======
| Name             | Required  | Default            | Description
| `type`           | yes       | -                  | The policy type.
| `indices`        | yes       | -                  | The indices to fetch the data from.
| `query`          | no        | `match_all` query  | The query to be used to select which documents are included.
| `match_field`    | yes       | -                  | The field that will be used to match against an input document.
| `enrich_fields`  | yes       | -                  | The fields that will be available to enrich the input document.
|======

[[enrich-policy-types]]
==== Policy types

An enrich processor is associated with a policy via the `policy_name` option.
The policy type of the policy determines what kind of enrichment an `enrich` processor is able to do.

The following policy types are currently supported:

* `match` - Can look up documents by running a term query and use the retrieved content to enrich the document being ingested.

[[enrich-processor-getting-started]]
=== Getting started

Create a regular index that contains data you would like to enrich your incoming documents with:

[source,js]
--------------------------------------------------
PUT /users/_doc/1?refresh
{
  "email": "mardy.brown@email.me",
  "first_name": "Mardy",
  "last_name": "Brown",
  "address": "6649 N Blue Gum St",
  "city": "New Orleans",
  "county": "Orleans",
  "state": "LA",
  "zip": 70116,
  "phone1":"504-621-8927",
  "phone2": "504-845-1427",
  "web": "mardy-brown.me"
}
--------------------------------------------------
// CONSOLE

Create an enrich policy:

[source,js]
--------------------------------------------------
PUT /_enrich/policy/users-policy
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
  }
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

Which returns:

[source,js]
--------------------------------------------------
{
  "acknowledged": true
}
--------------------------------------------------
// TESTRESPONSE

Execute that enrich policy:

[source,js]
--------------------------------------------------
POST /_enrich/policy/users-policy/_execute
--------------------------------------------------
// CONSOLE
// TEST[continued]

Which returns:

[source,js]
--------------------------------------------------
{
  "acknowledged": true
}
--------------------------------------------------
// TESTRESPONSE

Create the pipeline and enrich a document:

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/user_lookup
{
  "description" : "Enriching user details to messages",
  "processors" : [
    {
      "enrich" : {
        "policy_name": "users-policy",
        "field" : "email",
        "target_field": "user"
      }
    }
  ]
}

PUT my_index/_doc/my_id?pipeline=user_lookup
{
  "email": "mardy.brown@email.me"
}

GET my_index/_doc/my_id
--------------------------------------------------
// CONSOLE
// TEST[continued]

Which returns:

[source,js]
--------------------------------------------------
{
  "found": true,
  "_index": "my_index",
  "_type": "_doc",
  "_id": "my_id",
  "_version": 1,
  "_seq_no": 55,
  "_primary_term": 1,
  "_source": {
    "user": [
      {
        "email": "mardy.brown@email.me",
        "first_name": "Mardy",
        "last_name": "Brown",
        "zip": 70116,
        "address": "6649 N Blue Gum St",
        "city": "New Orleans",
        "state": "LA"
      }
    ],
    "email": "mardy.brown@email.me"
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]
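
The following simplified Python model, written for intuition only, approximates what the `match` policy and the `enrich` processor do in this example. The policy's source documents are copied into a lookup table keyed on `match_field`, keeping only the match field and the `enrich_fields`. The processor then copies any matching entries into `target_field`. This sketch rests on several assumptions. The real enrich index is a separate index that is queried with a term query, and `build_enrich_table` and `enrich` are hypothetical names. The list-shaped `user` value follows the example response above.

[source,python]
--------------------------------------------------
from typing import Any, Dict, List

Document = Dict[str, Any]


def build_enrich_table(docs: List[Document], match_field: str,
                       enrich_fields: List[str]) -> Dict[Any, List[Document]]:
    """Rough model of executing a match policy: index source docs by match_field."""
    table: Dict[Any, List[Document]] = {}
    for doc in docs:
        if match_field not in doc:
            continue
        # Keep only the match field plus the configured enrich fields.
        entry = {k: doc[k] for k in [match_field, *enrich_fields] if k in doc}
        table.setdefault(doc[match_field], []).append(entry)
    return table


def enrich(doc: Document, table: Dict[Any, List[Document]],
           field: str, target_field: str) -> Document:
    """Rough model of the enrich processor: exact-value lookup, copy matches."""
    matches = table.get(doc.get(field))
    if matches:
        doc[target_field] = [dict(entry) for entry in matches]
    return doc


users = [{"email": "mardy.brown@email.me", "first_name": "Mardy", "last_name": "Brown",
          "address": "6649 N Blue Gum St", "city": "New Orleans", "county": "Orleans",
          "state": "LA", "zip": 70116, "phone1": "504-621-8927"}]
table = build_enrich_table(
    users, "email", ["first_name", "last_name", "address", "city", "zip", "state"])
print(enrich({"email": "mardy.brown@email.me"}, table, "email", "user"))
--------------------------------------------------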

//////////////////////////

[source,js]
--------------------------------------------------
DELETE /_ingest/pipeline/user_lookup
DELETE /_enrich/policy/users-policy
--------------------------------------------------
// CONSOLE
// TEST[continued]

//////////////////////////

[[enrich-policy-apis]]
=== Enrich Policy APIs

Several APIs are available to manage and execute enrich policies:

* <<put-policy-api,Put policy api>>.
* <<get-policy-api,Get policy api>>.
* <<delete-policy-api,Delete policy api>>.
* <<execute-policy-api,Execute policy api>>.

If security is enabled, the user managing enrich policies needs to have
the `enrich_user` built-in role. The user also needs read privileges
for the indices the enrich policy refers to.

[[put-policy-api]]
==== Put Policy API

The put policy API allows a policy to be stored under a user-specified id given in the URL, with
the enrich policy definition as the request body.

Request:

[source,js]
--------------------------------------------------
PUT /_enrich/policy/my-policy
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
  }
}
--------------------------------------------------
// CONSOLE

Response:

[source,js]
--------------------------------------------------
{
  "acknowledged": true
}
--------------------------------------------------
// TESTRESPONSE

[[get-policy-api]]
==== Get Policy API

The get policy API allows a policy to be retrieved by id.

Request:

[source,js]
--------------------------------------------------
GET /_enrich/policy/my-policy
--------------------------------------------------
// CONSOLE
// TEST[continued]

Response:

[source,js]
--------------------------------------------------
{
  "policies": [
    {
      "match": {
        "name" : "my-policy",
        "indices" : ["users"],
        "match_field" : "email",
        "enrich_fields" : [
          "first_name",
          "last_name",
          "address",
          "city",
          "zip",
          "state"
        ]
      }
    }
  ]
}
--------------------------------------------------
// TESTRESPONSE

The get policy API can also return all policies.

Request:

[source,js]
--------------------------------------------------
GET /_enrich/policy
--------------------------------------------------
// CONSOLE
// TEST[continued]

Response:

[source,js]
--------------------------------------------------
{
  "policies": [
    {
      "match": {
        "name" : "my-policy",
        "indices" : ["users"],
        "match_field" : "email",
        "enrich_fields" : [
          "first_name",
          "last_name",
          "address",
          "city",
          "zip",
          "state"
        ]
      }
    }
  ]
}
--------------------------------------------------
// TESTRESPONSE
|
|
|
|
[[execute-policy-api]]
==== Execute Policy API

The execute policy API executes the policy with the provided id.
Executing a policy involves creating a new enrich index, indexing the documents from
the indices specified in the policy into that enrich index, and some other operations,
so it may take some time before this API returns a response.

After a policy has been executed, changes to the indices the policy points to are not
reflected in its enrich index until the policy is executed again.

This API creates an index with the `.enrich-*` prefix in its name. This index is intended
for use by the enrich processor only and should not be used for anything else.
Old `.enrich-*` indices are removed by an internal cleanup mechanism.

//////////////////////////

[source,js]
--------------------------------------------------
PUT /users/_doc/1?refresh
{
  "email": "mardy.brown@email.me",
  "first_name": "Mardy",
  "last_name": "Brown",
  "address": "6649 N Blue Gum St",
  "city": "New Orleans",
  "county": "Orleans",
  "state": "LA",
  "zip": 70116,
  "phone1": "504-621-8927",
  "phone2": "504-845-1427",
  "web": "mardy-brown.me"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

//////////////////////////

Request:

[source,js]
--------------------------------------------------
POST /_enrich/policy/my-policy/_execute
--------------------------------------------------
// CONSOLE
// TEST[continued]

Response:

[source,js]
--------------------------------------------------
{
  "acknowledged": true
}
--------------------------------------------------
// TESTRESPONSE

[[delete-policy-api]]
==== Delete Policy API

The delete policy API removes a policy by its id.

Request:

[source,js]
--------------------------------------------------
DELETE /_enrich/policy/my-policy
--------------------------------------------------
// CONSOLE
// TEST[continued]

Response:

[source,js]
--------------------------------------------------
{
  "acknowledged": true
}
--------------------------------------------------
// TESTRESPONSE

[[ingest-processors]]
== Processors

All processors are defined in the following way within a pipeline definition:

[source,js]
--------------------------------------------------
{
  "PROCESSOR_NAME" : {
    ... processor configuration options ...
  }
}
--------------------------------------------------
// NOTCONSOLE

Each processor defines its own configuration parameters, but all processors can also
declare the optional `tag`, `on_failure`, and `if` fields.

A `tag` is a string identifier for a specific instance of a processor in a pipeline.
The `tag` field does not affect the processor's behavior, but it is useful for
bookkeeping and for tracing errors back to specific processors.

The `if` field must contain a script that returns a boolean value. If the script evaluates to `true`,
the processor is executed for the given document; otherwise, it is skipped.
The `if` field accepts either an inline script string, as in the example below, or an object with the
script fields defined in <<script-processor, script-options>>. The script accesses a read-only version
of the document via the same `ctx` variable used by scripts in the <<script-processor>>.

[source,js]
--------------------------------------------------
{
  "set": {
    "if": "ctx.foo == 'someValue'",
    "field": "found",
    "value": true
  }
}
--------------------------------------------------
// NOTCONSOLE

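The same condition can also be written in object form. The following is a sketch that assumes the object accepts the usual script fields `lang`, `source`, and `params`; it passes the expected value as a parameter instead of hard-coding it in the script. The field names `foo` and `found` are carried over from the example above, and the parameter name `expected` is hypothetical.

[source,js]
--------------------------------------------------
{
  "set": {
    "if": {
      "lang": "painless",
      "source": "ctx.foo == params.expected",
      "params": {
        "expected": "someValue"
      }
    },
    "field": "found",
    "value": true
  }
}
--------------------------------------------------
// NOTCONSOLE
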
See <<ingest-conditionals>> to learn more about the `if` field and conditional execution.

See <<handling-failure-in-pipelines>> to learn more about the `on_failure` field and error handling in pipelines.

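As an illustration of how `tag` and `on_failure` work together, the following sketch gives a `rename` processor a tag and a processor-level `on_failure` handler. The handler copies the failure message and the tag of the failing processor from the `_ingest` metadata into the document. The tag `rename-foo` and the fields `foo`, `bar`, `error_message`, and `failed_processor_tag` are hypothetical names chosen for this example.

[source,js]
--------------------------------------------------
{
  "rename": {
    "tag": "rename-foo",
    "field": "foo",
    "target_field": "bar",
    "on_failure": [
      {
        "set": {
          "field": "error_message",
          "value": "{{ _ingest.on_failure_message }}"
        }
      },
      {
        "set": {
          "field": "failed_processor_tag",
          "value": "{{ _ingest.on_failure_processor_tag }}"
        }
      }
    ]
  }
}
--------------------------------------------------
// NOTCONSOLE
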
Use the <<cluster-nodes-info,node info API>> to find out which processors are available in a cluster.
It returns a per-node list of the available processors.

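For example, the following request asks the node info API for the `ingest` metric only. The `filter_path` parameter is optional and is used here, as an illustrative assumption about where the lists appear in the response, to trim the output down to each node's processor list.

[source,js]
--------------------------------------------------
GET /_nodes/ingest?filter_path=nodes.*.ingest.processors
--------------------------------------------------
// NOTCONSOLE
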
Custom processors must be installed on all nodes. The put pipeline API will fail if a processor specified in a pipeline
doesn't exist on all nodes. If you rely on custom processor plugins, mark these plugins as mandatory by adding
the `plugin.mandatory` setting to the `config/elasticsearch.yml` file, for example:

[source,yaml]
--------------------------------------------------
plugin.mandatory: ingest-attachment
--------------------------------------------------

With this setting in place, a node will not start if the plugin is not installed.

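If a cluster relies on more than one custom processor plugin, each of them can be listed. `plugin.mandatory` is a list setting, so a configuration like the following sketch should be accepted; `my-custom-ingest-plugin` is a hypothetical plugin name used only for illustration.

[source,yaml]
--------------------------------------------------
plugin.mandatory:
  - ingest-attachment
  - my-custom-ingest-plugin   # hypothetical plugin name
--------------------------------------------------
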
The <<cluster-nodes-stats,node stats API>> can be used to fetch ingest usage statistics, both globally and on a per-pipeline
basis. This is useful for finding out which pipelines are used most or spend the most time on preprocessing.

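For example, the following request retrieves only the ingest section of the node stats. The `filter_path` value is an illustrative assumption that per-pipeline statistics are reported under `nodes.*.ingest.pipelines`; drop it to see the full ingest statistics, including the global totals.

[source,js]
--------------------------------------------------
GET /_nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines
--------------------------------------------------
// NOTCONSOLE
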
[float]
=== Ingest Processor Plugins

Additional ingest processors can be implemented and installed as Elasticsearch {plugins}/intro.html[plugins].
See {plugins}/ingest.html[Ingest plugins] for information about the available ingest plugins.

include::processors/append.asciidoc[]
include::processors/bytes.asciidoc[]
include::processors/circle.asciidoc[]
include::processors/convert.asciidoc[]
include::processors/date.asciidoc[]
include::processors/date-index-name.asciidoc[]
include::processors/dissect.asciidoc[]
include::processors/dot-expand.asciidoc[]
include::processors/drop.asciidoc[]
include::processors/enrich.asciidoc[]
include::processors/fail.asciidoc[]
include::processors/foreach.asciidoc[]
include::processors/geoip.asciidoc[]
include::processors/grok.asciidoc[]
include::processors/gsub.asciidoc[]
include::processors/html_strip.asciidoc[]
include::processors/join.asciidoc[]
include::processors/json.asciidoc[]
include::processors/kv.asciidoc[]
include::processors/lowercase.asciidoc[]
include::processors/pipeline.asciidoc[]
include::processors/remove.asciidoc[]
include::processors/rename.asciidoc[]
include::processors/script.asciidoc[]
include::processors/set.asciidoc[]
include::processors/set-security-user.asciidoc[]
include::processors/split.asciidoc[]
include::processors/sort.asciidoc[]
include::processors/trim.asciidoc[]
include::processors/uppercase.asciidoc[]
include::processors/url-decode.asciidoc[]
include::processors/user-agent.asciidoc[]