diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 8d5dbfe2108..8b6a4478115 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -1515,6 +1515,38 @@ Converts a JSON string into a structured JSON object. } -------------------------------------------------- +[[kv-processor]] +=== KV Processor +This processor helps automatically parse messages (or specific event fields) that are of the `foo=bar` variety. + +For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`, you can parse those fields automatically by configuring: + + +[source,js] +-------------------------------------------------- +{ + "kv": { + "field": "message", + "field_split": " ", + "value_split": "=" + } +} +-------------------------------------------------- + +[[kv-options]] +.Kv Options +[options="header"] +|====== +| Name | Required | Default | Description +| `field` | yes | - | The field to be parsed +| `field_split` | yes | - | Regex pattern to use for splitting key-value pairs +| `value_split` | yes | - | Regex pattern to use for splitting the key from the value within a key-value pair +| `target_field` | no | `null` | The field to insert the extracted keys into. Defaults to the root of the document +| `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys +| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document +|====== + + [[lowercase-processor]] === Lowercase Processor Converts a string to its lowercase equivalent. 
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java
index 82d316dfa62..8b6c8e8bed8 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java
@@ -63,6 +63,7 @@ public class IngestCommonPlugin extends Plugin implements IngestPlugin {
         processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
         processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory());
         processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
+        processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
         return Collections.unmodifiableMap(processors);
     }
 
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java
new file mode 100644
index 00000000000..d1f6eb7caf9
--- /dev/null
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.ConfigurationUtils;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
+ *
+ * The source field is split into pairs with the {@code field_split} regex, each pair is split into a key and a value with
+ * the {@code value_split} regex, and every value is written to the document under its key (optionally below
+ * {@code target_field}, optionally restricted to {@code include_keys}).
+ */
+public final class KeyValueProcessor extends AbstractProcessor {
+
+    public static final String TYPE = "kv";
+
+    private final String field;
+    private final String fieldSplit;
+    private final String valueSplit;
+    private final List<String> includeKeys;
+    private final String targetField;
+    private final boolean ignoreMissing;
+
+    KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, List<String> includeKeys,
+                      String targetField, boolean ignoreMissing) {
+        super(tag);
+        this.field = field;
+        this.targetField = targetField;
+        this.fieldSplit = fieldSplit;
+        this.valueSplit = valueSplit;
+        this.includeKeys = includeKeys;
+        this.ignoreMissing = ignoreMissing;
+    }
+
+    String getField() {
+        return field;
+    }
+
+    String getFieldSplit() {
+        return fieldSplit;
+    }
+
+    String getValueSplit() {
+        return valueSplit;
+    }
+
+    List<String> getIncludeKeys() {
+        return includeKeys;
+    }
+
+    String getTargetField() {
+        return targetField;
+    }
+
+    boolean isIgnoreMissing() {
+        return ignoreMissing;
+    }
+
+    /**
+     * Writes {@code value} to {@code targetField}: appends to the field when it already exists in the document, otherwise
+     * sets it.
+     */
+    public void append(IngestDocument document, String targetField, String value) {
+        if (document.hasField(targetField)) {
+            document.appendFieldValue(targetField, value);
+        } else {
+            document.setFieldValue(targetField, value);
+        }
+    }
+
+    /**
+     * Extracts the key-value pairs from the configured field and adds them to the document.
+     *
+     * @throws IllegalArgumentException if the field value is {@code null} while {@code ignoreMissing} is {@code false}, or if
+     *         a pair that passes the {@code includeKeys} filter does not contain the value separator
+     */
+    @Override
+    public void execute(IngestDocument document) {
+        String oldVal = document.getFieldValue(field, String.class, ignoreMissing);
+
+        if (oldVal == null && ignoreMissing) {
+            return;
+        } else if (oldVal == null) {
+            throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
+        }
+
+        String fieldPathPrefix = (targetField == null) ? "" : targetField + ".";
+        Arrays.stream(oldVal.split(fieldSplit))
+            .map((f) -> f.split(valueSplit, 2))
+            .filter((p) -> includeKeys == null || includeKeys.contains(p[0]))
+            .forEach((p) -> {
+                // split(regex, 2) returns a single element when the separator is absent from the pair; report that
+                // clearly instead of failing with an ArrayIndexOutOfBoundsException on p[1].
+                if (p.length != 2) {
+                    throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
+                }
+                append(document, fieldPathPrefix + p[0], p[1]);
+            });
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    /** Builds a {@link KeyValueProcessor} from its configuration map; include_keys, when given, is wrapped as unmodifiable. */
+    public static class Factory implements Processor.Factory {
+        @Override
+        public KeyValueProcessor create(Map<String, Processor.Factory> registry, String processorTag,
+                                        Map<String, Object> config) throws Exception {
+            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
+            String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
+            String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
+            String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
+            List<String> includeKeys = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
+            if (includeKeys != null) {
+                includeKeys = Collections.unmodifiableList(includeKeys);
+            }
+            boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
+            return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, targetField, ignoreMissing);
+        }
+    }
+}
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorFactoryTests.java
new file mode 100644
index 00000000000..4dc4e082655
--- /dev/null
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorFactoryTests.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to
Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+public class KeyValueProcessorFactoryTests extends ESTestCase { // configuration parsing of KeyValueProcessor.Factory
+
+    public void testCreateWithDefaults() throws Exception { // only field, field_split and value_split are configured
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        config.put("field_split", "&");
+        config.put("value_split", "=");
+        String processorTag = randomAsciiOfLength(10);
+        KeyValueProcessor processor = factory.create(null, processorTag, config);
+        assertThat(processor.getTag(), equalTo(processorTag));
+        assertThat(processor.getField(), equalTo("field1"));
+        assertThat(processor.getFieldSplit(), equalTo("&"));
+        assertThat(processor.getValueSplit(), equalTo("="));
+        assertThat(processor.getIncludeKeys(), is(nullValue()));
+        assertThat(processor.getTargetField(), is(nullValue()));
+        assertFalse(processor.isIgnoreMissing());
+    }
+
+    public void testCreateWithAllFieldsSet() throws Exception { // target_field, include_keys and ignore_missing are set as well
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        config.put("field_split", "&");
+        config.put("value_split", "=");
+        config.put("target_field", "target");
+        config.put("include_keys", Arrays.asList("a", "b"));
+        config.put("ignore_missing", true);
+        String processorTag = randomAsciiOfLength(10);
+        KeyValueProcessor processor = factory.create(null, processorTag, config);
+        assertThat(processor.getTag(), equalTo(processorTag));
+        assertThat(processor.getField(), equalTo("field1"));
+        assertThat(processor.getFieldSplit(), equalTo("&"));
+        assertThat(processor.getValueSplit(), equalTo("="));
+        assertThat(processor.getIncludeKeys(), equalTo(Arrays.asList("a", "b")));
+        assertThat(processor.getTargetField(), equalTo("target"));
+        assertTrue(processor.isIgnoreMissing());
+    }
+
+    public void testCreateWithMissingField() { // empty config: [field] is reported as missing
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        String processorTag = randomAsciiOfLength(10);
+        ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
+            () -> factory.create(null, processorTag, config));
+        assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
+    }
+
+    public void testCreateWithMissingFieldSplit() { // only field configured: [field_split] is reported as missing
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        String processorTag = randomAsciiOfLength(10);
+        ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
+            () -> factory.create(null, processorTag, config));
+        assertThat(exception.getMessage(), equalTo("[field_split] required property is missing"));
+    }
+
+    public void testCreateWithMissingValueSplit() { // field and field_split configured: [value_split] is reported as missing
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        config.put("field_split", "&");
+        String processorTag = randomAsciiOfLength(10);
+        ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
+            () -> factory.create(null, processorTag, config));
+        assertThat(exception.getMessage(), equalTo("[value_split] required property is missing"));
+    }
+}
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java
new file mode 100644
index 00000000000..2d5f71bf54e
--- /dev/null
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.ingest.RandomDocumentPicks;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
+import static org.hamcrest.Matchers.equalTo;
+
+public class KeyValueProcessorTests extends ESTestCase { // exercises KeyValueProcessor.execute on ingest documents
+
+    public void test() throws Exception { // values land under "target"; a repeated key yields a list
+        IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random());
+        String source = RandomDocumentPicks.addRandomField(random(), doc, "first=hello&second=world&second=universe");
+        Processor kv = new KeyValueProcessor(randomAsciiOfLength(10), source, "&", "=", null, "target", false);
+        kv.execute(doc);
+        assertThat(doc.getFieldValue("target.first", String.class), equalTo("hello"));
+        assertThat(doc.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe")));
+    }
+
+    public void testRootTarget() throws Exception { // null target field: keys are written at the document root
+        IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
+        doc.setFieldValue("myField", "first=hello&second=world&second=universe");
+        Processor kv = new KeyValueProcessor(randomAsciiOfLength(10), "myField", "&", "=", null, null, false);
+        kv.execute(doc);
+        assertThat(doc.getFieldValue("first", String.class), equalTo("hello"));
+        assertThat(doc.getFieldValue("second", List.class), equalTo(Arrays.asList("world", "universe")));
+    }
+
+    public void testKeySameAsSourceField() throws Exception { // extracted value is appended to the source field itself
+        IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
+        doc.setFieldValue("first", "first=hello");
+        Processor kv = new KeyValueProcessor(randomAsciiOfLength(10), "first", "&", "=", null, null, false);
+        kv.execute(doc);
+        assertThat(doc.getFieldValue("first", List.class), equalTo(Arrays.asList("first=hello", "hello")));
+    }
+
+    public void testIncludeKeys() throws Exception { // only the listed key is extracted
+        IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random());
+        String source = RandomDocumentPicks.addRandomField(random(), doc, "first=hello&second=world&second=universe");
+        Processor kv = new KeyValueProcessor(randomAsciiOfLength(10), source, "&", "=",
+            Collections.singletonList("first"), "target", false);
+        kv.execute(doc);
+        assertThat(doc.getFieldValue("target.first", String.class), equalTo("hello"));
+        assertFalse(doc.hasField("target.second"));
+    }
+
+    public void testMissingField() { // absent source field without ignore_missing is an error
+        IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
+        Processor kv = new KeyValueProcessor(randomAsciiOfLength(10), "unknown", "&", "=", null, "target", false);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> kv.execute(doc));
+        assertThat(e.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
+    }
+
+    public void testNullValueWithIgnoreMissing() throws Exception { // null source value + ignore_missing: document unchanged
+        String source = RandomDocumentPicks.randomFieldName(random());
+        IngestDocument expected = RandomDocumentPicks.randomIngestDocument(random(),
+            Collections.singletonMap(source, null));
+        IngestDocument actual = new IngestDocument(expected);
+        Processor kv = new KeyValueProcessor(randomAsciiOfLength(10), source, "", "", null, "target", true);
+        kv.execute(actual);
+        assertIngestDocument(expected, actual);
+    }
+
+    public void testNonExistentWithIgnoreMissing() throws Exception { // absent source field + ignore_missing: document unchanged
+        IngestDocument expected = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
+        IngestDocument actual = new IngestDocument(expected);
+        Processor kv = new KeyValueProcessor(randomAsciiOfLength(10), "unknown", "", "", null, "target", true);
+        kv.execute(actual);
+        assertIngestDocument(expected, actual);
+    }
+}
diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml
index 87c1f5a8abf..a58c329a7c5 100644
--- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml
+++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml
@@ -20,12 +20,13 @@
   - match: { nodes.$master.ingest.processors.8.type: gsub }
   - match: { nodes.$master.ingest.processors.9.type: join }
   - match: { nodes.$master.ingest.processors.10.type: json }
-  - match: { nodes.$master.ingest.processors.11.type: lowercase }
-  - match: { nodes.$master.ingest.processors.12.type: remove }
-  - match: { nodes.$master.ingest.processors.13.type: rename }
-  - match: { nodes.$master.ingest.processors.14.type: script }
-  - match: { nodes.$master.ingest.processors.15.type: set }
-  - match: { nodes.$master.ingest.processors.16.type: sort }
-  - match: { nodes.$master.ingest.processors.17.type: split }
-  - match: { nodes.$master.ingest.processors.18.type: trim }
-  - match: { nodes.$master.ingest.processors.19.type: uppercase }
+  - match: { nodes.$master.ingest.processors.11.type: kv }
+  - match: { nodes.$master.ingest.processors.12.type: lowercase }
+  - match: { nodes.$master.ingest.processors.13.type: remove }
+  - match: { nodes.$master.ingest.processors.14.type: rename }
+  - match: { nodes.$master.ingest.processors.15.type: script }
+  - match: { nodes.$master.ingest.processors.16.type: set }
+  - match: { nodes.$master.ingest.processors.17.type: sort }
+  - match: {
nodes.$master.ingest.processors.18.type: split } + - match: { nodes.$master.ingest.processors.19.type: trim } + - match: { nodes.$master.ingest.processors.20.type: uppercase } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/150_kv.yaml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/150_kv.yaml new file mode 100644 index 00000000000..a1ecf10278c --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/150_kv.yaml @@ -0,0 +1,43 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "1" + ignore: 404 + +--- +"Test KV Processor": + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "kv" : { + "field" : "foo", + "field_split": " ", + "value_split": "=" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "1" + body: { + foo: "goodbye=everybody hello=world" + } + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.goodbye: "everybody" } + - match: { _source.hello: "world" }