[DOC] Add ingest processors documentation (#4299)

Created new documentation to close content gaps

Signed-off-by: Melissa Vagi <vagimeli@amazon.com>
This commit is contained in:
Melissa Vagi 2023-08-29 14:07:34 -06:00 committed by GitHub
parent de5f5ae073
commit 71935cff99
17 changed files with 1648 additions and 287 deletions

View File

@ -0,0 +1,100 @@
---
layout: default
title: Create pipeline
parent: Ingest pipelines
grand_parent: Ingest APIs
nav_order: 10
redirect_from:
- /opensearch/rest-api/ingest-apis/create-update-ingest/
---
# Create pipeline
Use the create pipeline API operation to create or update pipelines in OpenSearch. A pipeline requires at least one processor that specifies how to transform the documents.
## Path and HTTP method
Replace `<pipeline-id>` with your pipeline ID:
```json
PUT _ingest/pipeline/<pipeline-id>
```
#### Example request
Here is an example in JSON format that creates an ingest pipeline with two `set` processors and an `uppercase` processor. The first `set` processor sets the `grad_year` to `2023`, and the second `set` processor sets `graduated` to `true`. The `uppercase` processor converts the `name` field to uppercase.
```json
PUT _ingest/pipeline/my-pipeline
{
"description": "This pipeline processes student data",
"processors": [
{
"set": {
"description": "Sets the graduation year to 2023",
"field": "grad_year",
"value": 2023
}
},
{
"set": {
"description": "Sets graduated to true",
"field": "graduated",
"value": true
}
},
{
"uppercase": {
"field": "name"
}
}
]
}
```
{% include copy-curl.html %}
To learn more about error handling, see [Handling pipeline failures]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/pipeline-failures/).
## Request body fields
The following table lists the request body fields used to create or update a pipeline.
Parameter | Required | Type | Description
:--- | :--- | :--- | :---
`processors` | Required | Array of processor objects | An array of processors, each of which transforms documents. Processors are run sequentially in the order specified.
`description` | Optional | String | A description of your ingest pipeline.
## Path parameters
Parameter | Required | Type | Description
:--- | :--- | :--- | :---
`pipeline-id` | Required | String | The unique identifier, or pipeline ID, assigned to the ingest pipeline.
## Query parameters
Parameter | Required | Type | Description
:--- | :--- | :--- | :---
`cluster_manager_timeout` | Optional | Time | Period to wait for a connection to the cluster manager node. Defaults to 30 seconds.
`timeout` | Optional | Time | Period to wait for a response. Defaults to 30 seconds.
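For example, to give a busy cluster more time to respond, you can set both parameters on the request path. The parameter values in the following sketch are illustrative; the request body is the same as in the preceding example:
```json
PUT _ingest/pipeline/my-pipeline?cluster_manager_timeout=60s&timeout=60s
```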
## Template snippets
Some processor parameters support [Mustache](https://mustache.github.io/) template snippets. To get the value of a field, surround the field name in three curly braces, for example, `{% raw %}{{{field-name}}}{% endraw %}`.
#### Example: `set` ingest processor using Mustache template snippet
The following example creates a field whose name is the value of the document's `{% raw %}{{{role}}}{% endraw %}` field and sets it to the value of the `{% raw %}{{{tenure}}}{% endraw %}` field:
```json
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"field": "{% raw %}{{{role}}}{% endraw %}",
"value": "{% raw %}{{{tenure}}}{% endraw %}"
}
}
]
}
```
{% include copy-curl.html %}
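To verify how the snippets resolve, you can simulate the pipeline with a sample document. The document in the following sketch is hypothetical; because its `role` field contains `manager` and its `tenure` field contains `5 years`, the processor creates a `manager` field set to `5 years`:
```json
POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "role": "manager",
        "tenure": "5 years"
      }
    }
  ]
}
```
{% include copy-curl.html %}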

View File

@ -1,79 +0,0 @@
---
layout: default
title: Create or update ingest pipeline
parent: Ingest APIs
nav_order: 11
redirect_from:
- /opensearch/rest-api/ingest-apis/create-update-ingest/
---
# Create and update a pipeline
The create ingest pipeline API operation creates or updates an ingest pipeline. Each pipeline requires an ingest definition that specifies how each processor transforms your documents.
## Example
```
PUT _ingest/pipeline/12345
{
"description" : "A description for your pipeline",
"processors" : [
{
"set" : {
"field": "field-name",
"value": "value"
}
}
]
}
```
{% include copy-curl.html %}
## Path and HTTP methods
```
PUT _ingest/pipeline/{id}
```
## Request body fields
Field | Required | Type | Description
:--- | :--- | :--- | :---
description | Optional | string | Description of your ingest pipeline.
processors | Required | Array of processor objects | An array of processors that transform documents. Processors run in the order specified, and the transformed documents appear in the index after ingestion.
```json
{
"description" : "A description for your pipeline",
"processors" : [
{
"set" : {
"field": "field-name",
"value": "value"
}
}
]
}
```
## URL parameters
All URL parameters are optional.
Parameter | Type | Description
:--- | :--- | :---
master_timeout | time | How long to wait for a connection to the master node.
timeout | time | How long to wait for the request to return.
## Response
```json
{
"acknowledged" : true
}
```

View File

@ -1,44 +1,27 @@
---
layout: default
title: Delete pipeline
parent: Ingest pipelines
grand_parent: Ingest APIs
nav_order: 13
redirect_from:
- /opensearch/rest-api/ingest-apis/delete-ingest/
---
# Delete pipeline
If you no longer want to use an ingest pipeline, use the delete ingest pipeline API operation.
## Path and HTTP methods
To delete a specific pipeline, pass the pipeline ID as a parameter:
```json
DELETE /_ingest/pipeline/<pipeline-id>
```
{% include copy-curl.html %}
To delete all pipelines in a cluster, use the wildcard character (`*`):
```json
DELETE /_ingest/pipeline/*
```
{% include copy-curl.html %}

View File

@ -1,56 +1,59 @@
---
layout: default
title: Get pipeline
parent: Ingest pipelines
grand_parent: Ingest APIs
nav_order: 12
redirect_from:
- /opensearch/rest-api/ingest-apis/get-ingest/
---
# Get pipeline
Use the get ingest pipeline API operation to retrieve all the information about the pipeline.
## Retrieving information about all pipelines
The following example request returns information about all ingest pipelines:
```json
GET _ingest/pipeline/
```
{% include copy-curl.html %}
## Retrieving information about a specific pipeline
The following example request returns information about a specific pipeline, which for this example is `my-pipeline`:
```json
GET _ingest/pipeline/my-pipeline
```
{% include copy-curl.html %}
## Response
The response contains the pipeline information:
```json
{
  "my-pipeline": {
    "description": "This pipeline processes student data",
    "processors": [
      {
        "set": {
          "description": "Sets the graduation year to 2023",
          "field": "grad_year",
          "value": 2023
        }
      },
      {
        "set": {
          "description": "Sets graduated to true",
          "field": "graduated",
          "value": true
        }
      },
      {
        "uppercase": {
          "field": "name"
        }
      }
    ]
  }
}
```

View File

@ -9,6 +9,13 @@ redirect_from:
# Ingest APIs
Ingest APIs are a valuable tool for loading data into a system. Ingest APIs work together with [ingest pipelines]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/ingest-pipelines/) and [ingest processors]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/ingest-processors/) to process or transform data from a variety of sources and in a variety of formats.
## Ingest pipeline APIs
Simplify, secure, and scale your OpenSearch data ingestion with the following APIs:
- [Create pipeline]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/create-ingest/): Use this API to create or update a pipeline configuration.
- [Get pipeline]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/get-ingest/): Use this API to retrieve a pipeline configuration.
- [Simulate pipeline]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/simulate-ingest/): Use this API to test a pipeline configuration.
- [Delete pipeline]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/delete-ingest/): Use this API to delete a pipeline configuration.

View File

@ -0,0 +1,50 @@
---
layout: default
title: Ingest pipelines
parent: Ingest APIs
has_children: true
nav_order: 5
---
# Ingest pipelines
An _ingest pipeline_ is a sequence of _processors_ that are applied to documents as they are ingested into an index. Each [processor]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/ingest-processors/) in a pipeline performs a specific task, such as filtering, transforming, or enriching data.
Processors are customizable tasks that run sequentially, in the order in which they appear in the request body. This order is important because each processor may depend on the output of the previous one. The modified documents appear in your index after the processors are applied.
Ingest pipelines can only be managed using [ingest API operations]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/index/).
{: .note}
## Prerequisites
The following are prerequisites for using OpenSearch ingest pipelines:
- When using ingestion in a production environment, your cluster should contain at least one node with the `ingest` node role. For information about setting up node roles within a cluster, see [Cluster Formation]({{site.url}}{{site.baseurl}}/opensearch/cluster/).
- If the OpenSearch Security plugin is enabled, you must have the `cluster_manage_pipelines` permission to manage ingest pipelines.
## Define a pipeline
A _pipeline definition_ describes the sequence of processors in an ingest pipeline and is written in JSON format. An ingest pipeline consists of the following:
```json
{
"description" : "..."
"processors" : [...]
}
```
### Request body fields
Field | Required | Type | Description
:--- | :--- | :--- | :---
`processors` | Required | Array of processor objects | An array of components, each of which performs a specific data processing task as the data is being ingested into OpenSearch.
`description` | Optional | String | A description of the ingest pipeline.
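For example, the following sketch defines a minimal pipeline containing a single `set` processor. The field name and value are illustrative:
```json
{
  "description": "A pipeline that adds a default status field",
  "processors": [
    {
      "set": {
        "field": "status",
        "value": "active"
      }
    }
  ]
}
```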
## Next steps
Learn how to:
- [Create a pipeline]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/create-ingest/).
- [Test a pipeline]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/simulate-ingest/).
- [Retrieve information about a pipeline]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/get-ingest/).
- [Delete a pipeline]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/delete-ingest/).

View File

@ -0,0 +1,23 @@
---
layout: default
title: Ingest processors
parent: Ingest APIs
nav_order: 10
has_children: true
---
# Ingest processors
Ingest processors are a core component of [ingest pipelines]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/ingest-pipelines/) because they preprocess documents before indexing. For example, you can remove fields, extract values from text, convert data formats, or append additional information.
OpenSearch provides a standard set of ingest processors within your OpenSearch installation. For a list of processors available in OpenSearch, use the [Nodes Info]({{site.url}}{{site.baseurl}}/api-reference/nodes-apis/nodes-info/) API operation:
```json
GET /_nodes/ingest?filter_path=nodes.*.ingest.processors
```
{% include copy-curl.html %}
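The response lists the processor types available on each node. The following abridged sketch shows the general shape of the response; the node ID and processor list are illustrative and vary by cluster:
```json
{
  "nodes": {
    "uBGsBgAATpmx2rToV9abCg": {
      "ingest": {
        "processors": [
          { "type": "append" },
          { "type": "bytes" },
          { "type": "convert" },
          { "type": "csv" },
          { "type": "date" },
          { "type": "lowercase" },
          { "type": "remove" },
          { "type": "uppercase" }
        ]
      }
    }
  }
}
```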
To set up and deploy ingest processors, make sure you have the necessary permissions and access rights. See [Security plugin REST API]({{site.url}}{{site.baseurl}}/security/access-control/api/) to learn more.
{:.note}
Processor types and their required or optional parameters vary depending on your specific use case. See the processor pages in this section to learn more about the processor types and about defining and configuring them within a pipeline.

View File

@ -0,0 +1,134 @@
---
layout: default
title: Handling pipeline failures
parent: Ingest pipelines
grand_parent: Ingest APIs
nav_order: 15
---
# Handling pipeline failures
Each ingest pipeline consists of a series of processors that are applied to the documents in sequence. By default, if a processor fails, the entire pipeline fails. You have two options for handling failures:
- **Fail the entire pipeline:** If a processor fails, the entire pipeline fails and the document is not indexed.
- **Fail the current processor and continue with the next processor:** This can be useful if you want to continue processing the document even if one of the processors fails.
By default, an ingest pipeline stops if one of its processors fails. If you want the pipeline to continue running when a processor fails, you can set the `ignore_failure` parameter for that processor to `true` when creating the pipeline:
```json
PUT _ingest/pipeline/my-pipeline/
{
"description": "Rename 'provider' field to 'cloud.provider'",
"processors": [
{
"rename": {
"field": "provider",
"target_field": "cloud.provider",
"ignore_failure": true
}
}
]
}
```
{% include copy-curl.html %}
You can specify a list of `on_failure` processors to run immediately after a processor fails. If you have specified `on_failure`, OpenSearch runs the other processors in the pipeline even if the `on_failure` configuration is empty:
```json
PUT _ingest/pipeline/my-pipeline/
{
"description": "Add timestamp to the document",
"processors": [
{
"date": {
"field": "timestamp_field",
"formats": ["yyyy-MM-dd HH:mm:ss"],
"target_field": "@timestamp",
"on_failure": [
{
"set": {
"field": "ingest_error",
"value": "failed"
}
}
]
}
}
]
}
```
{% include copy-curl.html %}
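Within an `on_failure` block, you can also record details about the failure itself. The following variant is a sketch that assumes the `_ingest.on_failure_message` metadata field, which contains the error message of the failed processor, is available through a template snippet:
```json
PUT _ingest/pipeline/my-pipeline
{
  "description": "Add timestamp to the document and record any failure message",
  "processors": [
    {
      "date": {
        "field": "timestamp_field",
        "formats": ["yyyy-MM-dd HH:mm:ss"],
        "target_field": "@timestamp",
        "on_failure": [
          {
            "set": {
              "field": "ingest_error",
              "value": "{% raw %}{{ _ingest.on_failure_message }}{% endraw %}"
            }
          }
        ]
      }
    }
  ]
}
```
{% include copy-curl.html %}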
If the processor fails, OpenSearch logs the failure and continues to run all remaining processors in the ingest pipeline. To check whether there were any failures, you can use [ingest pipeline metrics]({{site.url}}{{site.baseurl}}/api-reference/ingest-apis/pipeline-failures/#ingest-pipeline-metrics).
{: .tip}
## Ingest pipeline metrics
To view ingest pipeline metrics, use the [Nodes Stats API]({{site.url}}{{site.baseurl}}/api-reference/nodes-apis/nodes-stats/):
```json
GET /_nodes/stats/ingest?filter_path=nodes.*.ingest
```
{% include copy-curl.html %}
The response contains statistics for all ingest pipelines, for example:
```json
{
"nodes": {
"iFPgpdjPQ-uzTdyPLwQVnQ": {
"ingest": {
"total": {
"count": 28,
"time_in_millis": 82,
"current": 0,
"failed": 9
},
"pipelines": {
"user-behavior": {
"count": 5,
"time_in_millis": 0,
"current": 0,
"failed": 0,
"processors": [
{
"append": {
"type": "append",
"stats": {
"count": 5,
"time_in_millis": 0,
"current": 0,
"failed": 0
}
}
}
]
},
"remove_ip": {
"count": 5,
"time_in_millis": 9,
"current": 0,
"failed": 2,
"processors": [
{
"remove": {
"type": "remove",
"stats": {
"count": 5,
"time_in_millis": 8,
"current": 0,
"failed": 2
}
}
}
]
}
}
}
}
}
}
```
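To narrow the statistics to a single pipeline, you can extend `filter_path`. In the following sketch, `user-behavior` is the pipeline from the preceding response:
```json
GET /_nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines.user-behavior
```
{% include copy-curl.html %}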
**Troubleshooting ingest pipeline failures:** The first thing you should do is check the logs to see whether there are any errors or warnings that can help you identify the cause of the failure. OpenSearch logs contain information about the ingest pipeline that failed, including the processor that failed and the reason for the failure.
{: .tip}

View File

@ -0,0 +1,147 @@
---
layout: default
title: Append
parent: Ingest processors
grand_parent: Ingest APIs
nav_order: 10
---
# Append
The `append` processor is used to add values to a field:
- If the field is an array, the `append` processor appends the specified values to that array.
- If the field is a scalar field, the `append` processor converts it to an array and appends the specified values to that array.
- If the field does not exist, the `append` processor creates an array with the specified values.
The following is the syntax for the `append` processor:
```json
{
"append": {
"field": "your_target_field",
"value": ["your_appended_value"]
}
}
```
{% include copy-curl.html %}
## Configuration parameters
The following table lists the required and optional parameters for the `append` processor.
Parameter | Required | Description |
|-----------|-----------|-----------|
`field` | Required | The name of the field to which the data should be appended. Supports template snippets.|
`value` | Required | The value to be appended. This can be a static value or a dynamic value derived from existing fields. Supports template snippets. |
`description` | Optional | A brief description of the processor. |
`if` | Optional | A condition for running this processor. |
`ignore_failure` | Optional | If set to `true`, failures are ignored. Default is `false`. |
`on_failure` | Optional | A list of processors to run if the processor fails. |
`tag` | Optional | An identifier tag for the processor. Useful for debugging to distinguish between processors of the same type. |
## Using the processor
Follow these steps to use the processor in a pipeline.
**Step 1: Create a pipeline.**
The following query creates a pipeline, named `user-behavior`, that has one `append` processor. It appends the event type `page_view` to the `event_types` array field of each new document ingested into OpenSearch:
```json
PUT _ingest/pipeline/user-behavior
{
"description": "Pipeline that appends event type",
"processors": [
{
"append": {
"field": "event_types",
"value": ["page_view"]
}
}
]
}
```
{% include copy-curl.html %}
**Step 2 (Optional): Test the pipeline.**
It is recommended that you test your pipeline before you ingest documents.
{: .tip}
To test the pipeline, run the following query:
```json
POST _ingest/pipeline/user-behavior/_simulate
{
"docs":[
{
"_source":{
}
}
]
}
```
{% include copy-curl.html %}
#### Response
The following response confirms that the pipeline is working as expected:
```json
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"event_types": [
"page_view"
]
},
"_ingest": {
"timestamp": "2023-08-28T16:55:10.621805166Z"
}
}
}
]
}
```
**Step 3: Ingest a document.**
The following query ingests a document into an index named `testindex1`:
```json
PUT testindex1/_doc/1?pipeline=user-behavior
{
}
```
{% include copy-curl.html %}
**Step 4 (Optional): Retrieve the document.**
To retrieve the document, run the following query:
```json
GET testindex1/_doc/1
```
{% include copy-curl.html %}
Because the document does not contain an `event_types` field, an array field is created and the event is appended to the array:
```json
{
"_index": "testindex1",
"_id": "1",
"_version": 2,
"_seq_no": 1,
"_primary_term": 1,
"found": true,
"_source": {
"event_types": [
"page_view"
]
}
}
```
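To illustrate the scalar-to-array behavior described at the beginning of this page, the following sketch simulates the same pipeline against a hypothetical document in which `event_types` is already a scalar string. The processor should convert the field to an array and append `page_view`, producing `["login", "page_view"]`:
```json
POST _ingest/pipeline/user-behavior/_simulate
{
  "docs": [
    {
      "_source": {
        "event_types": "login"
      }
    }
  ]
}
```
{% include copy-curl.html %}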

View File

@ -0,0 +1,134 @@
---
layout: default
title: Bytes
parent: Ingest processors
grand_parent: Ingest APIs
nav_order: 20
---
# Bytes
The `bytes` processor converts a human-readable byte value to its equivalent value in bytes. The field can be a scalar or an array. If the field is a scalar, the value is converted and stored in the field. If the field is an array, all values of the array are converted.
The following is the syntax for the `bytes` processor:
```json
{
"bytes": {
"field": "your_field_name"
}
}
```
{% include copy-curl.html %}
## Configuration parameters
The following table lists the required and optional parameters for the `bytes` processor.
Parameter | Required | Description |
|-----------|-----------|-----------|
`field` | Required | The name of the field containing the data to be converted. Supports template snippets. |
`description` | Optional | A brief description of the processor. |
`if` | Optional | A condition for running this processor. |
`ignore_failure` | Optional | If set to `true`, failures are ignored. Default is `false`. |
`ignore_missing` | Optional | If set to `true`, the processor does not modify the document if the field does not exist or is `null`. Default is `false`. |
`on_failure` | Optional | A list of processors to run if the processor fails. |
`tag` | Optional | An identifier tag for the processor. Useful for debugging to distinguish between processors of the same type. |
`target_field` | Optional | The name of the field in which to store the converted value. If not specified, the value is converted in place in `field`. |
## Using the processor
Follow these steps to use the processor in a pipeline.
**Step 1: Create a pipeline.**
The following query creates a pipeline, named `file_upload`, that has one `bytes` processor. It converts the `file_size` field to its byte equivalent and stores the result in a new field named `file_size_bytes`:
```json
PUT _ingest/pipeline/file_upload
{
"description": "Pipeline that converts file size to bytes",
"processors": [
{
"bytes": {
"field": "file_size",
"target_field": "file_size_bytes"
}
}
]
}
```
{% include copy-curl.html %}
**Step 2 (Optional): Test the pipeline.**
It is recommended that you test your pipeline before you ingest documents.
{: .tip}
To test the pipeline, run the following query:
```json
POST _ingest/pipeline/file_upload/_simulate
{
"docs": [
{
"_index": "testindex1",
"_id": "1",
"_source": {
"file_size_bytes": "10485760",
"file_size":
"10MB"
}
}
]
}
```
{% include copy-curl.html %}
#### Response
The following response confirms that the pipeline is working as expected:
```json
{
"docs": [
{
"doc": {
"_index": "testindex1",
"_id": "1",
"_source": {
"event_types": [
"event_type"
],
"file_size_bytes": "10485760",
"file_size": "10MB"
},
"_ingest": {
"timestamp": "2023-08-22T16:09:42.771569211Z"
}
}
}
]
}
```
**Step 3: Ingest a document.**
The following query ingests a document into an index named `testindex1`:
```json
PUT testindex1/_doc/1?pipeline=file_upload
{
"file_size": "10MB"
}
```
{% include copy-curl.html %}
**Step 4 (Optional): Retrieve the document.**
To retrieve the document, run the following query:
```json
GET testindex1/_doc/1
```
{% include copy-curl.html %}
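The returned document should contain both the original `file_size` field and the converted `file_size_bytes` field. The following sketch shows the expected shape; the version and sequence metadata are illustrative:
```json
{
  "_index": "testindex1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "file_size_bytes": 10485760,
    "file_size": "10MB"
  }
}
```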

View File

@ -0,0 +1,137 @@
---
layout: default
title: Convert
parent: Ingest processors
grand_parent: Ingest APIs
nav_order: 30
---
# Convert
The `convert` processor converts a field in a document to a different type, for example, a string to an integer or an integer to a string. For an array field, all values in the array are converted. The following is the syntax for the `convert` processor:
```json
{
"convert": {
"field": "field_name",
"type": "type-value"
}
}
```
{% include copy-curl.html %}
## Configuration parameters
The following table lists the required and optional parameters for the `convert` processor.
Parameter | Required | Description |
|-----------|-----------|-----------|
`field` | Required | The name of the field that contains the data to be converted. Supports template snippets. |
`type` | Required | The type to convert the field value to. The supported types are `integer`, `long`, `float`, `double`, `string`, `boolean`, `ip`, and `auto`. If the `type` is `boolean`, the value is set to `true` if the field value is a string `true` (ignoring case) and to `false` if the field value is a string `false` (ignoring case). If the value is not one of the allowed values, an error will occur. |
`description` | Optional | A brief description of the processor. |
`if` | Optional | A condition for running this processor. |
`ignore_failure` | Optional | If set to `true`, failures are ignored. Default is `false`. |
`ignore_missing` | Optional | If set to `true`, the processor does not modify the document if the field does not exist or is `null`. Default is `false`. |
`on_failure` | Optional | A list of processors to run if the processor fails. |
`tag` | Optional | An identifier tag for the processor. Useful for debugging to distinguish between processors of the same type. |
`target_field` | Optional | The name of the field in which to store the converted value. If not specified, the value is converted in place in `field`. |
## Using the processor
Follow these steps to use the processor in a pipeline.
**Step 1: Create a pipeline.**
The following query creates a pipeline, named `convert-price`, that converts the value of the `price` field to a floating-point number, stores it in the new `price_float` field, and sets `price` to `0` if `price_float` is less than `0`:
```json
PUT _ingest/pipeline/convert-price
{
"description": "Pipeline that converts price to floating-point number and sets value to zero if price less than zero",
"processors": [
{
"convert": {
"field": "price",
"type": "float",
"target_field": "price_float"
}
},
{
"set": {
"field": "price",
"value": "0",
"if": "ctx.price_float < 0"
}
}
]
}
```
{% include copy-curl.html %}
**Step 2 (Optional): Test the pipeline.**
It is recommended that you test your pipeline before you ingest documents.
{: .tip}
To test the pipeline, run the following query:
```json
POST _ingest/pipeline/convert-price/_simulate
{
"docs": [
{
"_index": "testindex1",
"_id": "1",
"_source": {
"price": "-10.5"
}
}
]
}
```
{% include copy-curl.html %}
#### Response
The following example response confirms that the pipeline is working as expected:
```json
{
"docs": [
{
"doc": {
"_index": "testindex1",
"_id": "1",
"_source": {
"price_float": -10.5,
"price": "0"
},
"_ingest": {
"timestamp": "2023-08-22T15:38:21.180688799Z"
}
}
}
]
}
```
**Step 3: Ingest a document.**
The following query ingests a document into an index named `testindex1`:
```json
PUT testindex1/_doc/1?pipeline=convert-price
{
"price": "10.5"
}
```
{% include copy-curl.html %}
**Step 4 (Optional): Retrieve the document.**
To retrieve the document, run the following query:
```json
GET testindex1/_doc/1
```
{% include copy-curl.html %}

View File

@ -0,0 +1,138 @@
---
layout: default
title: CSV
parent: Ingest processors
grand_parent: Ingest APIs
nav_order: 40
---
# CSV
The `csv` processor is used to parse CSV data and store it as individual fields in a document. The processor ignores empty fields. The following is the syntax for the `csv` processor:
```json
{
"csv": {
"field": "field_name",
"target_fields": ["field1, field2, ..."]
}
}
```
{% include copy-curl.html %}
## Configuration parameters
The following table lists the required and optional parameters for the `csv` processor.
Parameter | Required | Description |
|-----------|-----------|-----------|
`field` | Required | The name of the field that contains the data to be converted. Supports template snippets. |
`target_fields` | Required | The names of the fields in which to store the parsed data. |
`description` | Optional | A brief description of the processor. |
`empty_value` | Optional | The value used to fill empty fields in the CSV data. If not specified, empty fields are skipped. |
`if` | Optional | A condition for running this processor. |
`ignore_failure` | Optional | If set to `true`, failures are ignored. Default is `false`. |
`ignore_missing` | Optional | If set to `true`, the processor will not fail if the field does not exist. Default is `true`. |
`on_failure` | Optional | A list of processors to run if the processor fails. |
`quote` | Optional | The character used to quote fields in the CSV data. Default is `"`. |
`separator` | Optional | The delimiter used to separate the fields in the CSV data. Default is `,`. |
`tag` | Optional | An identifier tag for the processor. Useful for debugging to distinguish between processors of the same type. |
`trim` | Optional | If set to `true`, the processor trims white space from the beginning and end of the text. Default is `false`. |
## Using the processor
Follow these steps to use the processor in a pipeline.
**Step 1: Create a pipeline.**
The following query creates a pipeline, named `csv-processor`, that splits `resource_usage` into three new fields named `cpu_usage`, `memory_usage`, and `disk_usage`:
```json
PUT _ingest/pipeline/csv-processor
{
"description": "Split resource usage into individual fields",
"processors": [
{
"csv": {
"field": "resource_usage",
"target_fields": ["cpu_usage", "memory_usage", "disk_usage"],
"separator": ","
}
}
]
}
```
{% include copy-curl.html %}
**Step 2 (Optional): Test the pipeline.**
It is recommended that you test your pipeline before you ingest documents.
{: .tip}
To test the pipeline, run the following query:
```json
POST _ingest/pipeline/csv-processor/_simulate
{
"docs": [
{
"_index": "testindex1",
"_id": "1",
"_source": {
"resource_usage": "25,4096,10",
"memory_usage": "4096",
"disk_usage": "10",
"cpu_usage": "25"
}
}
]
}
```
{% include copy-curl.html %}
#### Response
The following example response confirms that the pipeline is working as expected:
```json
{
"docs": [
{
"doc": {
"_index": "testindex1",
"_id": "1",
"_source": {
"memory_usage": "4096",
"disk_usage": "10",
"resource_usage": "25,4096,10",
"cpu_usage": "25"
},
"_ingest": {
"timestamp": "2023-08-22T16:40:45.024796379Z"
}
}
}
]
}
```
**Step 3: Ingest a document.**
The following query ingests a document into an index named `testindex1`:
```json
PUT testindex1/_doc/1?pipeline=csv-processor
{
"resource_usage": "25,4096,10"
}
```
{% include copy-curl.html %}
**Step 4 (Optional): Retrieve the document.**
To retrieve the document, run the following query:
```json
GET testindex1/_doc/1
```
{% include copy-curl.html %}
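The returned document should contain the original `resource_usage` field along with the three parsed fields. The following sketch shows the expected shape; the metadata values are illustrative:
```json
{
  "_index": "testindex1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "resource_usage": "25,4096,10",
    "cpu_usage": "25",
    "memory_usage": "4096",
    "disk_usage": "10"
  }
}
```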

View File

@ -0,0 +1,135 @@
---
layout: default
title: Date
parent: Ingest processors
grand_parent: Ingest APIs
nav_order: 50
---
# Date
The `date` processor is used to parse dates from document fields and to add the parsed data to a new field. By default, the parsed data is stored in the `@timestamp` field. The following is the syntax for the `date` processor:
```json
{
"date": {
"field": "date_field",
"formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZZ"]
}
}
```
{% include copy-curl.html %}
## Configuration parameters
The following table lists the required and optional parameters for the `date` processor.
Parameter | Required | Description |
|-----------|-----------|-----------|
`field` | Required | The name of the field that contains the date to be parsed. Supports template snippets. |
`formats` | Required | An array of the expected date formats. Can be a [date format]({{site.url}}{{site.baseurl}}/field-types/supported-field-types/date/#formats) or one of the following formats: ISO8601, UNIX, UNIX_MS, or TAI64N. |
`description` | Optional | A brief description of the processor. |
`if` | Optional | A condition for running this processor. |
`ignore_failure` | Optional | If set to `true`, failures are ignored. Default is `false`. |
`locale` | Optional | The locale to use when parsing the date. Default is `ENGLISH`. Supports template snippets. |
`on_failure` | Optional | A list of processors to run if the processor fails. |
`output_format` | Optional | The [date format]({{site.url}}{{site.baseurl}}/field-types/supported-field-types/date/#formats) to use for the target field. Default is `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`. |
`tag` | Optional | An identifier tag for the processor. Useful for debugging to distinguish between processors of the same type. |
`target_field` | Optional | The name of the field in which to store the parsed data. Default target field is `@timestamp`. |
`timezone` | Optional | The time zone to use when parsing the date. Default is `UTC`. Supports template snippets. |
## Using the processor
Follow these steps to use the processor in a pipeline.
**Step 1: Create a pipeline.**
The following query creates a pipeline, named `date-output-format`, that uses the `date` processor to convert from European date format to US date format, adding the new field `date_us` with the desired `output_format`:
```json
PUT /_ingest/pipeline/date-output-format
{
"description": "Pipeline that converts European date format to US date format",
"processors": [
{
"date": {
"field" : "date_european",
"formats" : ["dd/MM/yyyy", "UNIX"],
"target_field": "date_us",
"output_format": "MM/dd/yyy",
"timezone" : "UTC"
}
}
]
}
```
{% include copy-curl.html %}
**Step 2 (Optional): Test the pipeline.**
It is recommended that you test your pipeline before you ingest documents.
{: .tip}
To test the pipeline, run the following query:
```json
POST _ingest/pipeline/date-output-format/_simulate
{
"docs": [
{
"_index": "testindex1",
"_id": "1",
"_source": {
"date_us": "06/30/2023",
"date_european": "30/06/2023"
}
}
]
}
```
{% include copy-curl.html %}
#### Response
The following example response confirms that the pipeline is working as expected:
```json
{
"docs": [
{
"doc": {
"_index": "testindex1",
"_id": "1",
"_source": {
"date_us": "06/30/2023",
"date_european": "30/06/2023"
},
"_ingest": {
"timestamp": "2023-08-22T17:08:46.275195504Z"
}
}
}
]
}
```
**Step 3: Ingest a document.**
The following query ingests a document into an index named `testindex1`:
```json
PUT testindex1/_doc/1?pipeline=date-output-format
{
"date_european": "30/06/2023"
}
```
{% include copy-curl.html %}
**Step 4 (Optional): Retrieve the document.**
To retrieve the document, run the following query:
```json
GET testindex1/_doc/1
```
{% include copy-curl.html %}
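Because the pipeline's `formats` array also includes `UNIX`, the same pipeline can parse epoch timestamps expressed in seconds. The following sketch simulates the pipeline with an illustrative epoch value; `1688083200` corresponds to June 30, 2023 UTC, so `date_us` should again contain `06/30/2023`:
```json
POST _ingest/pipeline/date-output-format/_simulate
{
  "docs": [
    {
      "_source": {
        "date_european": 1688083200
      }
    }
  ]
}
```
{% include copy-curl.html %}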

View File

@ -0,0 +1,125 @@
---
layout: default
title: Lowercase
parent: Ingest processors
grand_parent: Ingest APIs
nav_order: 210
---
# Lowercase
The `lowercase` processor converts all the text in a specific field to lowercase letters. The following is the syntax for the `lowercase` processor:
```json
{
"lowercase": {
"field": "field_name"
}
}
```
{% include copy-curl.html %}
## Configuration parameters
The following table lists the required and optional parameters for the `lowercase` processor.
Parameter | Required | Description |
|-----------|-----------|-----------|
`field` | Required | The name of the field that contains the data to be converted. Supports template snippets. |
`description` | Optional | A brief description of the processor. |
`if` | Optional | A condition for running this processor. |
`ignore_failure` | Optional | If set to `true`, failures are ignored. Default is `false`. |
`on_failure` | Optional | A list of processors to run if the processor fails. |
`ignore_missing` | Optional | Specifies whether the processor should ignore documents that do not have the specified field. Default is `false`. |
`tag` | Optional | An identifier tag for the processor. Useful for debugging to distinguish between processors of the same type. |
`target_field` | Optional | The name of the field in which to store the converted text. If not specified, `field` is updated in place. |
## Using the processor
Follow these steps to use the processor in a pipeline.
**Step 1: Create a pipeline.**
The following query creates a pipeline, named `lowercase-title`, that uses the `lowercase` processor to convert the text in the `title` field of a document to lowercase:
```json
PUT _ingest/pipeline/lowercase-title
{
"description" : "Pipeline that lowercases the title field",
"processors" : [
{
"lowercase" : {
"field" : "title"
}
}
]
}
```
{% include copy-curl.html %}
**Step 2 (Optional): Test the pipeline.**
It is recommended that you test your pipeline before you ingest documents.
{: .tip}
To test the pipeline, run the following query:
```json
POST _ingest/pipeline/lowercase-title/_simulate
{
"docs": [
{
"_index": "testindex1",
"_id": "1",
"_source": {
"title": "WAR AND PEACE"
}
}
]
}
```
{% include copy-curl.html %}
#### Response
The following example response confirms that the pipeline is working as expected:
```json
{
"docs": [
{
"doc": {
"_index": "testindex1",
"_id": "1",
"_source": {
"title": "war and peace"
},
"_ingest": {
"timestamp": "2023-08-22T17:39:39.872671834Z"
}
}
}
]
}
```
**Step 3: Ingest a document.**
The following query ingests a document into an index named `testindex1`:
```json
PUT testindex1/_doc/1?pipeline=lowercase-title
{
"title": "WAR AND PEACE"
}
```
{% include copy-curl.html %}
**Step 4 (Optional): Retrieve the document.**
To retrieve the document, run the following query:
```json
GET testindex1/_doc/1
```
{% include copy-curl.html %}
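If you want to keep the original value, the `target_field` parameter stores the converted text in a separate field. The following sketch writes the lowercased title to a hypothetical `title_lower` field, leaving `title` unchanged:
```json
PUT _ingest/pipeline/lowercase-title-copy
{
  "description": "Pipeline that stores a lowercased copy of the title field",
  "processors": [
    {
      "lowercase": {
        "field": "title",
        "target_field": "title_lower"
      }
    }
  ]
}
```
{% include copy-curl.html %}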

View File

@ -0,0 +1,125 @@
---
layout: default
title: Remove
parent: Ingest processors
grand_parent: Ingest APIs
nav_order: 230
---
# Remove
The `remove` processor is used to remove a field from a document. The following is the syntax for the `remove` processor:
```json
{
"remove": {
"field": "field_name"
}
}
```
{% include copy-curl.html %}
## Configuration parameters
The following table lists the required and optional parameters for the `remove` processor.
Parameter | Required | Description |
|-----------|-----------|-----------|
`field` | Required | The name of the field to be removed. Supports template snippets. |
`description` | Optional | A brief description of the processor. |
`if` | Optional | A condition for running this processor. |
`ignore_failure` | Optional | If set to `true`, failures are ignored. Default is `false`. |
`on_failure` | Optional | A list of processors to run if the processor fails. |
`tag` | Optional | An identifier tag for the processor. Useful for debugging to distinguish between processors of the same type. |
## Using the processor
Follow these steps to use the processor in a pipeline.
**Step 1: Create a pipeline.**
The following query creates a pipeline, named `remove_ip`, that removes the `ip_address` field from a document:
```json
PUT /_ingest/pipeline/remove_ip
{
"description": "Pipeline that excludes the ip_address field.",
"processors": [
{
"remove": {
"field": "ip_address"
}
}
]
}
```
{% include copy-curl.html %}
**Step 2 (Optional): Test the pipeline.**
It is recommended that you test your pipeline before you ingest documents.
{: .tip}
To test the pipeline, run the following query:
```json
POST _ingest/pipeline/remove_ip/_simulate
{
"docs": [
{
"_index": "testindex1",
"_id": "1",
"_source":{
"ip_address": "203.0.113.1",
"name": "John Doe"
}
}
]
}
```
{% include copy-curl.html %}
#### Response
The following example response confirms that the pipeline is working as expected:
```json
{
"docs": [
{
"doc": {
"_index": "testindex1",
"_id": "1",
"_source": {
"name": "John Doe"
},
"_ingest": {
"timestamp": "2023-08-24T18:02:13.218986756Z"
}
}
}
]
}
```
**Step 3: Ingest a document.**
The following query ingests a document into an index named `testindex1`:
```json
PUT testindex1/_doc/1?pipeline=remove_ip
{
"ip_address": "203.0.113.1",
"name": "John Doe"
}
```
{% include copy-curl.html %}
**Step 4 (Optional): Retrieve the document.**
To retrieve the document, run the following query:
```json
GET testindex1/_doc/1
```
{% include copy-curl.html %}
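You can also combine the `remove` processor with the `if` parameter to remove a field only when it is present, which avoids failures for documents that lack the field. The pipeline name and condition in this sketch are illustrative:
```json
PUT /_ingest/pipeline/remove_ip_conditional
{
  "description": "Pipeline that removes ip_address only if it exists",
  "processors": [
    {
      "remove": {
        "field": "ip_address",
        "if": "ctx.ip_address != null"
      }
    }
  ]
}
```
{% include copy-curl.html %}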

View File

@ -0,0 +1,125 @@
---
layout: default
title: Uppercase
parent: Ingest processors
grand_parent: Ingest APIs
nav_order: 310
---
# Uppercase
The `uppercase` processor converts all the text in a specific field to uppercase letters. The following is the syntax for the `uppercase` processor:
```json
{
"uppercase": {
"field": "field_name"
}
}
```
{% include copy-curl.html %}
## Configuration parameters
The following table lists the required and optional parameters for the `uppercase` processor.
Parameter | Required | Description |
|-----------|-----------|-----------|
`field` | Required | The name of the field that contains the text to be converted. Supports template snippets. |
`description` | Optional | A brief description of the processor. |
`if` | Optional | A condition for running this processor. |
`ignore_failure` | Optional | If set to `true`, failures are ignored. Default is `false`. |
`ignore_missing` | Optional | Specifies whether the processor should ignore documents that do not have the specified field. Default is `false`. |
`on_failure` | Optional | A list of processors to run if the processor fails. |
`tag` | Optional | An identifier tag for the processor. Useful for debugging to distinguish between processors of the same type. |
`target_field` | Optional | The name of the field in which to store the converted text. If not specified, `field` is updated in place. |
## Using the processor
Follow these steps to use the processor in a pipeline.
**Step 1: Create a pipeline.**
The following query creates a pipeline, named `uppercase`, that converts the text in the `name` field to uppercase:
```json
PUT _ingest/pipeline/uppercase
{
"processors": [
{
"uppercase": {
"field": "name"
}
}
]
}
```
{% include copy-curl.html %}
**Step 2 (Optional): Test the pipeline.**
It is recommended that you test your pipeline before you ingest documents.
{: .tip}
To test the pipeline, run the following query:
```json
POST _ingest/pipeline/uppercase/_simulate
{
"docs": [
{
"_index": "testindex1",
"_id": "1",
"_source": {
"name": "John"
}
}
]
}
```
{% include copy-curl.html %}
#### Response
The following example response confirms that the pipeline is working as expected:
```json
{
"docs": [
{
"doc": {
"_index": "testindex1",
"_id": "1",
"_source": {
"name": "JOHN"
},
"_ingest": {
"timestamp": "2023-08-28T19:54:42.289624792Z"
}
}
}
]
}
```
**Step 3: Ingest a document.**
The following query ingests a document into an index named `testindex1`:
```json
PUT testindex1/_doc/1?pipeline=uppercase
{
"name": "John"
}
```
{% include copy-curl.html %}
**Step 4 (Optional): Retrieve the document.**
To retrieve the document, run the following query:
```json
GET testindex1/_doc/1
```
{% include copy-curl.html %}

View File

@ -1,34 +1,82 @@
---
layout: default
title: Simulate pipeline
parent: Ingest pipelines
grand_parent: Ingest APIs
nav_order: 11
redirect_from:
- /opensearch/rest-api/ingest-apis/simulate-ingest/
---
# Simulate pipeline
Use the simulate ingest pipeline API operation to run or test the pipeline.
## Path and HTTP methods
The following requests **simulate a pipeline specified in the request body**:
```
GET _ingest/pipeline/_simulate
POST _ingest/pipeline/_simulate
```
{% include copy-curl.html %}
The following requests **simulate a single pipeline based on the pipeline ID**:
```
GET _ingest/pipeline/<pipeline-id>/_simulate
POST _ingest/pipeline/<pipeline-id>/_simulate
```
{% include copy-curl.html %}
## Request body fields
The following table lists the request body fields used to run a pipeline.
Field | Required | Type | Description
:--- | :--- | :--- | :---
`docs` | Required | Array | The documents to be used to test the pipeline.
`pipeline` | Optional | Object | The pipeline to be simulated. Required if the pipeline ID is not specified in the path.
The `docs` field can include subfields listed in the following table.
Field | Required | Type | Description
:--- | :--- | :--- | :---
`source` | Required | Object | The document's JSON body.
`id` | Optional | String | A unique identifier for the document.
`index` | Optional | String | The index where the document's transformed data appears.
## Query parameters
The following table lists the query parameters for running a pipeline.
Parameter | Type | Description
:--- | :--- | :---
`verbose` | Boolean | Verbose mode. Display data output for each processor in the executed pipeline.
#### Example: Specify a pipeline in the path
```json
POST /_ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_index": "my-index",
      "_id": "1",
      "_source": {
        "grad_year": 2024,
        "graduated": false,
        "name": "John Doe"
      }
    },
    {
      "_index": "my-index",
      "_id": "2",
      "_source": {
        "grad_year": 2025,
        "graduated": false,
        "name": "Jane Doe"
      }
    }
  ]
}
```
{% include copy-curl.html %}
#### Response
The request returns the following response:
```json
{
"docs" : [
"docs": [
{
"doc" : {
"_index" : "index",
"_id" : "id",
"_source" : {
"location" : "new-new",
"field2" : "_value"
"doc": {
"_index": "my-index",
"_id": "1",
"_source": {
"name": "JOHN DOE",
"grad_year": 2023,
"graduated": true
},
"_ingest" : {
"timestamp" : "2022-02-07T18:47:57.479230835Z"
"_ingest": {
"timestamp": "2023-06-20T23:19:54.635306588Z"
}
}
},
{
"doc" : {
"_index" : "index",
"_id" : "id",
"_source" : {
"location" : "new-new",
"field2" : "_value"
"doc": {
"_index": "my-index",
"_id": "2",
"_source": {
"name": "JANE DOE",
"grad_year": 2023,
"graduated": true
},
"_ingest" : {
"timestamp" : "2022-02-07T18:47:57.47933496Z"
"_ingest": {
"timestamp": "2023-06-20T23:19:54.635746046Z"
}
}
}
@ -114,83 +121,65 @@ Responses vary based on which path and HTTP method you choose.
}
```
### Example: Verbose mode
When the previous request is run with the `verbose` parameter set to `true`, the response shows the sequence of transformations for each document. For example, for the document with the ID `1`, the response contains the results of applying each processor in the pipeline in sequence:
```json
{
  "docs": [
    {
      "processor_results": [
        {
          "processor_type": "set",
          "status": "success",
          "description": "Sets the graduation year to 2023",
          "doc": {
            "_index": "my-index",
            "_id": "1",
            "_source": {
              "name": "John Doe",
              "grad_year": 2023,
              "graduated": false
            },
            "_ingest": {
              "pipeline": "my-pipeline",
              "timestamp": "2023-06-20T23:23:26.656564631Z"
            }
          }
        },
        {
          "processor_type": "set",
          "status": "success",
          "description": "Sets graduated to true",
          "doc": {
            "_index": "my-index",
            "_id": "1",
            "_source": {
              "name": "John Doe",
              "grad_year": 2023,
              "graduated": true
            },
            "_ingest": {
              "pipeline": "my-pipeline",
              "timestamp": "2023-06-20T23:23:26.656564631Z"
            }
          }
        },
        {
          "processor_type": "uppercase",
          "status": "success",
          "doc": {
            "_index": "my-index",
            "_id": "1",
            "_source": {
              "name": "JOHN DOE",
              "grad_year": 2023,
              "graduated": true
            },
            "_ingest": {
              "pipeline": "my-pipeline",
              "timestamp": "2023-06-20T23:23:26.656564631Z"
            }
          }
        }
      ]
    }
]
}
```
### Example: Specify a pipeline in the request body
Alternatively, you can specify a pipeline directly in the request body without first creating a pipeline:
```json
POST /_ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "Splits text on whitespace characters",
"processors": [
{
"csv" : {
"field" : "name",
"separator": ",",
"target_fields": ["last_name", "first_name"],
"trim": true
}
},
{
"uppercase": {
"field": "last_name"
}
}
]
},
"docs": [
{
"_index": "second-index",
"_id": "1",
"_source": {
"name": "Doe,John"
}
},
{
"_index": "second-index",
"_id": "2",
"_source": {
"name": "Doe, Jane"
}
}
]
}
```
{% include copy-curl.html %}
#### Response
The request returns the following response:
```json
{
"docs": [
{
"doc": {
"_index": "second-index",
"_id": "1",
"_source": {
"name": "Doe,John",
"last_name": "DOE",
"first_name": "John"
},
"_ingest": {
"timestamp": "2023-08-24T19:20:44.816219673Z"
}
}
},
{
"doc": {
"_index": "second-index",
"_id": "2",
"_source": {
"name": "Doe, Jane",
"last_name": "DOE",
"first_name": "Jane"
},
"_ingest": {
"timestamp": "2023-08-24T19:20:44.816492381Z"
}
}
}
]
}
```