[[pipeline]]
== Pipeline Definition

A pipeline is a definition of a series of <<ingest-processors, processors>> that are to be executed
in the same order as they are declared. A pipeline consists of two main fields: a `description`
and a list of `processors`:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [ ... ]
}
--------------------------------------------------
// NOTCONSOLE

The `description` is a special field to store a helpful description of
what the pipeline does.

The `processors` parameter defines a list of processors to be executed in
order.
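
As a minimal sketch of why declaration order matters, the following hypothetical pipeline first sets a
`status` field and then uppercases it. The field name and value are illustrative assumptions; `set` and
`uppercase` are standard ingest processors. With the order reversed, `uppercase` would run before
`status` exists and would be expected to fail on documents that lack the field.

[source,js]
--------------------------------------------------
{
  "description" : "illustrative only: set a field, then uppercase it",
  "processors" : [
    { "set" : { "field" : "status", "value" : "new" } },
    { "uppercase" : { "field" : "status" } }
  ]
}
--------------------------------------------------
// NOTCONSOLE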

[[ingest-apis]]
== Ingest APIs

The following ingest APIs are available for managing pipelines:

* <<put-pipeline-api>> to add or update a pipeline
* <<get-pipeline-api>> to return a specific pipeline
* <<delete-pipeline-api>> to delete a pipeline
* <<simulate-pipeline-api>> to simulate a call to a pipeline

[[put-pipeline-api]]
=== Put Pipeline API

The put pipeline API adds pipelines and updates existing pipelines in the cluster.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

NOTE: The put pipeline API also instructs all ingest nodes to reload their in-memory representation of pipelines, so that
pipeline changes take effect immediately.

[[get-pipeline-api]]
=== Get Pipeline API

The get pipeline API returns pipelines based on ID. This API always returns a local reference of the pipeline.

[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// CONSOLE
// TEST[continued]

Example response:

[source,js]
--------------------------------------------------
{
  "my-pipeline-id" : {
    "description" : "describe pipeline",
    "processors" : [
      {
        "set" : {
          "field" : "foo",
          "value" : "bar"
        }
      }
    ]
  }
}
--------------------------------------------------
// TESTRESPONSE

For each returned pipeline, the source and the version are returned.
The version is useful for knowing which version of the pipeline the node has.
You can specify multiple IDs to return more than one pipeline. Wildcards are also supported.
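
As a sketch of both forms, the first request below asks for two pipelines by a comma-separated list of IDs
and the second uses a wildcard. `other-pipeline-id` is a hypothetical second pipeline that is not defined
elsewhere in this document.

[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id,other-pipeline-id

GET _ingest/pipeline/my-*
--------------------------------------------------
// NOTCONSOLE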

[float]
[[versioning-pipelines]]
==== Pipeline Versioning

Pipelines can optionally add a `version` number, which can be any integer value,
in order to simplify pipeline management by external systems. The `version`
field is completely optional and it is meant solely for external management of
pipelines. To unset a `version`, simply replace the pipeline without specifying
one.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "version" : 123,
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

To check for the `version`, you can
<<common-options-response-filtering, filter responses>>
using `filter_path` to limit the response to just the `version`:

[source,js]
--------------------------------------------------
GET /_ingest/pipeline/my-pipeline-id?filter_path=*.version
--------------------------------------------------
// CONSOLE
// TEST[continued]

This should give a small response that makes it both easy and inexpensive to parse:

[source,js]
--------------------------------------------------
{
  "my-pipeline-id" : {
    "version" : 123
  }
}
--------------------------------------------------
// TESTRESPONSE
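
To illustrate the unset step described in this section, the following sketch re-submits the same pipeline
without the `version` field. After such a request, the pipeline would be expected to no longer report a
`version`.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE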

[[delete-pipeline-api]]
=== Delete Pipeline API

The delete pipeline API deletes pipelines by ID or wildcard match (`my-*`, `*`).

[source,js]
--------------------------------------------------
DELETE _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// CONSOLE
// TEST[continued]
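
A wildcard delete might look like the following sketch, which removes every pipeline whose ID starts with
`my-`. The `my-` prefix is only an illustration. Because a pattern such as `*` matches all pipelines, it can
be worth listing the matching IDs with the get pipeline API before deleting.

[source,js]
--------------------------------------------------
DELETE _ingest/pipeline/my-*
--------------------------------------------------
// NOTCONSOLE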

////
Hidden setup for wildcard test:
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/wild-one
{
  "description" : "first pipeline to be wildcard deleted",
  "processors" : [ ]
}

PUT _ingest/pipeline/wild-two
{
  "description" : "second pipeline to be wildcard deleted",
  "processors" : [ ]
}

DELETE _ingest/pipeline/*
--------------------------------------------------
// CONSOLE

Hidden expected response:
[source,js]
--------------------------------------------------
{
  "acknowledged": true
}
--------------------------------------------------
// TESTRESPONSE
////

[[simulate-pipeline-api]]
=== Simulate Pipeline API

The simulate pipeline API executes a specific pipeline against
the set of documents provided in the body of the request.

You can either specify an existing pipeline to execute
against the provided documents, or supply a pipeline definition in
the body of the request.

Here is the structure of a simulate request with a pipeline definition provided
in the body of the request:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}
--------------------------------------------------
// NOTCONSOLE

Here is the structure of a simulate request against an existing pipeline:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}
--------------------------------------------------
// NOTCONSOLE
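
As a concrete sketch of the second form, the request below simulates the existing `my-pipeline-id` pipeline
against a single document. It assumes the pipeline was created as in the put pipeline example (a `set`
processor writing `foo`), and the `message` field is a hypothetical input. Under those assumptions the
simulated document would be expected to contain both `message` and `foo`.

[source,js]
--------------------------------------------------
POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { "_source" : { "message" : "hello" } }
  ]
}
--------------------------------------------------
// NOTCONSOLE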

Here is an example of a simulate request with a pipeline defined in the request
and its response:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Response:

[source,js]
--------------------------------------------------
{
  "docs": [
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "_doc",
        "_source": {
          "field2": "_value",
          "foo": "bar"
        },
        "_ingest": {
          "timestamp": "2017-05-04T22:30:03.187Z"
        }
      }
    },
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "_doc",
        "_source": {
          "field2": "_value",
          "foo": "rab"
        },
        "_ingest": {
          "timestamp": "2017-05-04T22:30:03.188Z"
        }
      }
    }
  ]
}
--------------------------------------------------
// TESTRESPONSE[s/"2017-05-04T22:30:03.187Z"/$body.docs.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:30:03.188Z"/$body.docs.1.doc._ingest.timestamp/]

[[ingest-verbose-param]]
==== Viewing Verbose Results
You can use the simulate pipeline API to see how each processor affects the ingest document
as it passes through the pipeline. To see the intermediate results of
each processor in the simulate request, you can add the `verbose` parameter
to the request.

Here is an example of a verbose request and its response:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate?verbose
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value2"
        }
      },
      {
        "set" : {
          "field" : "field3",
          "value" : "_value3"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Response:

[source,js]
--------------------------------------------------
{
  "docs": [
    {
      "processor_results": [
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
              "field2": "_value2",
              "foo": "bar"
            },
            "_ingest": {
              "timestamp": "2017-05-04T22:46:09.674Z"
            }
          }
        },
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
              "field3": "_value3",
              "field2": "_value2",
              "foo": "bar"
            },
            "_ingest": {
              "timestamp": "2017-05-04T22:46:09.675Z"
            }
          }
        }
      ]
    },
    {
      "processor_results": [
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
              "field2": "_value2",
              "foo": "rab"
            },
            "_ingest": {
              "timestamp": "2017-05-04T22:46:09.676Z"
            }
          }
        },
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
              "field3": "_value3",
              "field2": "_value2",
              "foo": "rab"
            },
            "_ingest": {
              "timestamp": "2017-05-04T22:46:09.677Z"
            }
          }
        }
      ]
    }
  ]
}
--------------------------------------------------
// TESTRESPONSE[s/"2017-05-04T22:46:09.674Z"/$body.docs.0.processor_results.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:46:09.675Z"/$body.docs.0.processor_results.1.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:46:09.676Z"/$body.docs.1.processor_results.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:46:09.677Z"/$body.docs.1.processor_results.1.doc._ingest.timestamp/]

[[accessing-data-in-pipelines]]
== Accessing Data in Pipelines

The processors in a pipeline have read and write access to documents that pass through the pipeline.
The processors can access fields in the source of a document and the document's metadata fields.

[float]
[[accessing-source-fields]]
=== Accessing Fields in the Source
Accessing a field in the source is straightforward. You simply refer to fields by
their name. For example:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}
--------------------------------------------------
// NOTCONSOLE

On top of this, fields from the source are always accessible via the `_source` prefix:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}
--------------------------------------------------
// NOTCONSOLE

[float]
[[accessing-metadata-fields]]
=== Accessing Metadata Fields
You can access metadata fields in the same way that you access fields in the source. This
is possible because Elasticsearch doesn't allow fields in the source that have the
same name as metadata fields.

The following example sets the `_id` metadata field of a document to `1`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_id",
    "value": "1"
  }
}
--------------------------------------------------
// NOTCONSOLE

The following metadata fields are accessible by a processor: `_index`, `_type`, `_id`, `_routing`.

[float]
[[accessing-ingest-metadata]]
=== Accessing Ingest Metadata Fields
Beyond metadata fields and source fields, ingest also adds ingest metadata to the documents that it processes.
These metadata properties are accessible under the `_ingest` key. Currently ingest adds the ingest timestamp
under the `_ingest.timestamp` key of the ingest metadata. The ingest timestamp is the time when Elasticsearch
received the index or bulk request to pre-process the document.

Any processor can add ingest-related metadata during document processing. Ingest metadata is transient
and is lost after a document has been processed by the pipeline. Therefore, ingest metadata won't be indexed.

The following example adds a field with the name `received`. The value is the ingest timestamp:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

Unlike Elasticsearch metadata fields, the ingest metadata field name `_ingest` can be used as a valid field name
in the source of a document. Use `_source._ingest` to refer to the field in the source document. Otherwise, `_ingest`
will be interpreted as an ingest metadata field.

[float]
[[accessing-template-fields]]
=== Accessing Fields and Metafields in Templates
A number of processor settings also support templating. Settings that support templating can have zero or more
template snippets. A template snippet begins with `{{` and ends with `}}`.
Accessing fields and metafields in templates is exactly the same as via regular processor field settings.

The following example adds a field named `field_c`. Its value is a concatenation of
the values of `field_a` and `field_b`.

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}
--------------------------------------------------
// NOTCONSOLE
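
One way to try this template out is the simulate pipeline API. The sketch below wraps the same `set`
processor in an inline pipeline; the sample values `hello` and `world` are assumptions chosen for
illustration. With them, the simulated document would be expected to contain `field_c` with the value
`hello world`.

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    "processors" : [
      {
        "set" : {
          "field" : "field_c",
          "value" : "{{field_a}} {{field_b}}"
        }
      }
    ]
  },
  "docs" : [
    { "_source" : { "field_a" : "hello", "field_b" : "world" } }
  ]
}
--------------------------------------------------
// NOTCONSOLE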

The following example uses the value of the `geoip.country_iso_code` field in the source
to set the index that the document will be indexed into:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_index",
    "value": "{{geoip.country_iso_code}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

Dynamic field names are also supported. This example sets the field named after the
value of `service` to the value of the field `code`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

[[ingest-conditionals]]
== Conditional Execution in Pipelines

Each processor allows for an optional `if` condition to determine if that
processor should be executed or skipped. The value of the `if` is a
<<modules-scripting-painless, Painless>> script that needs to evaluate
to `true` or `false`.

For example, the following processor will <<drop-processor,drop>> the document
(i.e. not index it) if the input document has a field named `network_name`
and it is equal to `Guest`.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network_name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Using that pipeline for an index request:

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=drop_guests_network
{
  "network_name" : "Guest"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

Results in nothing indexed since the conditional evaluated to `true`.

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "_version": -3,
  "result": "noop",
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}
--------------------------------------------------
// TESTRESPONSE

[[ingest-conditional-nullcheck]]
=== Handling Nested Fields in Conditionals

Source documents often contain nested fields. Care should be taken
to avoid NullPointerExceptions if the parent object does not exist
in the document. For example, `ctx.a.b.c` can throw a NullPointerException
if the source document does not have a top-level `a` object, or a second-level
`b` object.

To help protect against NullPointerExceptions, null safe operations should be used.
Fortunately, Painless makes {painless}/painless-operators-reference.html#null-safe-operator[null safe]
operations easy with the `?.` operator.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network?.name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

The following document will get <<drop-processor,dropped>> correctly:

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=drop_guests_network
{
  "network": {
    "name": "Guest"
  }
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[catch:missing]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "found": false
}
--------------------------------------------------
// TESTRESPONSE
////

Thanks to the `?.` operator, the following document will not throw an error.
If the pipeline used a `.`, the following document would throw a NullPointerException
since the `network` object is not part of the source document.

[source,js]
--------------------------------------------------
POST test/_doc/2?pipeline=drop_guests_network
{
  "foo" : "bar"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/2
--------------------------------------------------
// CONSOLE
// TEST[continued]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "2",
  "_version": 1,
  "_seq_no": 22,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "foo": "bar"
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]
////

The source document can also use dot delimited fields to represent nested fields.

For example, instead of the source document defining the fields as nested:

[source,js]
--------------------------------------------------
{
  "network": {
    "name": "Guest"
  }
}
--------------------------------------------------
// NOTCONSOLE

The source document may have the nested fields flattened as such:
[source,js]
--------------------------------------------------
{
  "network.name": "Guest"
}
--------------------------------------------------
// NOTCONSOLE

If this is the case, use the <<dot-expand-processor, Dot Expand Processor>>
so that the nested fields may be used in a conditional.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "dot_expander": {
        "field": "network.name"
      }
    },
    {
      "drop": {
        "if": "ctx.network?.name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Now the following input document can be used with a conditional in the pipeline.

[source,js]
--------------------------------------------------
POST test/_doc/3?pipeline=drop_guests_network
{
  "network.name": "Guest"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/3
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[catch:missing]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "3",
  "found": false
}
--------------------------------------------------
// TESTRESPONSE
////

The `?.` operator works well for use in the `if` conditional
because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator]
returns null if the object is null and `==` is null safe (as well as many other
{painless}/painless-operators.html[Painless operators]).

However, calling a method such as `.equalsIgnoreCase` is not null safe
and can result in a NullPointerException.

In some situations the same functionality can be achieved in a null safe manner.
For example, `'Guest'.equalsIgnoreCase(ctx.network?.name)` is null safe because
`Guest` is always non-null, but `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe
since `ctx.network?.name` can return null.
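
Put together as a processor, the null safe form might look like the following sketch. It drops documents
whose `network.name` equals `Guest` regardless of case, and the condition simply evaluates to `false` for
documents that have no `network` object.

[source,js]
--------------------------------------------------
{
  "drop": {
    "if": "'Guest'.equalsIgnoreCase(ctx.network?.name)"
  }
}
--------------------------------------------------
// NOTCONSOLE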

Some situations require an explicit null check. In the following example there
is no null safe alternative, so an explicit null check is needed.

[source,js]
--------------------------------------------------
{
  "drop": {
    "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
  }
}
--------------------------------------------------
// NOTCONSOLE

[[ingest-conditional-complex]]
=== Complex Conditionals
The `if` condition can be more than a simple equality check.
The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
running in the {painless}/painless-ingest-processor-context.html[ingest processor context].

IMPORTANT: The value of `ctx` is read-only in `if` conditions.

The following, more complex `if` condition drops the document (i.e. does not index it)
unless it has a multi-valued `tags` field with at least one value that contains the characters
`prod` (case insensitive).

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": "Collection tags = ctx.tags;if(tags != null){for (String tag : tags) {if (tag.toLowerCase().contains('prod')) { return false;}}} return true;"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

The conditional needs to be all on one line since JSON strings cannot
contain literal new line characters. However, Kibana's console supports
a triple quote syntax to help with writing and debugging
scripts like these.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                  if (tag.toLowerCase().contains('prod')) {
                      return false;
                  }
              }
            }
            return true;
        """
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE
// TEST[continued]

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=not_prod_dropper
{
  "tags": ["application:myapp", "env:Stage"]
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

The document is <<drop-processor,dropped>> since `prod` (case insensitive)
is not found in the tags.

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[catch:missing]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "found": false
}
--------------------------------------------------
// TESTRESPONSE
////

The following document is indexed (i.e. not dropped) since
`prod` (case insensitive) is found in the tags.

[source,js]
--------------------------------------------------
POST test/_doc/2?pipeline=not_prod_dropper
{
  "tags": ["application:myapp", "env:Production"]
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/2
--------------------------------------------------
// CONSOLE
// TEST[continued]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "2",
  "_version": 1,
  "_seq_no": 34,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "tags": [
      "application:myapp",
      "env:Production"
    ]
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term" : 1/"_primary_term" : $body._primary_term/]
////

The <<simulate-pipeline-api>> with verbose can be used to help build out
complex conditionals. If the conditional evaluates to false, the processor will be
omitted from the verbose results of the simulation since the document will not change.

Care should be taken to avoid overly complex or expensive conditional checks
since the condition needs to be checked for each and every document.
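
As a sketch of the simulate-based workflow for building conditionals, the following request runs the two
sample documents from this section through `not_prod_dropper` with `verbose` enabled. It assumes the
pipeline has already been created as shown earlier; the response is not reproduced here.

[source,js]
--------------------------------------------------
POST _ingest/pipeline/not_prod_dropper/_simulate?verbose
{
  "docs" : [
    { "_source" : { "tags" : ["application:myapp", "env:Stage"] } },
    { "_source" : { "tags" : ["application:myapp", "env:Production"] } }
  ]
}
--------------------------------------------------
// NOTCONSOLE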

[[conditionals-with-multiple-pipelines]]
=== Conditionals with the Pipeline Processor
The combination of the `if` conditional and the <<pipeline-processor>> can result in a simple,
yet powerful means to process heterogeneous input. For example, you can define a single pipeline
that delegates to other pipelines based on some criteria.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/logs_pipeline
{
  "description": "A pipeline of pipelines for log files",
  "version": 1,
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

The above example allows consumers to point to a single pipeline for all log based index requests.
Based on the conditional, the correct pipeline will be called to process that type of data.

This pattern works well with a <<dynamic-index-settings, default pipeline>> defined in an index mapping
template for all indexes that hold data that needs pre-index processing.
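
A template that applies `logs_pipeline` by default might look like the following sketch. The template name
`logs_template` and the `logs-*` index pattern are hypothetical, and the request assumes a version of
Elasticsearch that provides the `index.default_pipeline` setting and the `_template` endpoint.

[source,js]
--------------------------------------------------
PUT _template/logs_template
{
  "index_patterns" : ["logs-*"],
  "settings" : {
    "index.default_pipeline" : "logs_pipeline"
  }
}
--------------------------------------------------
// NOTCONSOLE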

[[conditionals-with-regex]]
=== Conditionals with Regular Expressions
The `if` conditional is implemented as a Painless script, which requires
{painless}/painless-examples.html#modules-scripting-painless-regex[explicit support for regular expressions].

`script.painless.regex.enabled: true` must be set in `elasticsearch.yml` to use regular
expressions in the `if` condition.

If regular expressions are enabled, operators such as `=~` can be used against a `/pattern/` for conditions.

For example:
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=check_url
{
  "href": {
    "url": "http://www.elastic.co/"
  }
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

Results in:

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// CONSOLE
// TEST[continued]
////

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "_version": 1,
  "_seq_no": 60,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "href": {
      "insecure": true,
      "url": "http://www.elastic.co/"
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term" : 1/"_primary_term" : $body._primary_term/]

Regular expressions can be expensive and should be avoided if viable
alternatives exist.

For example, in this case `startsWith` can be used to get the same result
without using a regular expression:

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url != null && ctx.href.url.startsWith('http://')",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

[[handling-failure-in-pipelines]]
== Handling Failures in Pipelines

In its simplest use case, a pipeline defines a list of processors that
are executed sequentially, and processing halts at the first exception. This
behavior may not be desirable when failures are expected. For example, you may have logs
that don't match the specified grok expression. Instead of halting execution, you may
want to index such documents into a separate index.

To enable this behavior, you can use the `on_failure` parameter. The `on_failure` parameter
defines a list of processors to be executed immediately following the failed processor.
You can specify this parameter at the pipeline level, as well as at the processor
level. If a processor specifies an `on_failure` configuration, whether
it is empty or not, any exceptions that are thrown by the processor are caught, and the
pipeline continues executing the remaining processors. Because you can define further processors
within the scope of an `on_failure` statement, you can nest failure handling.

The following example defines a pipeline that renames the `foo` field in
the processed document to `bar`. If the document does not contain the `foo` field, the processor
attaches an error message to the document for later analysis within
Elasticsearch.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE
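
Because processors inside an `on_failure` list can declare their own `on_failure` lists, failure
handling can be nested. The following is a minimal sketch of that idea rather than a recommended
configuration: if renaming `foo` fails, the handler tries to rename a fallback field instead, and
records an error only when that also fails. The `baz` field and the error text are hypothetical
and chosen only for illustration.

[source,js]
--------------------------------------------------
{
  "description" : "nested failure handling (illustrative sketch)",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "rename" : {
              "field" : "baz",
              "target_field" : "bar",
              "on_failure" : [
                {
                  "set" : {
                    "field" : "error",
                    "value" : "neither \"foo\" nor \"baz\" exists, cannot rename to \"bar\""
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE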
|
|
|
|
The following example defines an `on_failure` block on a whole pipeline to change
|
|
the index to which failed documents get sent.
|
|
|
|
[source,js]
|
|
--------------------------------------------------
|
|
{
|
|
"description" : "my first pipeline with handled exceptions",
|
|
"processors" : [ ... ],
|
|
"on_failure" : [
|
|
{
|
|
"set" : {
|
|
"field" : "_index",
|
|
"value" : "failed-{{ _index }}"
|
|
}
|
|
}
|
|
]
|
|
}
|
|
--------------------------------------------------
|
|
// NOTCONSOLE
|
|
|
|
Instead of defining behavior for a processor failure, you can also
ignore the failure and continue with the next processor by specifying the `ignore_failure` setting.
|
|
|
|
In the example below, if the field `foo` doesn't exist, the failure is caught and the pipeline
continues to execute, which in this case means that the pipeline does nothing.
|
|
|
|
[source,js]
|
|
--------------------------------------------------
|
|
{
|
|
"description" : "my first pipeline with handled exceptions",
|
|
"processors" : [
|
|
{
|
|
"rename" : {
|
|
"field" : "foo",
|
|
"target_field" : "bar",
|
|
"ignore_failure" : true
|
|
}
|
|
}
|
|
]
|
|
}
|
|
--------------------------------------------------
|
|
// NOTCONSOLE
|
|
|
|
The `ignore_failure` setting can be set on any processor and defaults to `false`.
|
|
|
|
[float]
|
|
[[accessing-error-metadata]]
|
|
=== Accessing Error Metadata From Processors Handling Exceptions
|
|
|
|
You may want to retrieve the actual error message that was thrown
|
|
by a failed processor. To do so you can access metadata fields called
|
|
`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible
|
|
from within the context of an `on_failure` block.
|
|
|
|
Here is an updated version of the example that you
saw earlier. Instead of setting the error message manually, the example uses the `on_failure_message`
metadata field to provide the error message.
|
|
|
|
[source,js]
|
|
--------------------------------------------------
|
|
{
|
|
"description" : "my first pipeline with handled exceptions",
|
|
"processors" : [
|
|
{
|
|
"rename" : {
|
|
"field" : "foo",
|
|
"target_field" : "bar",
|
|
"on_failure" : [
|
|
{
|
|
"set" : {
|
|
"field" : "error",
|
|
"value" : "{{ _ingest.on_failure_message }}"
|
|
}
|
|
}
|
|
]
|
|
}
|
|
}
|
|
]
|
|
}
|
|
--------------------------------------------------
|
|
// NOTCONSOLE
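
The other two metadata fields can be captured in the same way. The sketch below is illustrative
only: the `rename_foo` tag and the `error.*` target field names are hypothetical, and each metadata
value is copied into its own field so that the failing processor can be identified later.

[source,js]
--------------------------------------------------
{
  "description" : "record which processor failed (illustrative sketch)",
  "processors" : [
    {
      "rename" : {
        "tag" : "rename_foo",
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error.message",
              "value" : "{{ _ingest.on_failure_message }}"
            }
          },
          {
            "set" : {
              "field" : "error.processor_type",
              "value" : "{{ _ingest.on_failure_processor_type }}"
            }
          },
          {
            "set" : {
              "field" : "error.processor_tag",
              "value" : "{{ _ingest.on_failure_processor_tag }}"
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE

With this configuration, a failed rename would be expected to record `rename` as the processor type
and `rename_foo` as the processor tag, alongside the original error message.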
|
|
|
|
[[ingest-processors]]
|
|
== Processors
|
|
|
|
All processors are defined in the following way within a pipeline definition:
|
|
|
|
[source,js]
|
|
--------------------------------------------------
|
|
{
|
|
"PROCESSOR_NAME" : {
|
|
... processor configuration options ...
|
|
}
|
|
}
|
|
--------------------------------------------------
|
|
// NOTCONSOLE
|
|
|
|
Each processor defines its own configuration parameters, but all processors have
|
|
the ability to declare `tag`, `on_failure` and `if` fields. These fields are optional.
|
|
|
|
A `tag` is simply a string identifier of the specific instantiation of a certain
|
|
processor in a pipeline. The `tag` field does not affect the processor's behavior,
|
|
but is very useful for bookkeeping and tracing errors to specific processors.
|
|
|
|
The `if` field must contain a script that returns a boolean value. If the script evaluates to `true`
|
|
then the processor will be executed for the given document; otherwise, it will be skipped.
|
|
The `if` field takes an object with the script fields defined in <<script-processor, script-options>>
|
|
and accesses a read-only version of the document via the same `ctx` variable used by scripts in the
|
|
<<script-processor>>.
|
|
|
|
[source,js]
|
|
--------------------------------------------------
|
|
{
|
|
"set": {
|
|
"if": "ctx.foo == 'someValue'",
|
|
"field": "found",
|
|
"value": true
|
|
}
|
|
}
|
|
--------------------------------------------------
|
|
// NOTCONSOLE
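
The example above uses the short string form of `if`. As a rough sketch of the object form, and
assuming that it accepts the same `lang`, `source`, and `params` fields as the
<<script-processor>>, the same condition could be written as follows. The `expected` parameter
name is hypothetical.

[source,js]
--------------------------------------------------
{
  "set": {
    "if": {
      "lang": "painless",
      "source": "ctx.foo == params.expected",
      "params": {
        "expected": "someValue"
      }
    },
    "field": "found",
    "value": true
  }
}
--------------------------------------------------
// NOTCONSOLE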
|
|
|
|
See <<ingest-conditionals>> to learn more about the `if` field and conditional execution.
|
|
|
|
See <<handling-failure-in-pipelines>> to learn more about the `on_failure` field and error handling in pipelines.
|
|
|
|
The <<ingest-info,node info API>> can be used to find out which processors are available in a cluster.
It provides a per-node list of the available processors.
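
For instance, a request along the following lines limits the node info response to the ingest
section. This is a sketch of a typical call, and the exact response layout may differ between versions.

[source,js]
--------------------------------------------------
GET _nodes/ingest
--------------------------------------------------
// CONSOLE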
|
|
|
|
Custom processors must be installed on all nodes. The put pipeline API will fail if a processor specified in a pipeline
|
|
doesn't exist on all nodes. If you rely on custom processor plugins, make sure to mark these plugins as mandatory by adding
the `plugin.mandatory` setting to the `config/elasticsearch.yml` file, for example:
|
|
|
|
[source,yaml]
|
|
--------------------------------------------------
|
|
plugin.mandatory: ingest-attachment,ingest-geoip
|
|
--------------------------------------------------
|
|
|
|
A node will not start if either of these plugins is not available.
|
|
|
|
The <<ingest-stats,node stats API>> can be used to fetch ingest usage statistics, globally and on a per
|
|
pipeline basis. This is useful for finding out which pipelines are used the most or spend the most time on preprocessing.
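
As a sketch, a request such as the following restricts the node stats response to ingest
statistics. The fields returned for each pipeline may vary by version.

[source,js]
--------------------------------------------------
GET _nodes/stats/ingest
--------------------------------------------------
// CONSOLE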
|
|
|
|
[float]
|
|
=== Ingest Processor Plugins
|
|
|
|
Additional ingest processors can be implemented and installed as Elasticsearch {plugins}/intro.html[plugins].
|
|
See {plugins}/ingest.html[Ingest plugins] for information about the available ingest plugins.
|
|
|
|
include::processors/append.asciidoc[]
|
|
include::processors/bytes.asciidoc[]
|
|
include::processors/convert.asciidoc[]
|
|
include::processors/date.asciidoc[]
|
|
include::processors/date-index-name.asciidoc[]
|
|
include::processors/dissect.asciidoc[]
|
|
include::processors/dot-expand.asciidoc[]
|
|
include::processors/drop.asciidoc[]
|
|
include::processors/fail.asciidoc[]
|
|
include::processors/foreach.asciidoc[]
|
|
include::processors/grok.asciidoc[]
|
|
include::processors/gsub.asciidoc[]
|
|
include::processors/join.asciidoc[]
|
|
include::processors/json.asciidoc[]
|
|
include::processors/kv.asciidoc[]
|
|
include::processors/pipeline.asciidoc[]
|
|
include::processors/remove.asciidoc[]
|
|
include::processors/rename.asciidoc[]
|
|
include::processors/script.asciidoc[]
|
|
include::processors/set.asciidoc[]
|
|
include::processors/set-security-user.asciidoc[]
|
|
include::processors/split.asciidoc[]
|
|
include::processors/sort.asciidoc[]
|
|
include::processors/trim.asciidoc[]
|
|
include::processors/uppercase.asciidoc[]
|
|
include::processors/url-decode.asciidoc[]
|