ingest: Add date_index_name processor.

Closes #17814
This commit is contained in:
Martijn van Groningen 2016-04-25 23:22:32 +02:00
parent 07c2fbf83a
commit 7aca1389e2
9 changed files with 462 additions and 35 deletions

View File

@ -0,0 +1,156 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ingest.core.AbstractProcessor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IllformedLocaleException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
public final class DateIndexNameProcessor extends AbstractProcessor {
public static final String TYPE = "date_index_name";
private final String field;
private final String indexNamePrefix;
private final String dateRounding;
private final String indexNameFormat;
private final DateTimeZone timezone;
private final List<Function<String, DateTime>> dateFormats;
DateIndexNameProcessor(String tag, String field, List<Function<String, DateTime>> dateFormats, DateTimeZone timezone,
String indexNamePrefix, String dateRounding, String indexNameFormat) {
super(tag);
this.field = field;
this.timezone = timezone;
this.dateFormats = dateFormats;
this.indexNamePrefix = indexNamePrefix;
this.dateRounding = dateRounding;
this.indexNameFormat = indexNameFormat;
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
String date = ingestDocument.getFieldValue(field, String.class);
DateTime dateTime = null;
Exception lastException = null;
for (Function<String, DateTime> dateParser : dateFormats) {
try {
dateTime = dateParser.apply(date);
} catch (Exception e) {
//try the next parser and keep track of the exceptions
lastException = ExceptionsHelper.useOrSuppress(lastException, e);
}
}
if (dateTime == null) {
throw new IllegalArgumentException("unable to parse date [" + date + "]", lastException);
}
DateTimeFormatter formatter = DateTimeFormat.forPattern(indexNameFormat);
StringBuilder builder = new StringBuilder()
.append('<')
.append(indexNamePrefix)
.append('{')
.append(formatter.print(dateTime)).append("||/").append(dateRounding)
.append('{').append(indexNameFormat).append('|').append(timezone).append('}')
.append('}')
.append('>');
String dynamicIndexName = builder.toString();
ingestDocument.setFieldValue(IngestDocument.MetaData.INDEX.getFieldName(), dynamicIndexName);
}
@Override
public String getType() {
return TYPE;
}
String getField() {
return field;
}
String getIndexNamePrefix() {
return indexNamePrefix;
}
String getDateRounding() {
return dateRounding;
}
String getIndexNameFormat() {
return indexNameFormat;
}
DateTimeZone getTimezone() {
return timezone;
}
List<Function<String, DateTime>> getDateFormats() {
return dateFormats;
}
public static final class Factory extends AbstractProcessorFactory<DateIndexNameProcessor> {
@Override
protected DateIndexNameProcessor doCreate(String tag, Map<String, Object> config) throws Exception {
String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "locale");
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "timezone");
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);
Locale locale = Locale.ENGLISH;
if (localeString != null) {
try {
locale = (new Locale.Builder()).setLanguageTag(localeString).build();
} catch (IllformedLocaleException e) {
throw new IllegalArgumentException("Invalid language tag specified: " + localeString);
}
}
List<String> dateFormatStrings = ConfigurationUtils.readOptionalList(TYPE, tag, config, "date_formats");
if (dateFormatStrings == null) {
dateFormatStrings = Collections.singletonList("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
}
List<Function<String, DateTime>> dateFormats = new ArrayList<>(dateFormatStrings.size());
for (String format : dateFormatStrings) {
DateFormat dateFormat = DateFormat.fromString(format);
dateFormats.add(dateFormat.getFunction(format, timezone, locale));
}
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
String indexNamePrefix = ConfigurationUtils.readStringProperty(TYPE, tag, config, "index_name_prefix", "");
String dateRounding = ConfigurationUtils.readStringProperty(TYPE, tag, config, "date_rounding");
String indexNameFormat = ConfigurationUtils.readStringProperty(TYPE, tag, config, "index_name_format", "yyyy-MM-dd");
return new DateIndexNameProcessor(tag, field, dateFormats, timezone, indexNamePrefix, dateRounding, indexNameFormat);
}
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.DateIndexNameProcessor;
import org.elasticsearch.ingest.processor.FailProcessor;
import org.elasticsearch.ingest.processor.ForEachProcessor;
import org.elasticsearch.ingest.processor.GsubProcessor;
@ -76,6 +77,7 @@ public class NodeModule extends AbstractModule {
registerProcessor(GsubProcessor.TYPE, (templateService, registry) -> new GsubProcessor.Factory());
registerProcessor(FailProcessor.TYPE, (templateService, registry) -> new FailProcessor.Factory(templateService));
registerProcessor(ForEachProcessor.TYPE, (templateService, registry) -> new ForEachProcessor.Factory(registry));
registerProcessor(DateIndexNameProcessor.TYPE, (templateService, registry) -> new DateIndexNameProcessor.Factory());
}
@Override

View File

@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import org.joda.time.DateTimeZone;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class DateIndexNameFactoryTests extends ESTestCase {
public void testDefaults() throws Exception {
DateIndexNameProcessor.Factory factory = new DateIndexNameProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("date_rounding", "y");
DateIndexNameProcessor processor = factory.create(config);
assertThat(processor.getDateFormats().size(), Matchers.equalTo(1));
assertThat(processor.getField(), Matchers.equalTo("_field"));
assertThat(processor.getIndexNamePrefix(), Matchers.equalTo(""));
assertThat(processor.getDateRounding(), Matchers.equalTo("y"));
assertThat(processor.getIndexNameFormat(), Matchers.equalTo("yyyy-MM-dd"));
assertThat(processor.getTimezone(), Matchers.equalTo(DateTimeZone.UTC));
}
public void testSpecifyOptionalSettings() throws Exception {
DateIndexNameProcessor.Factory factory = new DateIndexNameProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("index_name_prefix", "_prefix");
config.put("date_rounding", "y");
config.put("date_formats", Arrays.asList("UNIX", "UNIX_MS"));
DateIndexNameProcessor processor = factory.create(config);
assertThat(processor.getDateFormats().size(), Matchers.equalTo(2));
config = new HashMap<>();
config.put("field", "_field");
config.put("index_name_prefix", "_prefix");
config.put("date_rounding", "y");
config.put("index_name_format", "yyyyMMdd");
processor = factory.create(config);
assertThat(processor.getIndexNameFormat(), Matchers.equalTo("yyyyMMdd"));
config = new HashMap<>();
config.put("field", "_field");
config.put("index_name_prefix", "_prefix");
config.put("date_rounding", "y");
config.put("timezone", "+02:00");
processor = factory.create(config);
assertThat(processor.getTimezone(), Matchers.equalTo(DateTimeZone.forOffsetHours(2)));
config = new HashMap<>();
config.put("field", "_field");
config.put("index_name_prefix", "_prefix");
config.put("date_rounding", "y");
processor = factory.create(config);
assertThat(processor.getIndexNamePrefix(), Matchers.equalTo("_prefix"));
}
public void testRequiredFields() throws Exception {
DateIndexNameProcessor.Factory factory = new DateIndexNameProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("date_rounding", "y");
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config));
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
config.clear();
config.put("field", "_field");
e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config));
assertThat(e.getMessage(), Matchers.equalTo("[date_rounding] required property is missing"));
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.util.Collections;
import java.util.Locale;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.equalTo;
public class DateIndexNameProcessorTests extends ESTestCase {
public void testJodaPattern() throws Exception {
Function<String, DateTime> function = DateFormat.Joda.getFunction("yyyy-MM-dd'T'HH:mm:ss.SSSZ", DateTimeZone.UTC, Locale.ROOT);
DateIndexNameProcessor processor = new DateIndexNameProcessor(
"_tag", "_field", Collections.singletonList(function), DateTimeZone.UTC,
"events-", "y", "yyyyMMdd"
);
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
Collections.singletonMap("_field", "2016-04-25T12:24:20.101Z"));
processor.execute(document);
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{20160425||/y{yyyyMMdd|UTC}}>"));
}
public void testTAI64N()throws Exception {
Function<String, DateTime> function = DateFormat.Tai64n.getFunction(null, DateTimeZone.UTC, null);
DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
Collections.singletonMap("_field", (randomBoolean() ? "@" : "") + "4000000050d506482dbdf024"));
dateProcessor.execute(document);
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{20121222||/m{yyyyMMdd|UTC}}>"));
}
public void testUnixMs()throws Exception {
Function<String, DateTime> function = DateFormat.UnixMs.getFunction(null, DateTimeZone.UTC, null);
DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
Collections.singletonMap("_field", "1000500"));
dateProcessor.execute(document);
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{19700101||/m{yyyyMMdd|UTC}}>"));
}
public void testUnix()throws Exception {
Function<String, DateTime> function = DateFormat.Unix.getFunction(null, DateTimeZone.UTC, null);
DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
Collections.singletonMap("_field", "1000.5"));
dateProcessor.execute(document);
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{19700101||/m{yyyyMMdd|UTC}}>"));
}
}

View File

@ -739,6 +739,67 @@ Here is an example that adds the parsed date to the `timestamp` field based on t
}
--------------------------------------------------
[[date-index-name-processor]]
=== Date Index Name Processor
The purpose of this processor is to point documents to the right time based index based
on a date or timestamp field in a document by using the <<date-math-index-names, date math index name support>>.
The processor sets the `_index` meta field with a date math index name expression based on the provided index name
prefix, a date or timestamp field in the documents being processed and the provided date rounding.
First this processor fetches the date or timestamp from a field in the document being processed. Optionally
date formatting can be configured on how the field's value should be parsed into a date. Then this date,
the provided index name prefix and the provided date rounding get formatted into a date math index name expression.
Also here optionally date formatting can be specified on how the date should be formatted into a date math index name
expression.
An example pipeline that points documents to a monthly index that starts with a `myindex-` prefix based on a
date in the `date1` field:
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/1
{
"processors" : [
{
"date_index_name" : {
"field" : "date1",
"index_name_prefix" : "myindex-",
"date_rounding" : "m"
}
}
]
}
--------------------------------------------------
Using that pipeline for an index request:
[source,js]
--------------------------------------------------
PUT /myindex/type/1?pipeline=1
{
"date1" : "2016-04-25T12:02:01.789Z"
}
--------------------------------------------------
The above request will not index this document into the `myindex` index, but into the `myindex-2016-04-01` index.
This is because the date is being rounded by month.
[[date-index-name-options]]
.Date index name options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to get the date or timestamp from.
| `index_name_prefix` | no | - | A prefix of the index name to be prepended before the printed date.
| `date_rounding` | yes | - | How to round the date when formatting the date into the index name. Valid values are: `y` (year), `m` (month), `w` (week), `d` (day), `h` (hour), `m` (minute) and `s` (second).
| `date_formats ` | no | yyyy-MM-dd'T'HH:mm:ss.SSSZ | An array of the expected date formats for parsing dates / timestamps in the document being preprocessed. Can be a Joda pattern or one of the following formats: ISO8601, UNIX, UNIX_MS, or TAI64N.
| `timezone` | no | UTC | The timezone to use when parsing the date and when date math index supports resolves expressions into concrete index names.
| `locale` | no | ENGLISH | The locale to use when parsing the date from the document being preprocessed, relevant when parsing month names or week days.
| `index_name_format` | no | yyyy-MM-dd | The format to be used when printing the parsed date into the index name. An valid Joda pattern is expected here.
|======
[[fail-processor]]
=== Fail Processor
Raises an exception. This is useful for when

View File

@ -12,15 +12,16 @@
- match: { nodes.$master.ingest.processors.0.type: append }
- match: { nodes.$master.ingest.processors.1.type: convert }
- match: { nodes.$master.ingest.processors.2.type: date }
- match: { nodes.$master.ingest.processors.3.type: fail }
- match: { nodes.$master.ingest.processors.4.type: foreach }
- match: { nodes.$master.ingest.processors.5.type: grok }
- match: { nodes.$master.ingest.processors.6.type: gsub }
- match: { nodes.$master.ingest.processors.7.type: join }
- match: { nodes.$master.ingest.processors.8.type: lowercase }
- match: { nodes.$master.ingest.processors.9.type: remove }
- match: { nodes.$master.ingest.processors.10.type: rename }
- match: { nodes.$master.ingest.processors.11.type: set }
- match: { nodes.$master.ingest.processors.12.type: split }
- match: { nodes.$master.ingest.processors.13.type: trim }
- match: { nodes.$master.ingest.processors.14.type: uppercase }
- match: { nodes.$master.ingest.processors.3.type: date_index_name }
- match: { nodes.$master.ingest.processors.4.type: fail }
- match: { nodes.$master.ingest.processors.5.type: foreach }
- match: { nodes.$master.ingest.processors.6.type: grok }
- match: { nodes.$master.ingest.processors.7.type: gsub }
- match: { nodes.$master.ingest.processors.8.type: join }
- match: { nodes.$master.ingest.processors.9.type: lowercase }
- match: { nodes.$master.ingest.processors.10.type: remove }
- match: { nodes.$master.ingest.processors.11.type: rename }
- match: { nodes.$master.ingest.processors.12.type: set }
- match: { nodes.$master.ingest.processors.13.type: split }
- match: { nodes.$master.ingest.processors.14.type: trim }
- match: { nodes.$master.ingest.processors.15.type: uppercase }

View File

@ -12,15 +12,16 @@
- match: { nodes.$master.ingest.processors.1.type: attachment }
- match: { nodes.$master.ingest.processors.2.type: convert }
- match: { nodes.$master.ingest.processors.3.type: date }
- match: { nodes.$master.ingest.processors.4.type: fail }
- match: { nodes.$master.ingest.processors.5.type: foreach }
- match: { nodes.$master.ingest.processors.6.type: gsub }
- match: { nodes.$master.ingest.processors.7.type: join }
- match: { nodes.$master.ingest.processors.8.type: lowercase }
- match: { nodes.$master.ingest.processors.9.type: remove }
- match: { nodes.$master.ingest.processors.10.type: rename }
- match: { nodes.$master.ingest.processors.11.type: set }
- match: { nodes.$master.ingest.processors.12.type: split }
- match: { nodes.$master.ingest.processors.13.type: trim }
- match: { nodes.$master.ingest.processors.14.type: uppercase }
- match: { nodes.$master.ingest.processors.4.type: date_index_name }
- match: { nodes.$master.ingest.processors.5.type: fail }
- match: { nodes.$master.ingest.processors.6.type: foreach }
- match: { nodes.$master.ingest.processors.7.type: gsub }
- match: { nodes.$master.ingest.processors.8.type: join }
- match: { nodes.$master.ingest.processors.9.type: lowercase }
- match: { nodes.$master.ingest.processors.10.type: remove }
- match: { nodes.$master.ingest.processors.11.type: rename }
- match: { nodes.$master.ingest.processors.12.type: set }
- match: { nodes.$master.ingest.processors.13.type: split }
- match: { nodes.$master.ingest.processors.14.type: trim }
- match: { nodes.$master.ingest.processors.15.type: uppercase }

View File

@ -11,15 +11,16 @@
- match: { nodes.$master.ingest.processors.0.type: append }
- match: { nodes.$master.ingest.processors.1.type: convert }
- match: { nodes.$master.ingest.processors.2.type: date }
- match: { nodes.$master.ingest.processors.3.type: fail }
- match: { nodes.$master.ingest.processors.4.type: foreach }
- match: { nodes.$master.ingest.processors.5.type: geoip }
- match: { nodes.$master.ingest.processors.6.type: gsub }
- match: { nodes.$master.ingest.processors.7.type: join }
- match: { nodes.$master.ingest.processors.8.type: lowercase }
- match: { nodes.$master.ingest.processors.9.type: remove }
- match: { nodes.$master.ingest.processors.10.type: rename }
- match: { nodes.$master.ingest.processors.11.type: set }
- match: { nodes.$master.ingest.processors.12.type: split }
- match: { nodes.$master.ingest.processors.13.type: trim }
- match: { nodes.$master.ingest.processors.14.type: uppercase }
- match: { nodes.$master.ingest.processors.3.type: date_index_name }
- match: { nodes.$master.ingest.processors.4.type: fail }
- match: { nodes.$master.ingest.processors.5.type: foreach }
- match: { nodes.$master.ingest.processors.6.type: geoip }
- match: { nodes.$master.ingest.processors.7.type: gsub }
- match: { nodes.$master.ingest.processors.8.type: join }
- match: { nodes.$master.ingest.processors.9.type: lowercase }
- match: { nodes.$master.ingest.processors.10.type: remove }
- match: { nodes.$master.ingest.processors.11.type: rename }
- match: { nodes.$master.ingest.processors.12.type: set }
- match: { nodes.$master.ingest.processors.13.type: split }
- match: { nodes.$master.ingest.processors.14.type: trim }
- match: { nodes.$master.ingest.processors.15.type: uppercase }

View File

@ -0,0 +1,29 @@
---
"Test date index name processor with defaults":
- do:
ingest.put_pipeline:
id: "1"
body: >
{
"processors": [
{
"date_index_name" : {
"field" : "date",
"index_name_prefix": "events-",
"date_rounding": "d"
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: events
type: event
id: 1
pipeline: "1"
body: {
date: "2016-04-22T16:32:14.968Z"
}
- match: { _index: "events-2016-04-22"}